allspice

A very simple API client for AllSpice Hub

Note that the full Swagger API is not accessible. The implementation focuses on making access to, and work with, Organizations, Teams, Repositories and Users as painless as possible.

Forked from https://github.com/Langenfeld/py-gitea.

Usage

Docs

See the documentation site.

Examples

Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.

Quickstart

First get an allspice_client object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.

from allspice import *

# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)

# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)

Operations such as requesting the AllSpice version or the authenticated user can be performed directly on the allspice_client object:

print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)

Adding entities such as Users, Organizations, ... is also done via the allspice_client object.

user = allspice_client.create_user("Test Testson", "test@test.test", "password")

All operations on entities in AllSpice are then performed via the corresponding wrapper objects for those entities. Each of those wrapper classes has a .request method that creates the entity object using your allspice_client instance.

other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)

Note that the fields of the User, Organization, ... classes are created dynamically at runtime, and are thus not visible during development. Refer to the AllSpice API documentation for the field names.

Fields that cannot be altered via the AllSpice API are read-only. After altering a field, the .commit method of the corresponding object must be called to synchronize the changed fields with your allspice_client instance.

org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()

An entity in allspice can be deleted by calling delete.

org.delete()

All entity objects have methods to execute some of the requests possible through the AllSpice API:

org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
    repos = team.get_repos()
    for repo in repos:
        print(repo.name)

 1"""
 2.. include:: ../README.md
 3   :start-line: 1
 4   :end-before: Installation
 5"""
 6
 7from .allspice import (
 8    AllSpice,
 9)
10from .apiobject import (
11    Branch,
12    Comment,
13    Commit,
14    Content,
15    DesignReview,
16    Issue,
17    Milestone,
18    Organization,
19    Release,
20    Repository,
21    Team,
22    User,
23)
24from .exceptions import AlreadyExistsException, NotFoundException
25
26__version__ = "3.8.0"
27
28__all__ = [
29    "AllSpice",
30    "AlreadyExistsException",
31    "Branch",
32    "Comment",
33    "Commit",
34    "Content",
35    "DesignReview",
36    "Issue",
37    "Milestone",
38    "NotFoundException",
39    "Organization",
40    "Release",
41    "Repository",
42    "Team",
43    "User",
44]
class AllSpice:
 21class AllSpice:
 22    """Object to establish a session with AllSpice Hub."""
 23
 24    ADMIN_CREATE_USER = """/admin/users"""
 25    GET_USERS_ADMIN = """/admin/users"""
 26    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 27    ALLSPICE_HUB_VERSION = """/version"""
 28    GET_USER = """/user"""
 29    GET_REPOSITORY = """/repos/{owner}/{name}"""
 30    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 31    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 32
 33    def __init__(
 34        self,
 35        allspice_hub_url="https://hub.allspice.io",
 36        token_text=None,
 37        auth=None,
 38        verify=True,
 39        log_level="INFO",
 40        ratelimiting=(100, 60),
 41    ):
 42        """Initializing an instance of the AllSpice Hub Client
 43
 44        Args:
 45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 46                Defaults to `https://hub.allspice.io`.
 47
 48            token_text (str, None): The access token, by default None.
 49
 50            auth (tuple, None): The user credentials
 51                `(username, password)`, by default None.
 52
 53            verify (bool): If True, allow insecure server connections
 54                when using SSL.
 55
 56            log_level (str): The log level, by default `INFO`.
 57
 58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 59                If None, no rate limiting is applied. By default, 100 calls
 60                per minute are allowed.
 61        """
 62
 63        self.logger = logging.getLogger(__name__)
 64        handler = logging.StreamHandler(sys.stderr)
 65        handler.setFormatter(
 66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 67        )
 68        self.logger.addHandler(handler)
 69        self.logger.setLevel(log_level)
 70        self.headers = {
 71            "Content-type": "application/json",
 72        }
 73        self.url = allspice_hub_url
 74
 75        if ratelimiting is None:
 76            self.requests = requests.Session()
 77        else:
 78            (max_calls, period) = ratelimiting
 79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 80
 81        # Manage authentification
 82        if not token_text and not auth:
 83            raise ValueError("Please provide auth or token_text, but not both")
 84        if token_text:
 85            self.headers["Authorization"] = "token " + token_text
 86        if auth:
 87            self.logger.warning(
 88                "Using basic auth is not recommended. Prefer using a token instead."
 89            )
 90            self.requests.auth = auth
 91
 92        # Manage SSL certification verification
 93        self.requests.verify = verify
 94        if not verify:
 95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 96
 97    def __get_url(self, endpoint):
 98        url = self.url + "/api/v1" + endpoint
 99        self.logger.debug("Url: %s" % url)
100        return url
101
102    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
103        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
104        if request.status_code not in [200, 201]:
105            message = f"Received status code: {request.status_code} ({request.url})"
106            if request.status_code in [404]:
107                raise NotFoundException(message)
108            if request.status_code in [403]:
109                raise Exception(
110                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
111                )
112            if request.status_code in [409]:
113                raise ConflictException(message)
114            if request.status_code in [503]:
115                raise NotYetGeneratedException(message)
116            raise Exception(message)
117        return request
118
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}
125
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
132
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
139
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
155
156            if not result:
157                return aggregated_result
158
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
176
177            page += 1
178
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
189
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
198
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
208
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see requests.post. Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
216
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # `requests.post`.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
229
230        request = self.requests.post(self.__get_url(endpoint), **args)
231
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)
243
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
253
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
257
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
262
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
266
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
270
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
274
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.email == email or email in user.emails:
279                return user
280        return None
281
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
288
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
293
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
324
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
328            self.logger.info(
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user
340
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
355
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
359
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382            self.logger.info("Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)
387
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409            self.logger.info("Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
415
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "repo.wiki",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
436
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
447
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
460
461        if "id" in result:
462            self.logger.info("Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
33    def __init__(
34        self,
35        allspice_hub_url="https://hub.allspice.io",
36        token_text=None,
37        auth=None,
38        verify=True,
39        log_level="INFO",
40        ratelimiting=(100, 60),
41    ):
42        """Initializing an instance of the AllSpice Hub Client
43
44        Args:
45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
46                Defaults to `https://hub.allspice.io`.
47
48            token_text (str, None): The access token, by default None.
49
50            auth (tuple, None): The user credentials
51                `(username, password)`, by default None.
52
53            verify (bool): If True, allow insecure server connections
54                when using SSL.
55
56            log_level (str): The log level, by default `INFO`.
57
58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
59                If None, no rate limiting is applied. By default, 100 calls
60                per minute are allowed.
61        """
62
63        self.logger = logging.getLogger(__name__)
64        handler = logging.StreamHandler(sys.stderr)
65        handler.setFormatter(
66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
67        )
68        self.logger.addHandler(handler)
69        self.logger.setLevel(log_level)
70        self.headers = {
71            "Content-type": "application/json",
72        }
73        self.url = allspice_hub_url
74
75        if ratelimiting is None:
76            self.requests = requests.Session()
77        else:
78            (max_calls, period) = ratelimiting
79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
80
81        # Manage authentification
82        if not token_text and not auth:
83            raise ValueError("Please provide auth or token_text, but not both")
84        if token_text:
85            self.headers["Authorization"] = "token " + token_text
86        if auth:
87            self.logger.warning(
88                "Using basic auth is not recommended. Prefer using a token instead."
89            )
90            self.requests.auth = auth
91
92        # Manage SSL certification verification
93        self.requests.verify = verify
94        if not verify:
95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True (the default), verify the server's SSL certificate.
    Set to False to allow insecure server connections.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
@staticmethod
def parse_result(result) -> Dict:
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
155
156            if not result:
157                return aggregated_result
158
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
176
177            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
208
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see requests.post. Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
216
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # `requests.post`.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
229
230        request = self.requests.post(self.__get_url(endpoint), **args)
231
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
def get_orgs(self):
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
def get_version(self) -> str:
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
def get_users(self) -> List[User]:
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[User]:
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.email == email or email in user.emails:
279                return user
280        return None
def get_user_by_name(self, username: str) -> Optional[User]:
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
def get_repository(self, owner: str, name: str) -> Repository:
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
324
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
328            self.logger.info(
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user

Create User. Throws: AlreadyExistsException, if the User exists already; Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[User, Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
355
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
359
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382            self.logger.info("Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users cannot use this method. Please use `allspice.User.create_repo` or `allspice.Organization.create_repo` instead.

def create_org( self, owner: User, orgName: str, description: str, location='', website='', full_name=''):
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409            self.logger.info("Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
def create_team( self, org: Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "repo.wiki",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
436
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
447
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
460
461        if "id" in result:
462            self.logger.info("Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The name of the Team to be created. description (str): Optional, '', short description of the new Team. permission (str): Optional, 'read', what permissions the members of the Team have. units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the `permission` permission will be applied to all units. Note: When both `units` and `units_map` are given, `units_map` will be preferred.

class AlreadyExistsException(builtins.Exception):
2class AlreadyExistsException(Exception):
3    pass

Common base class for all non-exit exceptions.

class Branch(allspice.baseapiobject.ReadonlyApiObject):
419class Branch(ReadonlyApiObject):
420    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
421    effective_branch_protection_name: str
422    enable_status_check: bool
423    name: str
424    protected: bool
425    required_approvals: int
426    status_check_contexts: List[Any]
427    user_can_merge: bool
428    user_can_push: bool
429
430    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
431
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
434
435    def __eq__(self, other):
436        if not isinstance(other, Branch):
437            return False
438        return self.commit == other.commit and self.name == other.name
439
440    def __hash__(self):
441        return hash(self.commit["id"]) ^ hash(self.name)
442
443    _fields_to_parsers: ClassVar[dict] = {
444        # This is not a commit object
445        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
446    }
447
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Branch(allspice_client)
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]], NoneType]]
effective_branch_protection_name: str
enable_status_check: bool
name: str
protected: bool
required_approvals: int
status_check_contexts: List[Any]
user_can_merge: bool
user_can_push: bool
API_OBJECT = '/repos/{owner}/{repo}/branches/{branch}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, branch: str):
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
class Comment(allspice.baseapiobject.ApiObject):
1566class Comment(ApiObject):
1567    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1568    body: str
1569    created_at: datetime
1570    html_url: str
1571    id: int
1572    issue_url: str
1573    original_author: str
1574    original_author_id: int
1575    pull_request_url: str
1576    updated_at: datetime
1577    user: User
1578
1579    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1580    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1581    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1582
1583    def __init__(self, allspice_client):
1584        super().__init__(allspice_client)
1585
1586    def __eq__(self, other):
1587        if not isinstance(other, Comment):
1588            return False
1589        return self.repository == other.repository and self.id == other.id
1590
1591    def __hash__(self):
1592        return hash(self.repository) ^ hash(self.id)
1593
1594    @classmethod
1595    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1596        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1597
1598    _fields_to_parsers: ClassVar[dict] = {
1599        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1600        "created_at": lambda _, t: Util.convert_time(t),
1601        "updated_at": lambda _, t: Util.convert_time(t),
1602    }
1603
1604    _patchable_fields: ClassVar[set[str]] = {"body"}
1605
1606    @property
1607    def parent_url(self) -> str:
1608        """URL of the parent of this comment (the issue or the pull request)"""
1609
1610        if self.issue_url is not None and self.issue_url != "":
1611            return self.issue_url
1612        else:
1613            return self.pull_request_url
1614
1615    @cached_property
1616    def repository(self) -> Repository:
1617        """The repository this comment was posted on."""
1618
1619        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1620        return Repository.request(self.allspice_client, owner_name, repo_name)
1621
1622    def __fields_for_path(self):
1623        return {
1624            "owner": self.repository.owner.username,
1625            "repo": self.repository.name,
1626            "id": self.id,
1627        }
1628
1629    def commit(self):
1630        self._commit(self.__fields_for_path())
1631
1632    def delete(self):
1633        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1634        self.deleted = True
1635
1636    def get_attachments(self) -> List[Attachment]:
1637        """
1638        Get all attachments on this comment. This returns Attachment objects, which
1639        contain a link to download the attachment.
1640
1641        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1642        """
1643
1644        results = self.allspice_client.requests_get(
1645            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1646        )
1647        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1648
1649    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1650        """
1651        Create an attachment on this comment.
1652
1653        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1654
1655        :param file: The file to attach. This should be a file-like object.
1656        :param name: The name of the file. If not provided, the name of the file will be
1657                     used.
1658        :return: The created attachment.
1659        """
1660
1661        args: dict[str, Any] = {
1662            "files": {"attachment": file},
1663        }
1664        if name is not None:
1665            args["params"] = {"name": name}
1666
1667        result = self.allspice_client.requests_post(
1668            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1669            **args,
1670        )
1671        return Attachment.parse_response(self.allspice_client, result)
1672
1673    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1674        """
1675        Edit an attachment.
1676
1677        The list of params that can be edited is available at
1678        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1679
1680        :param attachment: The attachment to be edited
1681        :param data: The data parameter should be a dictionary of the fields to edit.
1682        :return: The edited attachment
1683        """
1684
1685        args = {
1686            **self.__fields_for_path(),
1687            "attachment_id": attachment.id,
1688        }
1689        result = self.allspice_client.requests_patch(
1690            self.ATTACHMENT_PATH.format(**args),
1691            data=data,
1692        )
1693        return Attachment.parse_response(self.allspice_client, result)
1694
1695    def delete_attachment(self, attachment: Attachment):
1696        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1697
1698        args = {
1699            **self.__fields_for_path(),
1700            "attachment_id": attachment.id,
1701        }
1702        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1703        attachment.deleted = True
Comment(allspice_client)
1583    def __init__(self, allspice_client):
1584        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]]]]
body: str
created_at: datetime.datetime
html_url: str
id: int
issue_url: str
original_author: str
original_author_id: int
pull_request_url: str
updated_at: datetime.datetime
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/comments/{id}'
GET_ATTACHMENTS_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets'
ATTACHMENT_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}'
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: str) -> Comment:
1594    @classmethod
1595    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1596        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
parent_url: str
1606    @property
1607    def parent_url(self) -> str:
1608        """URL of the parent of this comment (the issue or the pull request)"""
1609
1610        if self.issue_url is not None and self.issue_url != "":
1611            return self.issue_url
1612        else:
1613            return self.pull_request_url

URL of the parent of this comment (the issue or the pull request)

repository: Repository
1615    @cached_property
1616    def repository(self) -> Repository:
1617        """The repository this comment was posted on."""
1618
1619        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1620        return Repository.request(self.allspice_client, owner_name, repo_name)

The repository this comment was posted on.

def commit(self):
1629    def commit(self):
1630        self._commit(self.__fields_for_path())
def delete(self):
1632    def delete(self):
1633        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1634        self.deleted = True
def get_attachments(self) -> List[allspice.apiobject.Attachment]:
1636    def get_attachments(self) -> List[Attachment]:
1637        """
1638        Get all attachments on this comment. This returns Attachment objects, which
1639        contain a link to download the attachment.
1640
1641        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1642        """
1643
1644        results = self.allspice_client.requests_get(
1645            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1646        )
1647        return [Attachment.parse_response(self.allspice_client, result) for result in results]

Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.

https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> allspice.apiobject.Attachment:
1649    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1650        """
1651        Create an attachment on this comment.
1652
1653        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1654
1655        :param file: The file to attach. This should be a file-like object.
1656        :param name: The name of the file. If not provided, the name of the file will be
1657                     used.
1658        :return: The created attachment.
1659        """
1660
1661        args: dict[str, Any] = {
1662            "files": {"attachment": file},
1663        }
1664        if name is not None:
1665            args["params"] = {"name": name}
1666
1667        result = self.allspice_client.requests_post(
1668            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1669            **args,
1670        )
1671        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this comment.

https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

def edit_attachment( self, attachment: allspice.apiobject.Attachment, data: dict) -> allspice.apiobject.Attachment:
1673    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1674        """
1675        Edit an attachment.
1676
1677        The list of params that can be edited is available at
1678        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1679
1680        :param attachment: The attachment to be edited
1681        :param data: The data parameter should be a dictionary of the fields to edit.
1682        :return: The edited attachment
1683        """
1684
1685        args = {
1686            **self.__fields_for_path(),
1687            "attachment_id": attachment.id,
1688        }
1689        result = self.allspice_client.requests_patch(
1690            self.ATTACHMENT_PATH.format(**args),
1691            data=data,
1692        )
1693        return Attachment.parse_response(self.allspice_client, result)

Edit an attachment.

The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

Parameters
  • attachment: The attachment to be edited
  • data: The data parameter should be a dictionary of the fields to edit.
Returns

The edited attachment

def delete_attachment(self, attachment: allspice.apiobject.Attachment):
1695    def delete_attachment(self, attachment: Attachment):
1696        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1697
1698        args = {
1699            **self.__fields_for_path(),
1700            "attachment_id": attachment.id,
1701        }
1702        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1703        attachment.deleted = True

https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

class Commit(allspice.baseapiobject.ReadonlyApiObject):
1706class Commit(ReadonlyApiObject):
1707    author: User
1708    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1709    committer: Dict[str, Union[int, str, bool]]
1710    created: str
1711    files: List[Dict[str, str]]
1712    html_url: str
1713    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1714    parents: List[Union[Dict[str, str], Any]]
1715    sha: str
1716    stats: Dict[str, int]
1717    url: str
1718
1719    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1720    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1721    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1722
1723    # Regex to extract owner and repo names from the url property
1724    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1725
1726    def __init__(self, allspice_client):
1727        super().__init__(allspice_client)
1728
1729    _fields_to_parsers: ClassVar[dict] = {
1730        # NOTE: api may return None for commiters that are no allspice users
1731        "author": lambda allspice_client, u: (
1732            User.parse_response(allspice_client, u) if u else None
1733        )
1734    }
1735
1736    def __eq__(self, other):
1737        if not isinstance(other, Commit):
1738            return False
1739        return self.sha == other.sha
1740
1741    def __hash__(self):
1742        return hash(self.sha)
1743
1744    @classmethod
1745    def parse_response(cls, allspice_client, result) -> "Commit":
1746        commit_cache = result["commit"]
1747        api_object = cls(allspice_client)
1748        cls._initialize(allspice_client, api_object, result)
1749        # inner_commit for legacy reasons
1750        Commit._add_read_property("inner_commit", commit_cache, api_object)
1751        return api_object
1752
1753    def get_status(self) -> CommitCombinedStatus:
1754        """
1755        Get a combined status consisting of all statues on this commit.
1756
1757        Note that the returned object is a CommitCombinedStatus object, which
1758        also contains a list of all statuses on the commit.
1759
1760        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1761        """
1762
1763        result = self.allspice_client.requests_get(
1764            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1765        )
1766        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1767
1768    def get_statuses(self) -> List[CommitStatus]:
1769        """
1770        Get a list of all statuses on this commit.
1771
1772        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1773        """
1774
1775        results = self.allspice_client.requests_get(
1776            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1777        )
1778        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1779
1780    @cached_property
1781    def _fields_for_path(self) -> dict[str, str]:
1782        matches = self.URL_REGEXP.search(self.url)
1783        if not matches:
1784            raise ValueError(f"Invalid commit URL: {self.url}")
1785
1786        return {
1787            "owner": matches.group(1),
1788            "repo": matches.group(2),
1789            "sha": self.sha,
1790        }
Commit(allspice_client)
1726    def __init__(self, allspice_client):
1727        super().__init__(allspice_client)
author: User
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
committer: Dict[str, Union[int, str, bool]]
created: str
files: List[Dict[str, str]]
html_url: str
inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
parents: List[Union[Dict[str, str], Any]]
sha: str
stats: Dict[str, int]
url: str
API_OBJECT = '/repos/{owner}/{repo}/commits/{sha}'
COMMIT_GET_STATUS = '/repos/{owner}/{repo}/commits/{sha}/status'
COMMIT_GET_STATUSES = '/repos/{owner}/{repo}/commits/{sha}/statuses'
URL_REGEXP = re.compile('/repos/([^/]+)/([^/]+)/git/commits')
@classmethod
def parse_response(cls, allspice_client, result) -> Commit:
1744    @classmethod
1745    def parse_response(cls, allspice_client, result) -> "Commit":
1746        commit_cache = result["commit"]
1747        api_object = cls(allspice_client)
1748        cls._initialize(allspice_client, api_object, result)
1749        # inner_commit for legacy reasons
1750        Commit._add_read_property("inner_commit", commit_cache, api_object)
1751        return api_object
def get_status(self) -> allspice.apiobject.CommitCombinedStatus:
1753    def get_status(self) -> CommitCombinedStatus:
1754        """
1755        Get a combined status consisting of all statues on this commit.
1756
1757        Note that the returned object is a CommitCombinedStatus object, which
1758        also contains a list of all statuses on the commit.
1759
1760        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1761        """
1762
1763        result = self.allspice_client.requests_get(
1764            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1765        )
1766        return CommitCombinedStatus.parse_response(self.allspice_client, result)

Get a combined status consisting of all statuses on this commit.

Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.

https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus

def get_statuses(self) -> List[allspice.apiobject.CommitStatus]:
1768    def get_statuses(self) -> List[CommitStatus]:
1769        """
1770        Get a list of all statuses on this commit.
1771
1772        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1773        """
1774
1775        results = self.allspice_client.requests_get(
1776            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1777        )
1778        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]

Get a list of all statuses on this commit.

https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses

class Content(allspice.baseapiobject.ReadonlyApiObject):
2600class Content(ReadonlyApiObject):
2601    content: Any
2602    download_url: str
2603    encoding: Any
2604    git_url: str
2605    html_url: str
2606    last_commit_sha: str
2607    name: str
2608    path: str
2609    sha: str
2610    size: int
2611    submodule_git_url: Any
2612    target: Any
2613    type: str
2614    url: str
2615
2616    FILE = "file"
2617
2618    def __init__(self, allspice_client):
2619        super().__init__(allspice_client)
2620
2621    def __eq__(self, other):
2622        if not isinstance(other, Content):
2623            return False
2624
2625        return self.sha == other.sha and self.name == other.name
2626
2627    def __hash__(self):
2628        return hash(self.sha) ^ hash(self.name)
Content(allspice_client)
2618    def __init__(self, allspice_client):
2619        super().__init__(allspice_client)
content: Any
download_url: str
encoding: Any
git_url: str
html_url: str
last_commit_sha: str
name: str
path: str
sha: str
size: int
submodule_git_url: Any
target: Any
type: str
url: str
FILE = 'file'
class DesignReview(allspice.baseapiobject.ApiObject):
2084class DesignReview(ApiObject):
2085    """
2086    A Design Review. See
2087    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2088
2089    Note: The base and head fields are not `Branch` objects - they are plain strings
2090    referring to the branch names. This is because DRs can exist for branches that have
2091    been deleted, which don't have an associated `Branch` object from the API. You can use
2092    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2093    it exists.
2094    """
2095
2096    additions: int
2097    allow_maintainer_edit: bool
2098    allow_maintainer_edits: Any
2099    assignee: User
2100    assignees: List["User"]
2101    base: str
2102    body: str
2103    changed_files: int
2104    closed_at: Optional[str]
2105    comments: int
2106    created_at: str
2107    deletions: int
2108    diff_url: str
2109    draft: bool
2110    due_date: Optional[str]
2111    head: str
2112    html_url: str
2113    id: int
2114    is_locked: bool
2115    labels: List[Any]
2116    merge_base: str
2117    merge_commit_sha: Optional[str]
2118    mergeable: bool
2119    merged: bool
2120    merged_at: Optional[str]
2121    merged_by: Optional["User"]
2122    milestone: Any
2123    number: int
2124    patch_url: str
2125    pin_order: int
2126    repository: Optional["Repository"]
2127    requested_reviewers: Any
2128    review_comments: int
2129    state: str
2130    title: str
2131    updated_at: str
2132    url: str
2133    user: User
2134
2135    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2136    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2137    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2138
2139    OPEN = "open"
2140    CLOSED = "closed"
2141
2142    class MergeType(Enum):
2143        MERGE = "merge"
2144        REBASE = "rebase"
2145        REBASE_MERGE = "rebase-merge"
2146        SQUASH = "squash"
2147        MANUALLY_MERGED = "manually-merged"
2148
2149    def __init__(self, allspice_client):
2150        super().__init__(allspice_client)
2151
2152    def __eq__(self, other):
2153        if not isinstance(other, DesignReview):
2154            return False
2155        return self.repository == other.repository and self.id == other.id
2156
2157    def __hash__(self):
2158        return hash(self.repository) ^ hash(self.id)
2159
2160    @classmethod
2161    def parse_response(cls, allspice_client, result) -> "DesignReview":
2162        api_object = super().parse_response(allspice_client, result)
2163        cls._add_read_property(
2164            "repository",
2165            Repository.parse_response(allspice_client, result["base"]["repo"]),
2166            api_object,
2167        )
2168
2169        return api_object
2170
2171    @classmethod
2172    def request(cls, allspice_client, owner: str, repo: str, number: str):
2173        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2174        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2175
2176    _fields_to_parsers: ClassVar[dict] = {
2177        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2178        "assignees": lambda allspice_client, us: [
2179            User.parse_response(allspice_client, u) for u in us
2180        ],
2181        "base": lambda _, b: b["ref"],
2182        "head": lambda _, h: h["ref"],
2183        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2184        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2185        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2186    }
2187
2188    _patchable_fields: ClassVar[set[str]] = {
2189        "allow_maintainer_edits",
2190        "assignee",
2191        "assignees",
2192        "base",
2193        "body",
2194        "due_date",
2195        "milestone",
2196        "state",
2197        "title",
2198    }
2199
2200    _parsers_to_fields: ClassVar[dict] = {
2201        "assignee": lambda u: u.username,
2202        "assignees": lambda us: [u.username for u in us],
2203        "base": lambda b: b.name if isinstance(b, Branch) else b,
2204        "milestone": lambda m: m.id,
2205    }
2206
2207    def commit(self):
2208        data = self.get_dirty_fields()
2209        if "due_date" in data and data["due_date"] is None:
2210            data["unset_due_date"] = True
2211
2212        args = {
2213            "owner": self.repository.owner.username,
2214            "repo": self.repository.name,
2215            "index": self.number,
2216        }
2217        self._commit(args, data)
2218
2219    def merge(self, merge_type: MergeType):
2220        """
2221        Merge the pull request. See
2222        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2223
2224        :param merge_type: The type of merge to perform. See the MergeType enum.
2225        """
2226
2227        self.allspice_client.requests_post(
2228            self.MERGE_DESIGN_REVIEW.format(
2229                owner=self.repository.owner.username,
2230                repo=self.repository.name,
2231                index=self.number,
2232            ),
2233            data={"Do": merge_type.value},
2234        )
2235
2236    def get_comments(self) -> List[Comment]:
2237        """
2238        Get the comments on this pull request, but not specifically on a review.
2239
2240        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2241
2242        :return: A list of comments on this pull request.
2243        """
2244
2245        results = self.allspice_client.requests_get(
2246            self.GET_COMMENTS.format(
2247                owner=self.repository.owner.username,
2248                repo=self.repository.name,
2249                index=self.number,
2250            )
2251        )
2252        return [Comment.parse_response(self.allspice_client, result) for result in results]
2253
2254    def create_comment(self, body: str):
2255        """
2256        Create a comment on this pull request. This uses the same endpoint as the
2257        comments on issues, and will not be associated with any reviews.
2258
2259        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2260
2261        :param body: The body of the comment.
2262        :return: The comment that was created.
2263        """
2264
2265        result = self.allspice_client.requests_post(
2266            self.GET_COMMENTS.format(
2267                owner=self.repository.owner.username,
2268                repo=self.repository.name,
2269                index=self.number,
2270            ),
2271            data={"body": body},
2272        )
2273        return Comment.parse_response(self.allspice_client, result)

A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.

Note: The base and head fields are not Branch objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated Branch object from the API. You can use the Repository.get_branch method to get a Branch object for a branch if you know it exists.

DesignReview(allspice_client)
2149    def __init__(self, allspice_client):
2150        super().__init__(allspice_client)
additions: int
allow_maintainer_edit: bool
allow_maintainer_edits: Any
assignee: User
assignees: List[User]
base: str
body: str
changed_files: int
closed_at: Optional[str]
comments: int
created_at: str
deletions: int
diff_url: str
draft: bool
due_date: Optional[str]
head: str
html_url: str
id: int
is_locked: bool
labels: List[Any]
merge_base: str
merge_commit_sha: Optional[str]
mergeable: bool
merged: bool
merged_at: Optional[str]
merged_by: Optional[User]
milestone: Any
number: int
patch_url: str
pin_order: int
repository: Optional[Repository]
requested_reviewers: Any
review_comments: int
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/pulls/{index}'
MERGE_DESIGN_REVIEW = '/repos/{owner}/{repo}/pulls/{index}/merge'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
OPEN = 'open'
CLOSED = 'closed'
@classmethod
def parse_response(cls, allspice_client, result) -> DesignReview:
2160    @classmethod
2161    def parse_response(cls, allspice_client, result) -> "DesignReview":
2162        api_object = super().parse_response(allspice_client, result)
2163        cls._add_read_property(
2164            "repository",
2165            Repository.parse_response(allspice_client, result["base"]["repo"]),
2166            api_object,
2167        )
2168
2169        return api_object
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
2171    @classmethod
2172    def request(cls, allspice_client, owner: str, repo: str, number: str):
2173        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2174        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})

See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest

def commit(self):
2207    def commit(self):
2208        data = self.get_dirty_fields()
2209        if "due_date" in data and data["due_date"] is None:
2210            data["unset_due_date"] = True
2211
2212        args = {
2213            "owner": self.repository.owner.username,
2214            "repo": self.repository.name,
2215            "index": self.number,
2216        }
2217        self._commit(args, data)
def merge(self, merge_type: DesignReview.MergeType):
2219    def merge(self, merge_type: MergeType):
2220        """
2221        Merge the pull request. See
2222        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2223
2224        :param merge_type: The type of merge to perform. See the MergeType enum.
2225        """
2226
2227        self.allspice_client.requests_post(
2228            self.MERGE_DESIGN_REVIEW.format(
2229                owner=self.repository.owner.username,
2230                repo=self.repository.name,
2231                index=self.number,
2232            ),
2233            data={"Do": merge_type.value},
2234        )

Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

Parameters
  • merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
2236    def get_comments(self) -> List[Comment]:
2237        """
2238        Get the comments on this pull request, but not specifically on a review.
2239
2240        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2241
2242        :return: A list of comments on this pull request.
2243        """
2244
2245        results = self.allspice_client.requests_get(
2246            self.GET_COMMENTS.format(
2247                owner=self.repository.owner.username,
2248                repo=self.repository.name,
2249                index=self.number,
2250            )
2251        )
2252        return [Comment.parse_response(self.allspice_client, result) for result in results]

Get the comments on this pull request, but not specifically on a review.

https://hub.allspice.io/api/swagger#/issue/issueGetComments

Returns

A list of comments on this pull request.

def create_comment(self, body: str):
2254    def create_comment(self, body: str):
2255        """
2256        Create a comment on this pull request. This uses the same endpoint as the
2257        comments on issues, and will not be associated with any reviews.
2258
2259        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2260
2261        :param body: The body of the comment.
2262        :return: The comment that was created.
2263        """
2264
2265        result = self.allspice_client.requests_post(
2266            self.GET_COMMENTS.format(
2267                owner=self.repository.owner.username,
2268                repo=self.repository.name,
2269                index=self.number,
2270            ),
2271            data={"body": body},
2272        )
2273        return Comment.parse_response(self.allspice_client, result)

Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.

https://hub.allspice.io/api/swagger#/issue/issueCreateComment

Parameters
  • body: The body of the comment.
Returns

The comment that was created.

class DesignReview.MergeType(enum.Enum):
2142    class MergeType(Enum):
2143        MERGE = "merge"
2144        REBASE = "rebase"
2145        REBASE_MERGE = "rebase-merge"
2146        SQUASH = "squash"
2147        MANUALLY_MERGED = "manually-merged"
MERGE = <MergeType.MERGE: 'merge'>
REBASE = <MergeType.REBASE: 'rebase'>
REBASE_MERGE = <MergeType.REBASE_MERGE: 'rebase-merge'>
SQUASH = <MergeType.SQUASH: 'squash'>
MANUALLY_MERGED = <MergeType.MANUALLY_MERGED: 'manually-merged'>
class Issue(allspice.baseapiobject.ApiObject):
1881class Issue(ApiObject):
1882    """
1883    An issue on a repository.
1884
1885    Note: `Issue.assets` may not have any entries even if the issue has
1886    attachments. This happens when an issue is fetched via a bulk method like
1887    `Repository.get_issues`. In most cases, prefer using
1888    `Issue.get_attachments` to get the attachments on an issue.
1889    """
1890
1891    assets: List[Union[Any, "Attachment"]]
1892    assignee: Any
1893    assignees: Any
1894    body: str
1895    closed_at: Any
1896    comments: int
1897    created_at: str
1898    due_date: Any
1899    html_url: str
1900    id: int
1901    is_locked: bool
1902    labels: List[Any]
1903    milestone: Optional["Milestone"]
1904    number: int
1905    original_author: str
1906    original_author_id: int
1907    pin_order: int
1908    pull_request: Any
1909    ref: str
1910    repository: Dict[str, Union[int, str]]
1911    state: str
1912    title: str
1913    updated_at: str
1914    url: str
1915    user: User
1916
1917    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1918    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1919    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1920    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1921    GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets"""
1922
1923    OPENED = "open"
1924    CLOSED = "closed"
1925
1926    def __init__(self, allspice_client):
1927        super().__init__(allspice_client)
1928
1929    def __eq__(self, other):
1930        if not isinstance(other, Issue):
1931            return False
1932        return self.repository == other.repository and self.id == other.id
1933
1934    def __hash__(self):
1935        return hash(self.repository) ^ hash(self.id)
1936
1937    _fields_to_parsers: ClassVar[dict] = {
1938        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1939        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1940        "assets": lambda allspice_client, assets: [
1941            Attachment.parse_response(allspice_client, a) for a in assets
1942        ],
1943        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1944        "assignees": lambda allspice_client, us: [
1945            User.parse_response(allspice_client, u) for u in us
1946        ],
1947        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1948    }
1949
1950    _parsers_to_fields: ClassVar[dict] = {
1951        "milestone": lambda m: m.id,
1952    }
1953
1954    _patchable_fields: ClassVar[set[str]] = {
1955        "assignee",
1956        "assignees",
1957        "body",
1958        "due_date",
1959        "milestone",
1960        "state",
1961        "title",
1962    }
1963
1964    def commit(self):
1965        args = {
1966            "owner": self.repository.owner.username,
1967            "repo": self.repository.name,
1968            "index": self.number,
1969        }
1970        self._commit(args)
1971
1972    @classmethod
1973    def request(cls, allspice_client, owner: str, repo: str, number: str):
1974        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1975        # The repository in the response is a RepositoryMeta object, so request
1976        # the full repository object and add it to the issue object.
1977        repository = Repository.request(allspice_client, owner, repo)
1978        setattr(api_object, "_repository", repository)
1979        # For legacy reasons
1980        cls._add_read_property("repo", repository, api_object)
1981        return api_object
1982
1983    @classmethod
1984    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1985        args = {"owner": repo.owner.username, "repo": repo.name}
1986        data = {"title": title, "body": body}
1987        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1988        issue = Issue.parse_response(allspice_client, result)
1989        setattr(issue, "_repository", repo)
1990        cls._add_read_property("repo", repo, issue)
1991        return issue
1992
1993    @property
1994    def owner(self) -> Organization | User:
1995        return self.repository.owner
1996
1997    def get_time_sum(self, user: User) -> int:
1998        results = self.allspice_client.requests_get(
1999            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2000        )
2001        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
2002
2003    def get_times(self) -> Optional[Dict]:
2004        return self.allspice_client.requests_get(
2005            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2006        )
2007
2008    def delete_time(self, time_id: str):
2009        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2010        self.allspice_client.requests_delete(path)
2011
2012    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2013        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2014        self.allspice_client.requests_post(
2015            path, data={"created": created, "time": int(time), "user_name": user_name}
2016        )
2017
2018    def get_comments(self) -> List[Comment]:
2019        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2020
2021        results = self.allspice_client.requests_get(
2022            self.GET_COMMENTS.format(
2023                owner=self.owner.username, repo=self.repository.name, index=self.number
2024            )
2025        )
2026
2027        return [Comment.parse_response(self.allspice_client, result) for result in results]
2028
2029    def create_comment(self, body: str) -> Comment:
2030        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2031
2032        path = self.GET_COMMENTS.format(
2033            owner=self.owner.username, repo=self.repository.name, index=self.number
2034        )
2035
2036        response = self.allspice_client.requests_post(path, data={"body": body})
2037        return Comment.parse_response(self.allspice_client, response)
2038
2039    def get_attachments(self) -> List[Attachment]:
2040        """
2041        Fetch all attachments on this issue.
2042
2043        Unlike the assets field, this will always fetch all attachments from the
2044        API.
2045
2046        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2047        """
2048
2049        path = self.GET_ATTACHMENTS.format(
2050            owner=self.owner.username, repo=self.repository.name, index=self.number
2051        )
2052        response = self.allspice_client.requests_get(path)
2053
2054        return [Attachment.parse_response(self.allspice_client, result) for result in response]
2055
2056    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2057        """
2058        Create an attachment on this issue.
2059
2060        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2061
2062        :param file: The file to attach. This should be a file-like object.
2063        :param name: The name of the file. If not provided, the name of the file will be
2064                     used.
2065        :return: The created attachment.
2066        """
2067
2068        args: dict[str, Any] = {
2069            "files": {"attachment": file},
2070        }
2071        if name is not None:
2072            args["params"] = {"name": name}
2073
2074        result = self.allspice_client.requests_post(
2075            self.GET_ATTACHMENTS.format(
2076                owner=self.owner.username, repo=self.repository.name, index=self.number
2077            ),
2078            **args,
2079        )
2080
2081        return Attachment.parse_response(self.allspice_client, result)

An issue on a repository.

Note: Issue.assets may not have any entries even if the issue has attachments. This happens when an issue is fetched via a bulk method like Repository.get_issues. In most cases, prefer using Issue.get_attachments to get the attachments on an issue.

Issue(allspice_client)
1926    def __init__(self, allspice_client):
1927        super().__init__(allspice_client)
assets: List[Union[Any, allspice.apiobject.Attachment]]
assignee: Any
assignees: Any
body: str
closed_at: Any
comments: int
created_at: str
due_date: Any
html_url: str
id: int
is_locked: bool
labels: List[Any]
milestone: Optional[Milestone]
number: int
original_author: str
original_author_id: int
pin_order: int
pull_request: Any
ref: str
repository: Dict[str, Union[int, str]]
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/{index}'
GET_TIME = '/repos/%s/%s/issues/%s/times'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
CREATE_ISSUE = '/repos/{owner}/{repo}/issues'
GET_ATTACHMENTS = '/repos/{owner}/{repo}/issues/{index}/assets'
OPENED = 'open'
CLOSED = 'closed'
def commit(self):
1964    def commit(self):
1965        args = {
1966            "owner": self.repository.owner.username,
1967            "repo": self.repository.name,
1968            "index": self.number,
1969        }
1970        self._commit(args)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1972    @classmethod
1973    def request(cls, allspice_client, owner: str, repo: str, number: str):
1974        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1975        # The repository in the response is a RepositoryMeta object, so request
1976        # the full repository object and add it to the issue object.
1977        repository = Repository.request(allspice_client, owner, repo)
1978        setattr(api_object, "_repository", repository)
1979        # For legacy reasons
1980        cls._add_read_property("repo", repository, api_object)
1981        return api_object
@classmethod
def create_issue( cls, allspice_client, repo: Repository, title: str, body: str = ''):
1983    @classmethod
1984    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1985        args = {"owner": repo.owner.username, "repo": repo.name}
1986        data = {"title": title, "body": body}
1987        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1988        issue = Issue.parse_response(allspice_client, result)
1989        setattr(issue, "_repository", repo)
1990        cls._add_read_property("repo", repo, issue)
1991        return issue
owner: Organization | User
1993    @property
1994    def owner(self) -> Organization | User:
1995        return self.repository.owner
def get_time_sum(self, user: User) -> int:
1997    def get_time_sum(self, user: User) -> int:
1998        results = self.allspice_client.requests_get(
1999            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2000        )
2001        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
def get_times(self) -> Optional[Dict]:
2003    def get_times(self) -> Optional[Dict]:
2004        return self.allspice_client.requests_get(
2005            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2006        )
def delete_time(self, time_id: str):
2008    def delete_time(self, time_id: str):
2009        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2010        self.allspice_client.requests_delete(path)
def add_time( self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2012    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2013        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2014        self.allspice_client.requests_post(
2015            path, data={"created": created, "time": int(time), "user_name": user_name}
2016        )
def get_comments(self) -> List[Comment]:
2018    def get_comments(self) -> List[Comment]:
2019        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2020
2021        results = self.allspice_client.requests_get(
2022            self.GET_COMMENTS.format(
2023                owner=self.owner.username, repo=self.repository.name, index=self.number
2024            )
2025        )
2026
2027        return [Comment.parse_response(self.allspice_client, result) for result in results]

https://hub.allspice.io/api/swagger#/issue/issueGetComments

def create_comment(self, body: str) -> Comment:
2029    def create_comment(self, body: str) -> Comment:
2030        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2031
2032        path = self.GET_COMMENTS.format(
2033            owner=self.owner.username, repo=self.repository.name, index=self.number
2034        )
2035
2036        response = self.allspice_client.requests_post(path, data={"body": body})
2037        return Comment.parse_response(self.allspice_client, response)

https://hub.allspice.io/api/swagger#/issue/issueCreateComment

def get_attachments(self) -> List[allspice.apiobject.Attachment]:
2039    def get_attachments(self) -> List[Attachment]:
2040        """
2041        Fetch all attachments on this issue.
2042
2043        Unlike the assets field, this will always fetch all attachments from the
2044        API.
2045
2046        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2047        """
2048
2049        path = self.GET_ATTACHMENTS.format(
2050            owner=self.owner.username, repo=self.repository.name, index=self.number
2051        )
2052        response = self.allspice_client.requests_get(path)
2053
2054        return [Attachment.parse_response(self.allspice_client, result) for result in response]

Fetch all attachments on this issue.

Unlike the assets field, this will always fetch all attachments from the API.

See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments

def create_attachment(self, file: IO, name: Optional[str] = None) -> allspice.apiobject.Attachment:
2056    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2057        """
2058        Create an attachment on this issue.
2059
2060        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2061
2062        :param file: The file to attach. This should be a file-like object.
2063        :param name: The name of the file. If not provided, the name of the file will be
2064                     used.
2065        :return: The created attachment.
2066        """
2067
2068        args: dict[str, Any] = {
2069            "files": {"attachment": file},
2070        }
2071        if name is not None:
2072            args["params"] = {"name": name}
2073
2074        result = self.allspice_client.requests_post(
2075            self.GET_ATTACHMENTS.format(
2076                owner=self.owner.username, repo=self.repository.name, index=self.number
2077            ),
2078            **args,
2079        )
2080
2081        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this issue.

https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

class Milestone(allspice.baseapiobject.ApiObject):
1450class Milestone(ApiObject):
1451    allow_merge_commits: Any
1452    allow_rebase: Any
1453    allow_rebase_explicit: Any
1454    allow_squash_merge: Any
1455    archived: Any
1456    closed_at: Any
1457    closed_issues: int
1458    created_at: str
1459    default_branch: Any
1460    description: str
1461    due_on: Any
1462    has_issues: Any
1463    has_pull_requests: Any
1464    has_wiki: Any
1465    id: int
1466    ignore_whitespace_conflicts: Any
1467    name: Any
1468    open_issues: int
1469    private: Any
1470    state: str
1471    title: str
1472    updated_at: str
1473    website: Any
1474
1475    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1476
1477    def __init__(self, allspice_client):
1478        super().__init__(allspice_client)
1479
1480    def __eq__(self, other):
1481        if not isinstance(other, Milestone):
1482            return False
1483        return self.allspice_client == other.allspice_client and self.id == other.id
1484
1485    def __hash__(self):
1486        return hash(self.allspice_client) ^ hash(self.id)
1487
1488    _fields_to_parsers: ClassVar[dict] = {
1489        "closed_at": lambda _, t: Util.convert_time(t),
1490        "due_on": lambda _, t: Util.convert_time(t),
1491    }
1492
1493    _patchable_fields: ClassVar[set[str]] = {
1494        "allow_merge_commits",
1495        "allow_rebase",
1496        "allow_rebase_explicit",
1497        "allow_squash_merge",
1498        "archived",
1499        "default_branch",
1500        "description",
1501        "has_issues",
1502        "has_pull_requests",
1503        "has_wiki",
1504        "ignore_whitespace_conflicts",
1505        "name",
1506        "private",
1507        "website",
1508    }
1509
1510    @classmethod
1511    def request(cls, allspice_client, owner: str, repo: str, number: str):
1512        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Milestone(allspice_client)
1477    def __init__(self, allspice_client):
1478        super().__init__(allspice_client)
allow_merge_commits: Any
allow_rebase: Any
allow_rebase_explicit: Any
allow_squash_merge: Any
archived: Any
closed_at: Any
closed_issues: int
created_at: str
default_branch: Any
description: str
due_on: Any
has_issues: Any
has_pull_requests: Any
has_wiki: Any
id: int
ignore_whitespace_conflicts: Any
name: Any
open_issues: int
private: Any
state: str
title: str
updated_at: str
website: Any
API_OBJECT = '/repos/{owner}/{repo}/milestones/{number}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1510    @classmethod
1511    def request(cls, allspice_client, owner: str, repo: str, number: str):
1512        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
class NotFoundException(builtins.Exception):
class NotFoundException(Exception):
    """Raised when a requested entity cannot be found."""

Common base class for all non-exit exceptions.

class Organization(allspice.baseapiobject.ApiObject):
 33class Organization(ApiObject):
 34    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
 35
 36    active: Optional[bool]
 37    avatar_url: str
 38    created: Optional[str]
 39    description: str
 40    email: str
 41    followers_count: Optional[int]
 42    following_count: Optional[int]
 43    full_name: str
 44    html_url: Optional[str]
 45    id: int
 46    is_admin: Optional[bool]
 47    language: Optional[str]
 48    last_login: Optional[str]
 49    location: str
 50    login: Optional[str]
 51    login_name: Optional[str]
 52    name: Optional[str]
 53    prohibit_login: Optional[bool]
 54    repo_admin_change_team_access: Optional[bool]
 55    restricted: Optional[bool]
 56    source_id: Optional[int]
 57    starred_repos_count: Optional[int]
 58    username: str
 59    visibility: str
 60    website: str
 61
 62    API_OBJECT = """/orgs/{name}"""  # <org>
 63    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
 64    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
 65    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
 66    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
 67    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
 68    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
 69
 70    def __init__(self, allspice_client):
 71        super().__init__(allspice_client)
 72
 73    def __eq__(self, other):
 74        if not isinstance(other, Organization):
 75            return False
 76        return self.allspice_client == other.allspice_client and self.name == other.name
 77
 78    def __hash__(self):
 79        return hash(self.allspice_client) ^ hash(self.name)
 80
 81    @classmethod
 82    def request(cls, allspice_client, name: str) -> Self:
 83        return cls._request(allspice_client, {"name": name})
 84
 85    @classmethod
 86    def parse_response(cls, allspice_client, result) -> "Organization":
 87        api_object = super().parse_response(allspice_client, result)
 88        # add "name" field to make this behave similar to users for gitea < 1.18
 89        # also necessary for repository-owner when org is repo owner
 90        if not hasattr(api_object, "name"):
 91            Organization._add_read_property("name", result["username"], api_object)
 92        return api_object
 93
 94    _patchable_fields: ClassVar[set[str]] = {
 95        "description",
 96        "full_name",
 97        "location",
 98        "visibility",
 99        "website",
100    }
101
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
105
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)
144
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
150
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
157
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
165
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
172
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )
204
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
208
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
220
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
224
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True
231
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results

see https://hub.allspice.io/api/swagger#/organization/orgGetAll

Organization(allspice_client)
70    def __init__(self, allspice_client):
71        super().__init__(allspice_client)
active: Optional[bool]
avatar_url: str
created: Optional[str]
description: str
email: str
followers_count: Optional[int]
following_count: Optional[int]
full_name: str
html_url: Optional[str]
id: int
is_admin: Optional[bool]
language: Optional[str]
last_login: Optional[str]
location: str
login: Optional[str]
login_name: Optional[str]
name: Optional[str]
prohibit_login: Optional[bool]
repo_admin_change_team_access: Optional[bool]
restricted: Optional[bool]
source_id: Optional[int]
starred_repos_count: Optional[int]
username: str
visibility: str
website: str
API_OBJECT = '/orgs/{name}'
ORG_REPOS_REQUEST = '/orgs/%s/repos'
ORG_TEAMS_REQUEST = '/orgs/%s/teams'
ORG_TEAMS_CREATE = '/orgs/%s/teams'
ORG_GET_MEMBERS = '/orgs/%s/members'
ORG_IS_MEMBER = '/orgs/%s/members/%s'
ORG_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> Self:
81    @classmethod
82    def request(cls, allspice_client, name: str) -> Self:
83        return cls._request(allspice_client, {"name": name})
@classmethod
def parse_response(cls, allspice_client, result) -> Organization:
85    @classmethod
86    def parse_response(cls, allspice_client, result) -> "Organization":
87        api_object = super().parse_response(allspice_client, result)
88        # add "name" field to make this behave similar to users for gitea < 1.18
89        # also necessary for repository-owner when org is repo owner
90        if not hasattr(api_object, "name"):
91            Organization._add_read_property("name", result["username"], api_object)
92        return api_object
def commit(self):
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)

Create an organization Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
def get_repository(self, name) -> Repository:
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List[Team]:
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
def get_team(self, name) -> Team:
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
def create_team( self, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}) -> Team:
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )

Alias for AllSpice#create_team

def get_members(self) -> List[User]:
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
def is_member(self, username) -> bool:
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
def remove_member(self, user: User):
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
def delete(self):
    def delete(self):
        """Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization"""
        # Delete every repository of the organization first, then the organization itself.
        for repo in self.get_repositories():
            repo.delete()
        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
        self.deleted = True

Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results
class Release(allspice.baseapiobject.ApiObject):
class Release(ApiObject):
    """
    A release on a repo.
    """

    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
    author: User
    body: str
    created_at: str
    draft: bool
    html_url: str
    id: int
    name: str
    prerelease: bool
    published_at: str
    repo: Optional["Repository"]
    repository: Optional["Repository"]
    tag_name: str
    tarball_url: str
    target_commitish: str
    upload_url: str
    url: str
    zipball_url: str

    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
    # Note that we don't strictly need the get_assets route, as the release
    # object already contains the assets.

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Releases are equal when both repo and id compare equal."""
        if not isinstance(other, Release):
            return False
        return self.repo == other.repo and self.id == other.id

    def __hash__(self):
        """Hash on the same values that ``__eq__`` compares."""
        return hash(self.repo) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {
        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
    }
    _patchable_fields: ClassVar[set[str]] = {
        "body",
        "draft",
        "name",
        "prerelease",
        "tag_name",
        "target_commitish",
    }

    @classmethod
    def parse_response(cls, allspice_client, result, repo) -> Release:
        """Build a Release from an API response and attach ``repo`` and the parsed assets."""
        release = super().parse_response(allspice_client, result)
        Release._add_read_property("repository", repo, release)
        # For legacy reasons
        Release._add_read_property("repo", repo, release)
        setattr(
            release,
            "_assets",
            [
                ReleaseAsset.parse_response(allspice_client, asset, release)
                for asset in result["assets"]
            ],
        )
        return release

    @classmethod
    def request(
        cls,
        allspice_client,
        owner: str,
        repo: str,
        id: Optional[int] = None,
    ) -> Release:
        """Fetch a release and the repository it belongs to, and return the parsed Release."""
        args = {"owner": owner, "repo": repo, "id": id}
        release_response = cls._get_gitea_api_object(allspice_client, args)
        repository = Repository.request(allspice_client, owner, repo)
        release = cls.parse_response(allspice_client, release_response, repository)
        return release

    def commit(self):
        """Send changed fields via ``_commit``.

        :raises ValueError: If the release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot commit a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self._commit(args)

    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
        """
        Create an asset for this release.

        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

        :param file: The file to upload. This should be a file-like object.
        :param name: The name of the file.
        :return: The created asset.
        :raises ValueError: If the release has no repository.
        """

        if self.repo is None:
            raise ValueError("Cannot create an asset for a release without a repository.")

        args: dict[str, Any] = {"files": {"attachment": file}}
        # The name is only sent as a request parameter when one was given.
        if name is not None:
            args["params"] = {"name": name}

        result = self.allspice_client.requests_post(
            self.RELEASE_CREATE_ASSET.format(
                owner=self.repo.owner.username,
                repo=self.repo.name,
                id=self.id,
            ),
            **args,
        )
        return ReleaseAsset.parse_response(self.allspice_client, result, self)

    def delete(self):
        """Delete this release and mark the object as deleted.

        :raises ValueError: If the release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot delete a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
        self.deleted = True

A release on a repo.

Release(allspice_client)
2388    def __init__(self, allspice_client):
2389        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]], allspice.apiobject.ReleaseAsset]]
author: User
body: str
created_at: str
draft: bool
html_url: str
id: int
name: str
prerelease: bool
published_at: str
repo: Optional[Repository]
repository: Optional[Repository]
tag_name: str
tarball_url: str
target_commitish: str
upload_url: str
url: str
zipball_url: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{id}'
RELEASE_CREATE_ASSET = '/repos/{owner}/{repo}/releases/{id}/assets'
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
2411    @classmethod
2412    def parse_response(cls, allspice_client, result, repo) -> Release:
2413        release = super().parse_response(allspice_client, result)
2414        Release._add_read_property("repository", repo, release)
2415        # For legacy reasons
2416        Release._add_read_property("repo", repo, release)
2417        setattr(
2418            release,
2419            "_assets",
2420            [
2421                ReleaseAsset.parse_response(allspice_client, asset, release)
2422                for asset in result["assets"]
2423            ],
2424        )
2425        return release
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: Optional[int] = None) -> Release:
2427    @classmethod
2428    def request(
2429        cls,
2430        allspice_client,
2431        owner: str,
2432        repo: str,
2433        id: Optional[int] = None,
2434    ) -> Release:
2435        args = {"owner": owner, "repo": repo, "id": id}
2436        release_response = cls._get_gitea_api_object(allspice_client, args)
2437        repository = Repository.request(allspice_client, owner, repo)
2438        release = cls.parse_response(allspice_client, release_response, repository)
2439        return release
def commit(self):
2441    def commit(self):
2442        if self.repo is None:
2443            raise ValueError("Cannot commit a release without a repository.")
2444
2445        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2446        self._commit(args)
def create_asset( self, file: <class 'IO'>, name: Optional[str] = None) -> allspice.apiobject.ReleaseAsset:
2448    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2449        """
2450        Create an asset for this release.
2451
2452        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2453
2454        :param file: The file to upload. This should be a file-like object.
2455        :param name: The name of the file.
2456        :return: The created asset.
2457        """
2458
2459        if self.repo is None:
2460            raise ValueError("Cannot commit a release without a repository.")
2461
2462        args: dict[str, Any] = {"files": {"attachment": file}}
2463        if name is not None:
2464            args["params"] = {"name": name}
2465
2466        result = self.allspice_client.requests_post(
2467            self.RELEASE_CREATE_ASSET.format(
2468                owner=self.repo.owner.username,
2469                repo=self.repo.name,
2470                id=self.id,
2471            ),
2472            **args,
2473        )
2474        return ReleaseAsset.parse_response(self.allspice_client, result, self)

Create an asset for this release.

https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

Parameters
  • file: The file to upload. This should be a file-like object.
  • name: The name of the file.
Returns

The created asset.

def delete(self):
2476    def delete(self):
2477        if self.repo is None:
2478            raise ValueError("Cannot commit a release without a repository.")
2479
2480        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2481        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2482        self.deleted = True
class Repository(allspice.baseapiobject.ApiObject):
 477class Repository(ApiObject):
 478    allow_fast_forward_only_merge: bool
 479    allow_manual_merge: Any
 480    allow_merge_commits: bool
 481    allow_rebase: bool
 482    allow_rebase_explicit: bool
 483    allow_rebase_update: bool
 484    allow_squash_merge: bool
 485    archived: bool
 486    archived_at: str
 487    autodetect_manual_merge: Any
 488    avatar_url: str
 489    clone_url: str
 490    created_at: str
 491    default_allow_maintainer_edit: bool
 492    default_branch: str
 493    default_delete_branch_after_merge: bool
 494    default_merge_style: str
 495    description: str
 496    empty: bool
 497    enable_prune: Any
 498    external_tracker: Any
 499    external_wiki: Any
 500    fork: bool
 501    forks_count: int
 502    full_name: str
 503    has_actions: bool
 504    has_issues: bool
 505    has_packages: bool
 506    has_projects: bool
 507    has_pull_requests: bool
 508    has_releases: bool
 509    has_wiki: bool
 510    html_url: str
 511    id: int
 512    ignore_whitespace_conflicts: bool
 513    internal: bool
 514    internal_tracker: Dict[str, bool]
 515    language: str
 516    languages_url: str
 517    link: str
 518    mirror: bool
 519    mirror_interval: str
 520    mirror_updated: str
 521    name: str
 522    object_format_name: str
 523    open_issues_count: int
 524    open_pr_counter: int
 525    original_url: str
 526    owner: Union["User", "Organization"]
 527    parent: Any
 528    permissions: Dict[str, bool]
 529    private: bool
 530    projects_mode: str
 531    release_counter: int
 532    repo_transfer: Any
 533    size: int
 534    ssh_url: str
 535    stars_count: int
 536    template: bool
 537    updated_at: datetime
 538    url: str
 539    watchers_count: int
 540    website: str
 541
 542    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 543    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 544    REPO_SEARCH = """/repos/search/"""
 545    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 546    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 547    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 548    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 549    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 550    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 551    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 552    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 553    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 554    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 555    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 556    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 557    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 558    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 559    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 560    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 561    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 562    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 563    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 564    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 565    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 566
 567    class ArchiveFormat(Enum):
 568        """
 569        Archive formats for Repository.get_archive
 570        """
 571
 572        TAR = "tar.gz"
 573        ZIP = "zip"
 574
 575    class CommitStatusSort(Enum):
 576        """
 577        Sort order for Repository.get_commit_status
 578        """
 579
 580        OLDEST = "oldest"
 581        RECENT_UPDATE = "recentupdate"
 582        LEAST_UPDATE = "leastupdate"
 583        LEAST_INDEX = "leastindex"
 584        HIGHEST_INDEX = "highestindex"
 585
 586    def __init__(self, allspice_client):
 587        super().__init__(allspice_client)
 588
 589    def __eq__(self, other):
 590        if not isinstance(other, Repository):
 591            return False
 592        return self.owner == other.owner and self.name == other.name
 593
 594    def __hash__(self):
 595        return hash(self.owner) ^ hash(self.name)
 596
 597    _fields_to_parsers: ClassVar[dict] = {
 598        # dont know how to tell apart user and org as owner except form email being empty.
 599        "owner": lambda allspice_client, r: (
 600            Organization.parse_response(allspice_client, r)
 601            if r["email"] == ""
 602            else User.parse_response(allspice_client, r)
 603        ),
 604        "updated_at": lambda _, t: Util.convert_time(t),
 605    }
 606
 607    @classmethod
 608    def request(
 609        cls,
 610        allspice_client,
 611        owner: str,
 612        name: str,
 613    ) -> Repository:
 614        return cls._request(allspice_client, {"owner": owner, "name": name})
 615
 616    @classmethod
 617    def search(
 618        cls,
 619        allspice_client,
 620        query: Optional[str] = None,
 621        topic: bool = False,
 622        include_description: bool = False,
 623        user: Optional[User] = None,
 624        owner_to_prioritize: Union[User, Organization, None] = None,
 625    ) -> list[Repository]:
 626        """
 627        Search for repositories.
 628
 629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 630
 631        :param query: The query string to search for
 632        :param topic: If true, the query string will only be matched against the
 633            repository's topic.
 634        :param include_description: If true, the query string will be matched
 635            against the repository's description as well.
 636        :param user: If specified, only repositories that this user owns or
 637            contributes to will be searched.
 638        :param owner_to_prioritize: If specified, repositories owned by the
 639            given entity will be prioritized in the search.
 640        :returns: All repositories matching the query. If there are many
 641            repositories matching this query, this may take some time.
 642        """
 643
 644        params = {}
 645
 646        if query is not None:
 647            params["q"] = query
 648        if topic:
 649            params["topic"] = topic
 650        if include_description:
 651            params["include_description"] = include_description
 652        if user is not None:
 653            params["user"] = user.id
 654        if owner_to_prioritize is not None:
 655            params["owner_to_prioritize"] = owner_to_prioritize.id
 656
 657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 658
 659        return [Repository.parse_response(allspice_client, response) for response in responses]
 660
    # Names of the repository fields considered patchable.
    # NOTE(review): judging by the name, these are presumably the only fields
    # sent to the server when commit() is called — confirm against the
    # implementation of _commit in the parent class.
    _patchable_fields: ClassVar[set[str]] = {
        "allow_manual_merge",
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_rebase_update",
        "allow_squash_merge",
        "archived",
        "autodetect_manual_merge",
        "default_branch",
        "default_delete_branch_after_merge",
        "default_merge_style",
        "description",
        "enable_prune",
        "external_tracker",
        "external_wiki",
        "has_actions",
        "has_issues",
        "has_projects",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "internal_tracker",
        "mirror_interval",
        "name",
        "private",
        "template",
        "website",
    }
 690
 691    def commit(self):
 692        args = {"owner": self.owner.username, "name": self.name}
 693        self._commit(args)
 694
 695    def get_branches(self) -> List["Branch"]:
 696        """Get all the Branches of this Repository."""
 697
 698        results = self.allspice_client.requests_get_paginated(
 699            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 700        )
 701        return [Branch.parse_response(self.allspice_client, result) for result in results]
 702
 703    def get_branch(self, name: str) -> "Branch":
 704        """Get a specific Branch of this Repository."""
 705        result = self.allspice_client.requests_get(
 706            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 707        )
 708        return Branch.parse_response(self.allspice_client, result)
 709
 710    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 711        """Add a branch to the repository"""
 712        # Note: will only work with gitea 1.13 or higher!
 713
 714        ref_name = Util.data_params_for_ref(create_from)
 715        if "ref" not in ref_name:
 716            raise ValueError("create_from must be a Branch, Commit or string")
 717        ref_name = ref_name["ref"]
 718
 719        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 720        result = self.allspice_client.requests_post(
 721            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 722        )
 723        return Branch.parse_response(self.allspice_client, result)
 724
 725    def get_issues(
 726        self,
 727        state: Literal["open", "closed", "all"] = "all",
 728        search_query: Optional[str] = None,
 729        labels: Optional[List[str]] = None,
 730        milestones: Optional[List[Union[Milestone, str]]] = None,
 731        assignee: Optional[Union[User, str]] = None,
 732        since: Optional[datetime] = None,
 733        before: Optional[datetime] = None,
 734    ) -> List["Issue"]:
 735        """
 736        Get all Issues of this Repository (open and closed)
 737
 738        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 739
 740        All params of this method are optional filters. If you don't specify a filter, it
 741        will not be applied.
 742
 743        :param state: The state of the Issues to get. If None, all Issues are returned.
 744        :param search_query: Filter issues by text. This is equivalent to searching for
 745                             `search_query` in the Issues on the web interface.
 746        :param labels: Filter issues by labels.
 747        :param milestones: Filter issues by milestones.
 748        :param assignee: Filter issues by the assigned user.
 749        :param since: Filter issues by the date they were created.
 750        :param before: Filter issues by the date they were created.
 751        :return: A list of Issues.
 752        """
 753
 754        data = {
 755            "state": state,
 756        }
 757        if search_query:
 758            data["q"] = search_query
 759        if labels:
 760            data["labels"] = ",".join(labels)
 761        if milestones:
 762            data["milestone"] = ",".join(
 763                [
 764                    milestone.name if isinstance(milestone, Milestone) else milestone
 765                    for milestone in milestones
 766                ]
 767            )
 768        if assignee:
 769            if isinstance(assignee, User):
 770                data["assignee"] = assignee.username
 771            else:
 772                data["assignee"] = assignee
 773        if since:
 774            data["since"] = Util.format_time(since)
 775        if before:
 776            data["before"] = Util.format_time(before)
 777
 778        results = self.allspice_client.requests_get_paginated(
 779            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 780            params=data,
 781        )
 782
 783        issues = []
 784        for result in results:
 785            issue = Issue.parse_response(self.allspice_client, result)
 786            # See Issue.request
 787            setattr(issue, "_repository", self)
 788            # This is mostly for compatibility with an older implementation
 789            Issue._add_read_property("repo", self, issue)
 790            issues.append(issue)
 791
 792        return issues
 793
 794    def get_design_reviews(
 795        self,
 796        state: Literal["open", "closed", "all"] = "all",
 797        milestone: Optional[Union[Milestone, str]] = None,
 798        labels: Optional[List[str]] = None,
 799    ) -> List["DesignReview"]:
 800        """
 801        Get all Design Reviews of this Repository.
 802
 803        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 804
 805        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 806                      are returned.
 807        :param milestone: The milestone of the Design Reviews to get.
 808        :param labels: A list of label IDs to filter DRs by.
 809        :return: A list of Design Reviews.
 810        """
 811
 812        params = {
 813            "state": state,
 814        }
 815        if milestone:
 816            if isinstance(milestone, Milestone):
 817                params["milestone"] = milestone.name
 818            else:
 819                params["milestone"] = milestone
 820        if labels:
 821            params["labels"] = ",".join(labels)
 822
 823        results = self.allspice_client.requests_get_paginated(
 824            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 825            params=params,
 826        )
 827        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 828
 829    def get_commits(
 830        self,
 831        sha: Optional[str] = None,
 832        path: Optional[str] = None,
 833        stat: bool = True,
 834    ) -> List["Commit"]:
 835        """
 836        Get all the Commits of this Repository.
 837
 838        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 839
 840        :param sha: The SHA of the commit to start listing commits from.
 841        :param path: filepath of a file/dir.
 842        :param stat: Include the number of additions and deletions in the response.
 843                     Disable for speedup.
 844        :return: A list of Commits.
 845        """
 846
 847        data = {}
 848        if sha:
 849            data["sha"] = sha
 850        if path:
 851            data["path"] = path
 852        if not stat:
 853            data["stat"] = False
 854
 855        try:
 856            results = self.allspice_client.requests_get_paginated(
 857                Repository.REPO_COMMITS % (self.owner.username, self.name),
 858                params=data,
 859            )
 860        except ConflictException as err:
 861            logging.warning(err)
 862            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 863            results = []
 864        return [Commit.parse_response(self.allspice_client, result) for result in results]
 865
 866    def get_issues_state(self, state) -> List["Issue"]:
 867        """
 868        DEPRECATED: Use get_issues() instead.
 869
 870        Get issues of state Issue.open or Issue.closed of a repository.
 871        """
 872
 873        assert state in [Issue.OPENED, Issue.CLOSED]
 874        issues = []
 875        data = {"state": state}
 876        results = self.allspice_client.requests_get_paginated(
 877            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 878            params=data,
 879        )
 880        for result in results:
 881            issue = Issue.parse_response(self.allspice_client, result)
 882            # adding data not contained in the issue response
 883            # See Issue.request()
 884            setattr(issue, "_repository", self)
 885            Issue._add_read_property("repo", self, issue)
 886            Issue._add_read_property("owner", self.owner, issue)
 887            issues.append(issue)
 888        return issues
 889
 890    def get_times(self):
 891        results = self.allspice_client.requests_get(
 892            Repository.REPO_TIMES % (self.owner.username, self.name)
 893        )
 894        return results
 895
 896    def get_user_time(self, username) -> float:
 897        if isinstance(username, User):
 898            username = username.username
 899        results = self.allspice_client.requests_get(
 900            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 901        )
 902        time = sum(r["time"] for r in results)
 903        return time
 904
 905    def get_full_name(self) -> str:
 906        return self.owner.username + "/" + self.name
 907
 908    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 909        data = {
 910            "assignees": assignees,
 911            "body": description,
 912            "closed": False,
 913            "title": title,
 914        }
 915        result = self.allspice_client.requests_post(
 916            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 917            data=data,
 918        )
 919
 920        issue = Issue.parse_response(self.allspice_client, result)
 921        setattr(issue, "_repository", self)
 922        Issue._add_read_property("repo", self, issue)
 923        return issue
 924
 925    def create_design_review(
 926        self,
 927        title: str,
 928        head: Union[Branch, str],
 929        base: Union[Branch, str],
 930        assignees: Optional[Set[Union[User, str]]] = None,
 931        body: Optional[str] = None,
 932        due_date: Optional[datetime] = None,
 933        milestone: Optional["Milestone"] = None,
 934    ) -> "DesignReview":
 935        """
 936        Create a new Design Review.
 937
 938        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 939
 940        :param title: Title of the Design Review
 941        :param head: Branch or name of the branch to merge into the base branch
 942        :param base: Branch or name of the branch to merge into
 943        :param assignees: Optional. A list of users to assign this review. List can be of
 944                          User objects or of usernames.
 945        :param body: An Optional Description for the Design Review.
 946        :param due_date: An Optional Due date for the Design Review.
 947        :param milestone: An Optional Milestone for the Design Review
 948        :return: The created Design Review
 949        """
 950
 951        data: dict[str, Any] = {
 952            "title": title,
 953        }
 954
 955        if isinstance(head, Branch):
 956            data["head"] = head.name
 957        else:
 958            data["head"] = head
 959        if isinstance(base, Branch):
 960            data["base"] = base.name
 961        else:
 962            data["base"] = base
 963        if assignees:
 964            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 965        if body:
 966            data["body"] = body
 967        if due_date:
 968            data["due_date"] = Util.format_time(due_date)
 969        if milestone:
 970            data["milestone"] = milestone.id
 971
 972        result = self.allspice_client.requests_post(
 973            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 974            data=data,
 975        )
 976
 977        return DesignReview.parse_response(self.allspice_client, result)
 978
 979    def create_milestone(
 980        self,
 981        title: str,
 982        description: str,
 983        due_date: Optional[str] = None,
 984        state: str = "open",
 985    ) -> "Milestone":
 986        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 987        data = {"title": title, "description": description, "state": state}
 988        if due_date:
 989            data["due_date"] = due_date
 990        result = self.allspice_client.requests_post(url, data=data)
 991        return Milestone.parse_response(self.allspice_client, result)
 992
 993    def create_gitea_hook(self, hook_url: str, events: List[str]):
 994        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 995        data = {
 996            "type": "gitea",
 997            "config": {"content_type": "json", "url": hook_url},
 998            "events": events,
 999            "active": True,
1000        }
1001        return self.allspice_client.requests_post(url, data=data)
1002
1003    def list_hooks(self):
1004        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1005        return self.allspice_client.requests_get(url)
1006
1007    def delete_hook(self, id: str):
1008        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1009        self.allspice_client.requests_delete(url)
1010
1011    def is_collaborator(self, username) -> bool:
1012        if isinstance(username, User):
1013            username = username.username
1014        try:
1015            # returns 204 if its ok, 404 if its not
1016            self.allspice_client.requests_get(
1017                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1018            )
1019            return True
1020        except Exception:
1021            return False
1022
1023    def get_users_with_access(self) -> Sequence[User]:
1024        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1025        response = self.allspice_client.requests_get(url)
1026        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1027        if isinstance(self.owner, User):
1028            return [*collabs, self.owner]
1029        else:
1030            # owner must be org
1031            teams = self.owner.get_teams()
1032            for team in teams:
1033                team_repos = team.get_repos()
1034                if self.name in [n.name for n in team_repos]:
1035                    collabs += team.get_members()
1036            return collabs
1037
1038    def remove_collaborator(self, user_name: str):
1039        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1040        self.allspice_client.requests_delete(url)
1041
1042    def transfer_ownership(
1043        self,
1044        new_owner: Union[User, Organization],
1045        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1046    ):
1047        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1048        data: dict[str, Any] = {"new_owner": new_owner.username}
1049        if isinstance(new_owner, Organization):
1050            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1051            data["team_ids"] = new_team_ids
1052        self.allspice_client.requests_post(url, data=data)
1053        # TODO: make sure this instance is either updated or discarded
1054
1055    def get_git_content(
1056        self,
1057        ref: Optional["Ref"] = None,
1058        commit: "Optional[Commit]" = None,
1059    ) -> List[Content]:
1060        """
1061        Get the metadata for all files in the root directory.
1062
1063        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1064
1065        :param ref: branch or commit to get content from
1066        :param commit: commit to get content from (deprecated)
1067        """
1068        url = f"/repos/{self.owner.username}/{self.name}/contents"
1069        data = Util.data_params_for_ref(ref or commit)
1070
1071        result = [
1072            Content.parse_response(self.allspice_client, f)
1073            for f in self.allspice_client.requests_get(url, data)
1074        ]
1075        return result
1076
1077    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1078        """
1079        Get the repository's tree on a given ref.
1080
1081        By default, this will only return the top-level entries in the tree. If you want
1082        to get the entire tree, set `recursive` to True.
1083
1084        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1085        :param recursive: Whether to get the entire tree or just the top-level entries.
1086        """
1087
1088        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1089        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1090        params = {"recursive": recursive}
1091        results = self.allspice_client.requests_get_paginated(url, params=params)
1092        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1093
1094    def get_file_content(
1095        self,
1096        content: Content,
1097        ref: Optional[Ref] = None,
1098        commit: Optional[Commit] = None,
1099    ) -> Union[str, List["Content"]]:
1100        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1101        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1102        data = Util.data_params_for_ref(ref or commit)
1103
1104        if content.type == Content.FILE:
1105            return self.allspice_client.requests_get(url, data)["content"]
1106        else:
1107            return [
1108                Content.parse_response(self.allspice_client, f)
1109                for f in self.allspice_client.requests_get(url, data)
1110            ]
1111
1112    def get_raw_file(
1113        self,
1114        file_path: str,
1115        ref: Optional[Ref] = None,
1116    ) -> bytes:
1117        """
1118        Get the raw, binary data of a single file.
1119
1120        Note 1: if the file you are requesting is a text file, you might want to
1121        use .decode() on the result to get a string. For example:
1122
1123            content = repo.get_raw_file("file.txt").decode("utf-8")
1124
1125        Note 2: this method will store the entire file in memory. If you want
1126        to download a large file, you might want to use `download_to_file`
1127        instead.
1128
1129        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1130
1131        :param file_path: The path to the file to get.
1132        :param ref: The branch or commit to get the file from.  If not provided,
1133            the default branch is used.
1134        """
1135
1136        url = self.REPO_GET_RAW_FILE.format(
1137            owner=self.owner.username,
1138            repo=self.name,
1139            path=file_path,
1140        )
1141        params = Util.data_params_for_ref(ref)
1142        return self.allspice_client.requests_get_raw(url, params=params)
1143
1144    def download_to_file(
1145        self,
1146        file_path: str,
1147        io: IO,
1148        ref: Optional[Ref] = None,
1149    ) -> None:
1150        """
1151        Download the binary data of a file to a file-like object.
1152
1153        Example:
1154
1155            with open("schematic.DSN", "wb") as f:
1156                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1157
1158        :param file_path: The path to the file in the repository from the root
1159            of the repository.
1160        :param io: The file-like object to write the data to.
1161        """
1162
1163        url = self.allspice_client._AllSpice__get_url(
1164            self.REPO_GET_RAW_FILE.format(
1165                owner=self.owner.username,
1166                repo=self.name,
1167                path=file_path,
1168            )
1169        )
1170        params = Util.data_params_for_ref(ref)
1171        response = self.allspice_client.requests.get(
1172            url,
1173            params=params,
1174            headers=self.allspice_client.headers,
1175            stream=True,
1176        )
1177
1178        for chunk in response.iter_content(chunk_size=4096):
1179            if chunk:
1180                io.write(chunk)
1181
1182    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1183        """
1184        Get the json blob for a cad file if it exists, otherwise enqueue
1185        a new job and return a 503 status.
1186
1187        WARNING: This is still experimental and not recommended for critical
1188        applications. The structure and content of the returned dictionary can
1189        change at any time.
1190
1191        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1192        """
1193
1194        if isinstance(content, Content):
1195            content = content.path
1196
1197        url = self.REPO_GET_ALLSPICE_JSON.format(
1198            owner=self.owner.username,
1199            repo=self.name,
1200            content=content,
1201        )
1202        data = Util.data_params_for_ref(ref)
1203        return self.allspice_client.requests_get(url, data)
1204
1205    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1206        """
1207        Get the svg blob for a cad file if it exists, otherwise enqueue
1208        a new job and return a 503 status.
1209
1210        WARNING: This is still experimental and not yet recommended for
1211        critical applications. The content of the returned svg can change
1212        at any time.
1213
1214        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1215        """
1216
1217        if isinstance(content, Content):
1218            content = content.path
1219
1220        url = self.REPO_GET_ALLSPICE_SVG.format(
1221            owner=self.owner.username,
1222            repo=self.name,
1223            content=content,
1224        )
1225        data = Util.data_params_for_ref(ref)
1226        return self.allspice_client.requests_get_raw(url, data)
1227
1228    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1229        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1230        if not data:
1231            data = {}
1232        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1233        data.update({"content": content})
1234        return self.allspice_client.requests_post(url, data)
1235
1236    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1237        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1238        if not data:
1239            data = {}
1240        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1241        data.update({"sha": file_sha, "content": content})
1242        return self.allspice_client.requests_put(url, data)
1243
1244    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1245        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1246        if not data:
1247            data = {}
1248        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1249        data.update({"sha": file_sha})
1250        return self.allspice_client.requests_delete(url, data)
1251
1252    def get_archive(
1253        self,
1254        ref: Ref = "main",
1255        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1256    ) -> bytes:
1257        """
1258        Download all the files in a specific ref of a repository as a zip or tarball
1259        archive.
1260
1261        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1262
1263        :param ref: branch or commit to get content from, defaults to the "main" branch
1264        :param archive_format: zip or tar, defaults to zip
1265        """
1266
1267        ref_string = Util.data_params_for_ref(ref)["ref"]
1268        url = self.REPO_GET_ARCHIVE.format(
1269            owner=self.owner.username,
1270            repo=self.name,
1271            ref=ref_string,
1272            format=archive_format.value,
1273        )
1274        return self.allspice_client.requests_get_raw(url)
1275
1276    def get_topics(self) -> list[str]:
1277        """
1278        Gets the list of topics on this repository.
1279
1280        See http://localhost:3000/api/swagger#/repository/repoListTopics
1281        """
1282
1283        url = self.REPO_GET_TOPICS.format(
1284            owner=self.owner.username,
1285            repo=self.name,
1286        )
1287        return self.allspice_client.requests_get(url)["topics"]
1288
1289    def add_topic(self, topic: str):
1290        """
1291        Adds a topic to the repository.
1292
1293        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1294
1295        :param topic: The topic to add. Topic names must consist only of
1296            lowercase letters, numnbers and dashes (-), and cannot start with
1297            dashes. Topic names also must be under 35 characters long.
1298        """
1299
1300        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1301        self.allspice_client.requests_put(url)
1302
1303    def create_release(
1304        self,
1305        tag_name: str,
1306        name: Optional[str] = None,
1307        body: Optional[str] = None,
1308        draft: bool = False,
1309    ):
1310        """
1311        Create a release for this repository. The release will be created for
1312        the tag with the given name. If there is no tag with this name, create
1313        the tag first.
1314
1315        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1316        """
1317
1318        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1319        data = {
1320            "tag_name": tag_name,
1321            "draft": draft,
1322        }
1323        if name is not None:
1324            data["name"] = name
1325        if body is not None:
1326            data["body"] = body
1327        response = self.allspice_client.requests_post(url, data)
1328        return Release.parse_response(self.allspice_client, response, self)
1329
1330    def get_releases(
1331        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1332    ) -> List[Release]:
1333        """
1334        Get the list of releases for this repository.
1335
1336        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1337        """
1338
1339        data = {}
1340
1341        if draft is not None:
1342            data["draft"] = draft
1343        if pre_release is not None:
1344            data["pre-release"] = pre_release
1345
1346        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1347        responses = self.allspice_client.requests_get_paginated(url, params=data)
1348
1349        return [
1350            Release.parse_response(self.allspice_client, response, self) for response in responses
1351        ]
1352
1353    def get_latest_release(self) -> Release:
1354        """
1355        Get the latest release for this repository.
1356
1357        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1358        """
1359
1360        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1361        response = self.allspice_client.requests_get(url)
1362        release = Release.parse_response(self.allspice_client, response, self)
1363        return release
1364
1365    def get_release_by_tag(self, tag: str) -> Release:
1366        """
1367        Get a release by its tag.
1368
1369        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1370        """
1371
1372        url = self.REPO_GET_RELEASE_BY_TAG.format(
1373            owner=self.owner.username, repo=self.name, tag=tag
1374        )
1375        response = self.allspice_client.requests_get(url)
1376        release = Release.parse_response(self.allspice_client, response, self)
1377        return release
1378
1379    def get_commit_statuses(
1380        self,
1381        commit: Union[str, Commit],
1382        sort: Optional[CommitStatusSort] = None,
1383        state: Optional[CommitStatusState] = None,
1384    ) -> List[CommitStatus]:
1385        """
1386        Get a list of statuses for a commit.
1387
1388        This is roughly equivalent to the Commit.get_statuses method, but this
1389        method allows you to sort and filter commits and is more convenient if
1390        you have a commit SHA and don't need to get the commit itself.
1391
1392        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1393        """
1394
1395        if isinstance(commit, Commit):
1396            commit = commit.sha
1397
1398        params = {}
1399        if sort is not None:
1400            params["sort"] = sort.value
1401        if state is not None:
1402            params["state"] = state.value
1403
1404        url = self.REPO_GET_COMMIT_STATUS.format(
1405            owner=self.owner.username, repo=self.name, sha=commit
1406        )
1407        response = self.allspice_client.requests_get_paginated(url, params=params)
1408        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1409
1410    def create_commit_status(
1411        self,
1412        commit: Union[str, Commit],
1413        context: Optional[str] = None,
1414        description: Optional[str] = None,
1415        state: Optional[CommitStatusState] = None,
1416        target_url: Optional[str] = None,
1417    ) -> CommitStatus:
1418        """
1419        Create a status on a commit.
1420
1421        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1422        """
1423
1424        if isinstance(commit, Commit):
1425            commit = commit.sha
1426
1427        data = {}
1428        if context is not None:
1429            data["context"] = context
1430        if description is not None:
1431            data["description"] = description
1432        if state is not None:
1433            data["state"] = state.value
1434        if target_url is not None:
1435            data["target_url"] = target_url
1436
1437        url = self.REPO_GET_COMMIT_STATUS.format(
1438            owner=self.owner.username, repo=self.name, sha=commit
1439        )
1440        response = self.allspice_client.requests_post(url, data=data)
1441        return CommitStatus.parse_response(self.allspice_client, response)
1442
1443    def delete(self):
1444        self.allspice_client.requests_delete(
1445            Repository.REPO_DELETE % (self.owner.username, self.name)
1446        )
1447        self.deleted = True
Repository(allspice_client)
    def __init__(self, allspice_client):
        """
        Initialize the repository object by delegating to the parent class.

        :param allspice_client: The client passed on to the parent initializer.
        """
        super().__init__(allspice_client)
allow_fast_forward_only_merge: bool
allow_manual_merge: Any
allow_merge_commits: bool
allow_rebase: bool
allow_rebase_explicit: bool
allow_rebase_update: bool
allow_squash_merge: bool
archived: bool
archived_at: str
autodetect_manual_merge: Any
avatar_url: str
clone_url: str
created_at: str
default_allow_maintainer_edit: bool
default_branch: str
default_delete_branch_after_merge: bool
default_merge_style: str
description: str
empty: bool
enable_prune: Any
external_tracker: Any
external_wiki: Any
fork: bool
forks_count: int
full_name: str
has_actions: bool
has_issues: bool
has_packages: bool
has_projects: bool
has_pull_requests: bool
has_releases: bool
has_wiki: bool
html_url: str
id: int
ignore_whitespace_conflicts: bool
internal: bool
internal_tracker: Dict[str, bool]
language: str
languages_url: str
mirror: bool
mirror_interval: str
mirror_updated: str
name: str
object_format_name: str
open_issues_count: int
open_pr_counter: int
original_url: str
owner: Union[User, Organization]
parent: Any
permissions: Dict[str, bool]
private: bool
projects_mode: str
release_counter: int
repo_transfer: Any
size: int
ssh_url: str
stars_count: int
template: bool
updated_at: datetime.datetime
url: str
watchers_count: int
website: str
API_OBJECT = '/repos/{owner}/{name}'
REPO_IS_COLLABORATOR = '/repos/%s/%s/collaborators/%s'
REPO_BRANCHES = '/repos/%s/%s/branches'
REPO_BRANCH = '/repos/{owner}/{repo}/branches/{branch}'
REPO_ISSUES = '/repos/{owner}/{repo}/issues'
REPO_DESIGN_REVIEWS = '/repos/{owner}/{repo}/pulls'
REPO_DELETE = '/repos/%s/%s'
REPO_TIMES = '/repos/%s/%s/times'
REPO_USER_TIME = '/repos/%s/%s/times/%s'
REPO_COMMITS = '/repos/%s/%s/commits'
REPO_TRANSFER = '/repos/{owner}/{repo}/transfer'
REPO_MILESTONES = '/repos/{owner}/{repo}/milestones'
REPO_GET_ARCHIVE = '/repos/{owner}/{repo}/archive/{ref}.{format}'
REPO_GET_ALLSPICE_JSON = '/repos/{owner}/{repo}/allspice_generated/json/{content}'
REPO_GET_ALLSPICE_SVG = '/repos/{owner}/{repo}/allspice_generated/svg/{content}'
REPO_GET_TOPICS = '/repos/{owner}/{repo}/topics'
REPO_ADD_TOPIC = '/repos/{owner}/{repo}/topics/{topic}'
REPO_GET_RELEASES = '/repos/{owner}/{repo}/releases'
REPO_GET_LATEST_RELEASE = '/repos/{owner}/{repo}/releases/latest'
REPO_GET_RELEASE_BY_TAG = '/repos/{owner}/{repo}/releases/tags/{tag}'
REPO_GET_COMMIT_STATUS = '/repos/{owner}/{repo}/statuses/{sha}'
REPO_GET_RAW_FILE = '/repos/{owner}/{repo}/raw/{path}'
REPO_GET_TREE = '/repos/{owner}/{repo}/git/trees/{ref}'
@classmethod
def request( cls, allspice_client, owner: str, name: str) -> Repository:
607    @classmethod
608    def request(
609        cls,
610        allspice_client,
611        owner: str,
612        name: str,
613    ) -> Repository:
614        return cls._request(allspice_client, {"owner": owner, "name": name})
@classmethod
def search( cls, allspice_client, query: Optional[str] = None, topic: bool = False, include_description: bool = False, user: Optional[User] = None, owner_to_prioritize: Union[User, Organization, NoneType] = None) -> list[Repository]:
616    @classmethod
617    def search(
618        cls,
619        allspice_client,
620        query: Optional[str] = None,
621        topic: bool = False,
622        include_description: bool = False,
623        user: Optional[User] = None,
624        owner_to_prioritize: Union[User, Organization, None] = None,
625    ) -> list[Repository]:
626        """
627        Search for repositories.
628
629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
630
631        :param query: The query string to search for
632        :param topic: If true, the query string will only be matched against the
633            repository's topic.
634        :param include_description: If true, the query string will be matched
635            against the repository's description as well.
636        :param user: If specified, only repositories that this user owns or
637            contributes to will be searched.
638        :param owner_to_prioritize: If specified, repositories owned by the
639            given entity will be prioritized in the search.
640        :returns: All repositories matching the query. If there are many
641            repositories matching this query, this may take some time.
642        """
643
644        params = {}
645
646        if query is not None:
647            params["q"] = query
648        if topic:
649            params["topic"] = topic
650        if include_description:
651            params["include_description"] = include_description
652        if user is not None:
653            params["user"] = user.id
654        if owner_to_prioritize is not None:
655            params["owner_to_prioritize"] = owner_to_prioritize.id
656
657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
658
659        return [Repository.parse_response(allspice_client, response) for response in responses]

Search for repositories.

See https://hub.allspice.io/api/swagger#/repository/repoSearch

Parameters
  • query: The query string to search for
  • topic: If true, the query string will only be matched against the repository's topic.
  • include_description: If true, the query string will be matched against the repository's description as well.
  • user: If specified, only repositories that this user owns or contributes to will be searched.
  • owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns

All repositories matching the query. If there are many repositories matching this query, this may take some time.
def commit(self):
691    def commit(self):
692        args = {"owner": self.owner.username, "name": self.name}
693        self._commit(args)
def get_branches(self) -> List[Branch]:
695    def get_branches(self) -> List["Branch"]:
696        """Get all the Branches of this Repository."""
697
698        results = self.allspice_client.requests_get_paginated(
699            Repository.REPO_BRANCHES % (self.owner.username, self.name)
700        )
701        return [Branch.parse_response(self.allspice_client, result) for result in results]

Get all the Branches of this Repository.

def get_branch(self, name: str) -> Branch:
703    def get_branch(self, name: str) -> "Branch":
704        """Get a specific Branch of this Repository."""
705        result = self.allspice_client.requests_get(
706            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
707        )
708        return Branch.parse_response(self.allspice_client, result)

Get a specific Branch of this Repository.

def add_branch( self, create_from: Union[Branch, Commit, str], newname: str) -> Branch:
710    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
711        """Add a branch to the repository"""
712        # Note: will only work with gitea 1.13 or higher!
713
714        ref_name = Util.data_params_for_ref(create_from)
715        if "ref" not in ref_name:
716            raise ValueError("create_from must be a Branch, Commit or string")
717        ref_name = ref_name["ref"]
718
719        data = {"new_branch_name": newname, "old_ref_name": ref_name}
720        result = self.allspice_client.requests_post(
721            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
722        )
723        return Branch.parse_response(self.allspice_client, result)

Add a branch to the repository

def get_issues( self, state: Literal['open', 'closed', 'all'] = 'all', search_query: Optional[str] = None, labels: Optional[List[str]] = None, milestones: Optional[List[Union[Milestone, str]]] = None, assignee: Union[User, str, NoneType] = None, since: Optional[datetime.datetime] = None, before: Optional[datetime.datetime] = None) -> List[Issue]:
725    def get_issues(
726        self,
727        state: Literal["open", "closed", "all"] = "all",
728        search_query: Optional[str] = None,
729        labels: Optional[List[str]] = None,
730        milestones: Optional[List[Union[Milestone, str]]] = None,
731        assignee: Optional[Union[User, str]] = None,
732        since: Optional[datetime] = None,
733        before: Optional[datetime] = None,
734    ) -> List["Issue"]:
735        """
736        Get all Issues of this Repository (open and closed)
737
738        https://hub.allspice.io/api/swagger#/repository/repoListIssues
739
740        All params of this method are optional filters. If you don't specify a filter, it
741        will not be applied.
742
743        :param state: The state of the Issues to get. If None, all Issues are returned.
744        :param search_query: Filter issues by text. This is equivalent to searching for
745                             `search_query` in the Issues on the web interface.
746        :param labels: Filter issues by labels.
747        :param milestones: Filter issues by milestones.
748        :param assignee: Filter issues by the assigned user.
749        :param since: Filter issues by the date they were created.
750        :param before: Filter issues by the date they were created.
751        :return: A list of Issues.
752        """
753
754        data = {
755            "state": state,
756        }
757        if search_query:
758            data["q"] = search_query
759        if labels:
760            data["labels"] = ",".join(labels)
761        if milestones:
762            data["milestone"] = ",".join(
763                [
764                    milestone.name if isinstance(milestone, Milestone) else milestone
765                    for milestone in milestones
766                ]
767            )
768        if assignee:
769            if isinstance(assignee, User):
770                data["assignee"] = assignee.username
771            else:
772                data["assignee"] = assignee
773        if since:
774            data["since"] = Util.format_time(since)
775        if before:
776            data["before"] = Util.format_time(before)
777
778        results = self.allspice_client.requests_get_paginated(
779            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
780            params=data,
781        )
782
783        issues = []
784        for result in results:
785            issue = Issue.parse_response(self.allspice_client, result)
786            # See Issue.request
787            setattr(issue, "_repository", self)
788            # This is mostly for compatibility with an older implementation
789            Issue._add_read_property("repo", self, issue)
790            issues.append(issue)
791
792        return issues

Get all Issues of this Repository (open and closed)

https://hub.allspice.io/api/swagger#/repository/repoListIssues

All params of this method are optional filters. If you don't specify a filter, it will not be applied.

Parameters
  • state: The state of the Issues to get. If None, all Issues are returned.
  • search_query: Filter issues by text. This is equivalent to searching for search_query in the Issues on the web interface.
  • labels: Filter issues by labels.
  • milestones: Filter issues by milestones.
  • assignee: Filter issues by the assigned user.
  • since: Only return issues updated after this time.
  • before: Only return issues updated before this time.
Returns

A list of Issues.

def get_design_reviews( self, state: Literal['open', 'closed', 'all'] = 'all', milestone: Union[Milestone, str, NoneType] = None, labels: Optional[List[str]] = None) -> List[DesignReview]:
794    def get_design_reviews(
795        self,
796        state: Literal["open", "closed", "all"] = "all",
797        milestone: Optional[Union[Milestone, str]] = None,
798        labels: Optional[List[str]] = None,
799    ) -> List["DesignReview"]:
800        """
801        Get all Design Reviews of this Repository.
802
803        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
804
805        :param state: The state of the Design Reviews to get. If None, all Design Reviews
806                      are returned.
807        :param milestone: The milestone of the Design Reviews to get.
808        :param labels: A list of label IDs to filter DRs by.
809        :return: A list of Design Reviews.
810        """
811
812        params = {
813            "state": state,
814        }
815        if milestone:
816            if isinstance(milestone, Milestone):
817                params["milestone"] = milestone.name
818            else:
819                params["milestone"] = milestone
820        if labels:
821            params["labels"] = ",".join(labels)
822
823        results = self.allspice_client.requests_get_paginated(
824            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
825            params=params,
826        )
827        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

Get all Design Reviews of this Repository.

https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

Parameters
  • state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
  • milestone: The milestone of the Design Reviews to get.
  • labels: A list of label IDs to filter DRs by.
Returns

A list of Design Reviews.

def get_commits( self, sha: Optional[str] = None, path: Optional[str] = None, stat: bool = True) -> List[Commit]:
829    def get_commits(
830        self,
831        sha: Optional[str] = None,
832        path: Optional[str] = None,
833        stat: bool = True,
834    ) -> List["Commit"]:
835        """
836        Get all the Commits of this Repository.
837
838        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
839
840        :param sha: The SHA of the commit to start listing commits from.
841        :param path: filepath of a file/dir.
842        :param stat: Include the number of additions and deletions in the response.
843                     Disable for speedup.
844        :return: A list of Commits.
845        """
846
847        data = {}
848        if sha:
849            data["sha"] = sha
850        if path:
851            data["path"] = path
852        if not stat:
853            data["stat"] = False
854
855        try:
856            results = self.allspice_client.requests_get_paginated(
857                Repository.REPO_COMMITS % (self.owner.username, self.name),
858                params=data,
859            )
860        except ConflictException as err:
861            logging.warning(err)
862            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
863            results = []
864        return [Commit.parse_response(self.allspice_client, result) for result in results]

Get all the Commits of this Repository.

https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

Parameters
  • sha: The SHA of the commit to start listing commits from.
  • path: filepath of a file/dir.
  • stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns

A list of Commits.

def get_issues_state(self, state) -> List[Issue]:
866    def get_issues_state(self, state) -> List["Issue"]:
867        """
868        DEPRECATED: Use get_issues() instead.
869
870        Get issues of state Issue.open or Issue.closed of a repository.
871        """
872
873        assert state in [Issue.OPENED, Issue.CLOSED]
874        issues = []
875        data = {"state": state}
876        results = self.allspice_client.requests_get_paginated(
877            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
878            params=data,
879        )
880        for result in results:
881            issue = Issue.parse_response(self.allspice_client, result)
882            # adding data not contained in the issue response
883            # See Issue.request()
884            setattr(issue, "_repository", self)
885            Issue._add_read_property("repo", self, issue)
886            Issue._add_read_property("owner", self.owner, issue)
887            issues.append(issue)
888        return issues

DEPRECATED: Use get_issues() instead.

Get issues of state Issue.open or Issue.closed of a repository.

def get_times(self):
890    def get_times(self):
891        results = self.allspice_client.requests_get(
892            Repository.REPO_TIMES % (self.owner.username, self.name)
893        )
894        return results
def get_user_time(self, username) -> float:
896    def get_user_time(self, username) -> float:
897        if isinstance(username, User):
898            username = username.username
899        results = self.allspice_client.requests_get(
900            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
901        )
902        time = sum(r["time"] for r in results)
903        return time
def get_full_name(self) -> str:
905    def get_full_name(self) -> str:
906        return self.owner.username + "/" + self.name
def create_issue( self, title, assignees=frozenset(), description='') -> allspice.baseapiobject.ApiObject:
908    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
909        data = {
910            "assignees": assignees,
911            "body": description,
912            "closed": False,
913            "title": title,
914        }
915        result = self.allspice_client.requests_post(
916            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
917            data=data,
918        )
919
920        issue = Issue.parse_response(self.allspice_client, result)
921        setattr(issue, "_repository", self)
922        Issue._add_read_property("repo", self, issue)
923        return issue
def create_design_review( self, title: str, head: Union[Branch, str], base: Union[Branch, str], assignees: Optional[Set[Union[User, str]]] = None, body: Optional[str] = None, due_date: Optional[datetime.datetime] = None, milestone: Optional[Milestone] = None) -> DesignReview:
925    def create_design_review(
926        self,
927        title: str,
928        head: Union[Branch, str],
929        base: Union[Branch, str],
930        assignees: Optional[Set[Union[User, str]]] = None,
931        body: Optional[str] = None,
932        due_date: Optional[datetime] = None,
933        milestone: Optional["Milestone"] = None,
934    ) -> "DesignReview":
935        """
936        Create a new Design Review.
937
938        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
939
940        :param title: Title of the Design Review
941        :param head: Branch or name of the branch to merge into the base branch
942        :param base: Branch or name of the branch to merge into
943        :param assignees: Optional. A list of users to assign this review. List can be of
944                          User objects or of usernames.
945        :param body: An Optional Description for the Design Review.
946        :param due_date: An Optional Due date for the Design Review.
947        :param milestone: An Optional Milestone for the Design Review
948        :return: The created Design Review
949        """
950
951        data: dict[str, Any] = {
952            "title": title,
953        }
954
955        if isinstance(head, Branch):
956            data["head"] = head.name
957        else:
958            data["head"] = head
959        if isinstance(base, Branch):
960            data["base"] = base.name
961        else:
962            data["base"] = base
963        if assignees:
964            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
965        if body:
966            data["body"] = body
967        if due_date:
968            data["due_date"] = Util.format_time(due_date)
969        if milestone:
970            data["milestone"] = milestone.id
971
972        result = self.allspice_client.requests_post(
973            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
974            data=data,
975        )
976
977        return DesignReview.parse_response(self.allspice_client, result)

Create a new Design Review.

See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

Parameters
  • title: Title of the Design Review
  • head: Branch or name of the branch to merge into the base branch
  • base: Branch or name of the branch to merge into
  • assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
  • body: An Optional Description for the Design Review.
  • due_date: An Optional Due date for the Design Review.
  • milestone: An Optional Milestone for the Design Review
Returns

The created Design Review

def create_milestone( self, title: str, description: str, due_date: Optional[str] = None, state: str = 'open') -> Milestone:
979    def create_milestone(
980        self,
981        title: str,
982        description: str,
983        due_date: Optional[str] = None,
984        state: str = "open",
985    ) -> "Milestone":
986        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
987        data = {"title": title, "description": description, "state": state}
988        if due_date:
989            data["due_date"] = due_date
990        result = self.allspice_client.requests_post(url, data=data)
991        return Milestone.parse_response(self.allspice_client, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
 993    def create_gitea_hook(self, hook_url: str, events: List[str]):
 994        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 995        data = {
 996            "type": "gitea",
 997            "config": {"content_type": "json", "url": hook_url},
 998            "events": events,
 999            "active": True,
1000        }
1001        return self.allspice_client.requests_post(url, data=data)
def list_hooks(self):
1003    def list_hooks(self):
1004        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1005        return self.allspice_client.requests_get(url)
def delete_hook(self, id: str):
1007    def delete_hook(self, id: str):
1008        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1009        self.allspice_client.requests_delete(url)
def is_collaborator(self, username) -> bool:
1011    def is_collaborator(self, username) -> bool:
1012        if isinstance(username, User):
1013            username = username.username
1014        try:
1015            # returns 204 if its ok, 404 if its not
1016            self.allspice_client.requests_get(
1017                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1018            )
1019            return True
1020        except Exception:
1021            return False
def get_users_with_access(self) -> Sequence[User]:
1023    def get_users_with_access(self) -> Sequence[User]:
1024        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1025        response = self.allspice_client.requests_get(url)
1026        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1027        if isinstance(self.owner, User):
1028            return [*collabs, self.owner]
1029        else:
1030            # owner must be org
1031            teams = self.owner.get_teams()
1032            for team in teams:
1033                team_repos = team.get_repos()
1034                if self.name in [n.name for n in team_repos]:
1035                    collabs += team.get_members()
1036            return collabs
def remove_collaborator(self, user_name: str):
1038    def remove_collaborator(self, user_name: str):
1039        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1040        self.allspice_client.requests_delete(url)
def transfer_ownership( self, new_owner: Union[User, Organization], new_teams: Union[Set[Team], FrozenSet[Team]] = frozenset()):
1042    def transfer_ownership(
1043        self,
1044        new_owner: Union[User, Organization],
1045        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1046    ):
1047        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1048        data: dict[str, Any] = {"new_owner": new_owner.username}
1049        if isinstance(new_owner, Organization):
1050            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1051            data["team_ids"] = new_team_ids
1052        self.allspice_client.requests_post(url, data=data)
1053        # TODO: make sure this instance is either updated or discarded
def get_git_content( self, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> List[Content]:
1055    def get_git_content(
1056        self,
1057        ref: Optional["Ref"] = None,
1058        commit: "Optional[Commit]" = None,
1059    ) -> List[Content]:
1060        """
1061        Get the metadata for all files in the root directory.
1062
1063        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1064
1065        :param ref: branch or commit to get content from
1066        :param commit: commit to get content from (deprecated)
1067        """
1068        url = f"/repos/{self.owner.username}/{self.name}/contents"
1069        data = Util.data_params_for_ref(ref or commit)
1070
1071        result = [
1072            Content.parse_response(self.allspice_client, f)
1073            for f in self.allspice_client.requests_get(url, data)
1074        ]
1075        return result

Get the metadata for all files in the root directory.

https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

Parameters
  • ref: branch or commit to get content from
  • commit: commit to get content from (deprecated)
def get_tree( self, ref: Union[Branch, Commit, str, NoneType] = None, recursive: bool = False) -> List[allspice.apiobject.GitEntry]:
1077    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1078        """
1079        Get the repository's tree on a given ref.
1080
1081        By default, this will only return the top-level entries in the tree. If you want
1082        to get the entire tree, set `recursive` to True.
1083
1084        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1085        :param recursive: Whether to get the entire tree or just the top-level entries.
1086        """
1087
1088        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1089        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1090        params = {"recursive": recursive}
1091        results = self.allspice_client.requests_get_paginated(url, params=params)
1092        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

Get the repository's tree on a given ref.

By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set recursive to True.

Parameters
  • ref: The ref to get the tree from. If not provided, the default branch is used.
  • recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content( self, content: Content, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> Union[str, List[Content]]:
1094    def get_file_content(
1095        self,
1096        content: Content,
1097        ref: Optional[Ref] = None,
1098        commit: Optional[Commit] = None,
1099    ) -> Union[str, List["Content"]]:
1100        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1101        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1102        data = Util.data_params_for_ref(ref or commit)
1103
1104        if content.type == Content.FILE:
1105            return self.allspice_client.requests_get(url, data)["content"]
1106        else:
1107            return [
1108                Content.parse_response(self.allspice_client, f)
1109                for f in self.allspice_client.requests_get(url, data)
1110            ]

https://hub.allspice.io/api/swagger#/repository/repoGetContents

def get_raw_file( self, file_path: str, ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1112    def get_raw_file(
1113        self,
1114        file_path: str,
1115        ref: Optional[Ref] = None,
1116    ) -> bytes:
1117        """
1118        Get the raw, binary data of a single file.
1119
1120        Note 1: if the file you are requesting is a text file, you might want to
1121        use .decode() on the result to get a string. For example:
1122
1123            content = repo.get_raw_file("file.txt").decode("utf-8")
1124
1125        Note 2: this method will store the entire file in memory. If you want
1126        to download a large file, you might want to use `download_to_file`
1127        instead.
1128
1129        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1130
1131        :param file_path: The path to the file to get.
1132        :param ref: The branch or commit to get the file from.  If not provided,
1133            the default branch is used.
1134        """
1135
1136        url = self.REPO_GET_RAW_FILE.format(
1137            owner=self.owner.username,
1138            repo=self.name,
1139            path=file_path,
1140        )
1141        params = Util.data_params_for_ref(ref)
1142        return self.allspice_client.requests_get_raw(url, params=params)

Get the raw, binary data of a single file.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

content = repo.get_raw_file("file.txt").decode("utf-8")

Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use download_to_file instead.

See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

Parameters
  • file_path: The path to the file to get.
  • ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file( self, file_path: str, io: <class 'IO'>, ref: Union[Branch, Commit, str, NoneType] = None) -> None:
1144    def download_to_file(
1145        self,
1146        file_path: str,
1147        io: IO,
1148        ref: Optional[Ref] = None,
1149    ) -> None:
1150        """
1151        Download the binary data of a file to a file-like object.
1152
1153        Example:
1154
1155            with open("schematic.DSN", "wb") as f:
1156                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1157
1158        :param file_path: The path to the file in the repository from the root
1159            of the repository.
1160        :param io: The file-like object to write the data to.
1161        """
1162
1163        url = self.allspice_client._AllSpice__get_url(
1164            self.REPO_GET_RAW_FILE.format(
1165                owner=self.owner.username,
1166                repo=self.name,
1167                path=file_path,
1168            )
1169        )
1170        params = Util.data_params_for_ref(ref)
1171        response = self.allspice_client.requests.get(
1172            url,
1173            params=params,
1174            headers=self.allspice_client.headers,
1175            stream=True,
1176        )
1177
1178        for chunk in response.iter_content(chunk_size=4096):
1179            if chunk:
1180                io.write(chunk)

Download the binary data of a file to a file-like object.

Example:

with open("schematic.DSN", "wb") as f:
    repo.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
  • file_path: The path to the file in the repository from the root of the repository.
  • io: The file-like object to write the data to.
def get_generated_json( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1182    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1183        """
1184        Get the json blob for a cad file if it exists, otherwise enqueue
1185        a new job and return a 503 status.
1186
1187        WARNING: This is still experimental and not recommended for critical
1188        applications. The structure and content of the returned dictionary can
1189        change at any time.
1190
1191        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1192        """
1193
1194        if isinstance(content, Content):
1195            content = content.path
1196
1197        url = self.REPO_GET_ALLSPICE_JSON.format(
1198            owner=self.owner.username,
1199            repo=self.name,
1200            content=content,
1201        )
1202        data = Util.data_params_for_ref(ref)
1203        return self.allspice_client.requests_get(url, data)

Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.

See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

def get_generated_svg( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
    """
    Fetch the generated SVG blob for a CAD file.

    If the blob does not exist yet, a new job is enqueued and a 503 status
    is returned instead.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: The file to look up, as a Content object or as a path string.
    :param ref: Optional ref to read from; converted to request parameters
        by Util.data_params_for_ref.
    """

    # A Content object is reduced to its path; a plain string is used as-is.
    path = content.path if isinstance(content, Content) else content

    endpoint = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username, repo=self.name, content=path
    )
    ref_params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(endpoint, ref_params)

Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.

See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the file, appended to the repository's
        ``contents`` endpoint.
    :param content: Value sent as the ``content`` field of the request body.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, never modified; a ``content`` key in it is overridden.
    :return: Whatever the client's ``requests_post`` returns.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a fresh payload so the caller's dictionary is left untouched.
    payload = {**(data or {}), "content": content}
    return self.allspice_client.requests_post(url, payload)

https://hub.allspice.io/api/swagger#/repository/repoCreateFile

def change_file( self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Update an existing file in this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file, appended to the repository's
        ``contents`` endpoint.
    :param file_sha: Value sent as the ``sha`` field of the request body.
    :param content: Value sent as the ``content`` field of the request body.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, never modified; ``sha`` and ``content`` keys in it are
        overridden.
    :return: Whatever the client's ``requests_put`` returns.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a fresh payload so the caller's dictionary is left untouched.
    payload = {**(data or {}), "sha": file_sha, "content": content}
    return self.allspice_client.requests_put(url, payload)

https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file, appended to the repository's
        ``contents`` endpoint.
    :param file_sha: Value sent as the ``sha`` field of the request body.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, never modified; a ``sha`` key in it is overridden.
    :return: Whatever the client's ``requests_delete`` returns.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a fresh payload so the caller's dictionary is left untouched.
    payload = {**(data or {}), "sha": file_sha}
    return self.allspice_client.requests_delete(url, payload)

https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

def get_archive( self, ref: Union[Branch, Commit, str] = 'main', archive_format: Repository.ArchiveFormat = <ArchiveFormat.ZIP: 'zip'>) -> bytes:
def get_archive(
    self,
    ref: Ref = "main",
    archive_format: ArchiveFormat = ArchiveFormat.ZIP,
) -> bytes:
    """
    Download every file at a given ref of the repository as a single zip
    or tarball archive.

    https://hub.allspice.io/api/swagger#/repository/repoGetArchive

    :param ref: branch or commit to get content from, defaults to the "main" branch
    :param archive_format: zip or tar, defaults to zip
    """

    # Only the "ref" entry of the helper's result is needed for the URL.
    ref_params = Util.data_params_for_ref(ref)
    archive_url = self.REPO_GET_ARCHIVE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_params["ref"],
        format=archive_format.value,
    )
    return self.allspice_client.requests_get_raw(archive_url)

Download all the files in a specific ref of a repository as a zip or tarball archive.

https://hub.allspice.io/api/swagger#/repository/repoGetArchive

Parameters
  • ref: branch or commit to get content from, defaults to the "main" branch
  • archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
def get_topics(self) -> list[str]:
    """
    Gets the list of topics on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics

    :return: The value of the ``topics`` key of the API response.
    """

    url = self.REPO_GET_TOPICS.format(owner=self.owner.username, repo=self.name)
    return self.allspice_client.requests_get(url)["topics"]

Gets the list of topics on this repository.

See https://hub.allspice.io/api/swagger#/repository/repoListTopics

def add_topic(self, topic: str):
def add_topic(self, topic: str):
    """
    Attach a topic to this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """

    owner_name = self.owner.username
    endpoint = self.REPO_ADD_TOPIC.format(owner=owner_name, repo=self.name, topic=topic)
    self.allspice_client.requests_put(endpoint)

Adds a topic to the repository.

See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

Parameters
  • topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release( self, tag_name: str, name: Optional[str] = None, body: Optional[str] = None, draft: bool = False):
def create_release(
    self,
    tag_name: str,
    name: Optional[str] = None,
    body: Optional[str] = None,
    draft: bool = False,
):
    """
    Create a release for this repository. The release will be created for
    the tag with the given name. If there is no tag with this name, create
    the tag first.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

    :param tag_name: Name of the tag the release is created for.
    :param name: Optional release name; left out of the request when None.
    :param body: Optional release body; left out of the request when None.
    :param draft: Sent as the ``draft`` field of the request.
    :return: The Release parsed from the API response.
    """

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)

    # Mandatory fields first, then only the optional ones that were given.
    payload = {"tag_name": tag_name, "draft": draft}
    optional_fields = (("name", name), ("body", body))
    payload.update({key: value for key, value in optional_fields if value is not None})

    created = self.allspice_client.requests_post(endpoint, payload)
    return Release.parse_response(self.allspice_client, created, self)

Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.

See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

def get_releases( self, draft: Optional[bool] = None, pre_release: Optional[bool] = None) -> List[Release]:
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    List the releases of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: Optional filter, sent as the ``draft`` query parameter.
    :param pre_release: Optional filter, sent as the ``pre-release`` query
        parameter.
    :return: One Release per entry of the paginated API response.
    """

    # Only filters that were actually given become query parameters.
    candidates = (("draft", draft), ("pre-release", pre_release))
    filters = {key: flag for key, flag in candidates if flag is not None}

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    entries = self.allspice_client.requests_get_paginated(endpoint, params=filters)

    releases = []
    for entry in entries:
        releases.append(Release.parse_response(self.allspice_client, entry, self))
    return releases

Get the list of releases for this repository.

See https://hub.allspice.io/api/swagger#/repository/repoListReleases

def get_latest_release(self) -> Release:
def get_latest_release(self) -> Release:
    """
    Fetch the latest release of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

    :return: The Release parsed from the API response.
    """

    endpoint = self.REPO_GET_LATEST_RELEASE.format(
        owner=self.owner.username,
        repo=self.name,
    )
    payload = self.allspice_client.requests_get(endpoint)
    return Release.parse_response(self.allspice_client, payload, self)

Get the latest release for this repository.

See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

def get_release_by_tag(self, tag: str) -> Release:
def get_release_by_tag(self, tag: str) -> Release:
    """
    Fetch the release that belongs to a tag.

    See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

    :param tag: Name of the tag, inserted into the request URL.
    :return: The Release parsed from the API response.
    """

    endpoint = self.REPO_GET_RELEASE_BY_TAG.format(
        owner=self.owner.username,
        repo=self.name,
        tag=tag,
    )
    payload = self.allspice_client.requests_get(endpoint)
    return Release.parse_response(self.allspice_client, payload, self)

Get a release by its tag.

See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

def get_commit_statuses( self, commit: Union[str, Commit], sort: Optional[Repository.CommitStatusSort] = None, state: Optional[allspice.apiobject.CommitStatusState] = None) -> List[allspice.apiobject.CommitStatus]:
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    List the statuses of a commit.

    This is roughly equivalent to the Commit.get_statuses method, but this
    method allows you to sort and filter commits and is more convenient if
    you have a commit SHA and don't need to get the commit itself.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object or a commit SHA string.
    :param sort: Optional sort order; its value is sent as ``sort``.
    :param state: Optional state filter; its value is sent as ``state``.
    :return: One CommitStatus per entry of the paginated API response.
    """

    # A Commit object is reduced to its SHA; a plain string is used as-is.
    sha = commit.sha if isinstance(commit, Commit) else commit

    # Enum options are sent by value, and only when they were given.
    options = (("sort", sort), ("state", state))
    query = {key: option.value for key, option in options if option is not None}

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]

Get a list of statuses for a commit.

This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.

See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

def create_commit_status( self, commit: Union[str, Commit], context: Optional[str] = None, description: Optional[str] = None, state: Optional[allspice.apiobject.CommitStatusState] = None, target_url: Optional[str] = None) -> allspice.apiobject.CommitStatus:
def create_commit_status(
    self,
    commit: Union[str, Commit],
    context: Optional[str] = None,
    description: Optional[str] = None,
    state: Optional[CommitStatusState] = None,
    target_url: Optional[str] = None,
) -> CommitStatus:
    """
    Attach a new status to a commit.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

    :param commit: A Commit object or a commit SHA string.
    :param context: Optional ``context`` field of the status.
    :param description: Optional ``description`` field of the status.
    :param state: Optional state; its value is sent as ``state``.
    :param target_url: Optional ``target_url`` field of the status.
    :return: The CommitStatus parsed from the API response.
    """

    # A Commit object is reduced to its SHA; a plain string is used as-is.
    sha = commit.sha if isinstance(commit, Commit) else commit

    # Arguments left at None are omitted from the request body entirely.
    text_fields = (("context", context), ("description", description))
    payload = {key: text for key, text in text_fields if text is not None}
    if state is not None:
        payload["state"] = state.value
    if target_url is not None:
        payload["target_url"] = target_url

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return CommitStatus.parse_response(self.allspice_client, created)

Create a status on a commit.

See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

def delete(self):
def delete(self):
    """Send the delete request for this repository and flag this object as deleted."""
    endpoint = Repository.REPO_DELETE % (self.owner.username, self.name)
    self.allspice_client.requests_delete(endpoint)
    self.deleted = True
class Repository.ArchiveFormat(enum.Enum):
class ArchiveFormat(Enum):
    """
    The archive formats that Repository.get_archive can request.

    Each member's value is inserted into the archive URL as the format.
    """

    TAR = "tar.gz"
    ZIP = "zip"

Archive formats for Repository.get_archive

TAR = <ArchiveFormat.TAR: 'tar.gz'>
ZIP = <ArchiveFormat.ZIP: 'zip'>
class Repository.CommitStatusSort(enum.Enum):
class CommitStatusSort(Enum):
    """
    The sort orders accepted by Repository.get_commit_statuses.

    Each member's value is sent as the ``sort`` query parameter.
    """

    OLDEST = "oldest"
    RECENT_UPDATE = "recentupdate"
    LEAST_UPDATE = "leastupdate"
    LEAST_INDEX = "leastindex"
    HIGHEST_INDEX = "highestindex"

Sort order for Repository.get_commit_statuses

OLDEST = <CommitStatusSort.OLDEST: 'oldest'>
RECENT_UPDATE = <CommitStatusSort.RECENT_UPDATE: 'recentupdate'>
LEAST_UPDATE = <CommitStatusSort.LEAST_UPDATE: 'leastupdate'>
LEAST_INDEX = <CommitStatusSort.LEAST_INDEX: 'leastindex'>
HIGHEST_INDEX = <CommitStatusSort.HIGHEST_INDEX: 'highestindex'>
class Team(allspice.baseapiobject.ApiObject):
class Team(ApiObject):
    """
    A team, addressed through the ``/teams/{id}`` endpoints.

    The annotated fields are not assigned in this class; per the package
    README they are created dynamically at runtime from the API response.
    """

    can_create_org_repo: bool
    description: str
    id: int
    includes_all_repositories: bool
    name: str
    organization: Optional["Organization"]
    permission: str
    units: List[str]
    units_map: Dict[str, str]

    API_OBJECT = """/teams/{id}"""  # <id>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Two teams are equal when both their organization and their id match."""
        if not isinstance(other, Team):
            return False
        return self.organization == other.organization and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.organization) ^ hash(self.id)

    # The nested organization payload is parsed into an Organization object.
    _fields_to_parsers: ClassVar[dict] = {
        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
    }

    _patchable_fields: ClassVar[set[str]] = {
        "can_create_org_repo",
        "description",
        "includes_all_repositories",
        "name",
        "permission",
        "units",
        "units_map",
    }

    @classmethod
    def request(cls, allspice_client, id: int):
        """Request the team with the given id through ``_request``."""
        return cls._request(allspice_client, {"id": id})

    def commit(self):
        """Commit this team through ``_commit``, addressed by its id."""
        args = {"id": self.id}
        self._commit(args)

    def add_user(self, user: User):
        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
        url = f"/teams/{self.id}/members/{user.login}"
        self.allspice_client.requests_put(url)

    def add_repo(self, org: Organization, repo: Union[Repository, str]):
        """
        Add a repository of an organization to this team.

        :param org: The organization; its username is used in the URL.
        :param repo: A Repository object or the repository name.
        """
        repo_name = repo.name if isinstance(repo, Repository) else repo
        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))

    def get_members(self):
        """Get all users assigned to the team."""
        results = self.allspice_client.requests_get_paginated(
            Team.GET_MEMBERS % self.id,
        )
        return [User.parse_response(self.allspice_client, result) for result in results]

    def get_repos(self):
        """Get all repos of this Team."""
        # Use the paginated getter, like get_members, so that teams with
        # more repos than fit on one page are returned completely.
        results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def delete(self):
        """Send the delete request for this team and flag this object as deleted."""
        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
        self.deleted = True

    def remove_team_member(self, user_name: str):
        """Remove the user with the given name from this team."""
        url = f"/teams/{self.id}/members/{user_name}"
        self.allspice_client.requests_delete(url)
Team(allspice_client)
2293    def __init__(self, allspice_client):
2294        super().__init__(allspice_client)
can_create_org_repo: bool
description: str
id: int
includes_all_repositories: bool
name: str
organization: Optional[Organization]
permission: str
units: List[str]
units_map: Dict[str, str]
API_OBJECT = '/teams/{id}'
ADD_REPO = '/teams/%s/repos/%s/%s'
TEAM_DELETE = '/teams/%s'
GET_MEMBERS = '/teams/%s/members'
GET_REPOS = '/teams/%s/repos'
@classmethod
def request(cls, allspice_client, id: int):
2318    @classmethod
2319    def request(cls, allspice_client, id: int):
2320        return cls._request(allspice_client, {"id": id})
def commit(self):
2322    def commit(self):
2323        args = {"id": self.id}
2324        self._commit(args)
def add_user(self, user: User):
2326    def add_user(self, user: User):
2327        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2328        url = f"/teams/{self.id}/members/{user.login}"
2329        self.allspice_client.requests_put(url)

https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember

def add_repo( self, org: Organization, repo: Union[Repository, str]):
2331    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2332        if isinstance(repo, Repository):
2333            repo_name = repo.name
2334        else:
2335            repo_name = repo
2336        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
def get_members(self):
2338    def get_members(self):
2339        """Get all users assigned to the team."""
2340        results = self.allspice_client.requests_get_paginated(
2341            Team.GET_MEMBERS % self.id,
2342        )
2343        return [User.parse_response(self.allspice_client, result) for result in results]

Get all users assigned to the team.

def get_repos(self):
2345    def get_repos(self):
2346        """Get all repos of this Team."""
2347        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2348        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all repos of this Team.

def delete(self):
2350    def delete(self):
2351        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2352        self.deleted = True
def remove_team_member(self, user_name: str):
2354    def remove_team_member(self, user_name: str):
2355        url = f"/teams/{self.id}/members/{user_name}"
2356        self.allspice_client.requests_delete(url)
class User(allspice.baseapiobject.ApiObject):
class User(ApiObject):
    """
    A user, addressed through the ``/users/{name}`` endpoints.

    The annotated fields are not assigned in this class; per the package
    README they are created dynamically at runtime from the API response.
    """

    active: bool
    admin: Any
    allow_create_organization: Any
    allow_git_hook: Any
    allow_import_local: Any
    avatar_url: str
    created: str
    description: str
    email: str
    emails: List[Any]
    followers_count: int
    following_count: int
    full_name: str
    html_url: str
    id: int
    is_admin: bool
    language: str
    last_login: str
    location: str
    login: str
    login_name: str
    max_repo_creation: Any
    must_change_password: Any
    password: Any
    prohibit_login: bool
    restricted: bool
    source_id: int
    starred_repos_count: int
    username: str
    visibility: str
    website: str

    API_OBJECT = """/users/{name}"""  # <name>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)
        self._emails = []

    def __eq__(self, other):
        """Two users are equal when they share the client and the id."""
        if not isinstance(other, User):
            return False
        return self.allspice_client == other.allspice_client and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.allspice_client) ^ hash(self.id)

    @property
    def emails(self):
        """Email addresses of this user, fetched from the server on every access."""
        self.__request_emails()
        return self._emails

    @classmethod
    def request(cls, allspice_client, name: str) -> "User":
        """Request the user with the given name through ``_request``."""
        api_object = cls._request(allspice_client, {"name": name})
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "active",
        "admin",
        "allow_create_organization",
        "allow_git_hook",
        "allow_import_local",
        "email",
        "full_name",
        "location",
        "login_name",
        "max_repo_creation",
        "must_change_password",
        "password",
        "prohibit_login",
        "website",
    }

    def commit(self, login_name: str, source_id: int = 0):
        """
        Unfortunately it is necessary to require the login name
        as well as the login source (that is not supplied when getting a user) for
        changing a user.
        Usually source_id is 0 and the login_name is equal to the username.
        """
        values = self.get_dirty_fields()
        values.update(
            # api-doc says that the "source_id" is necessary; works without though
            {"login_name": login_name, "source_id": source_id}
        )
        args = {"username": self.username}
        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
        self._dirty_fields = {}

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a user Repository

        The request is posted to ``/user/repos``; the arguments are sent as
        the fields of the request body.

        Throws:
            AlreadyExistsException: If the Repository exists already
                (not raised in this method; presumably raised by the client).
            Exception: If something else went wrong, i.e. the response
                carries no ``id``.
        """
        result = self.allspice_client.requests_post(
            "/user/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories owned by this User."""
        url = f"/users/{self.username}/repos"
        results = self.allspice_client.requests_get_paginated(url)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_orgs(self) -> List[Organization]:
        """Get all Organizations this user is a member of."""
        url = f"/users/{self.username}/orgs"
        results = self.allspice_client.requests_get_paginated(url)
        return [Organization.parse_response(self.allspice_client, result) for result in results]

    def get_teams(self) -> List["Team"]:
        """Get the teams returned by ``/user/teams`` when requested with ``sudo`` set to this user."""
        url = "/user/teams"
        results = self.allspice_client.requests_get_paginated(url, sudo=self)
        return [Team.parse_response(self.allspice_client, result) for result in results]

    def get_accessible_repos(self) -> List["Repository"]:
        """Get all Repositories accessible by the logged in User."""
        # Use the paginated getter, like get_teams, so that users with more
        # repos than fit on one page get the complete list.
        results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def __request_emails(self):
        """Refresh ``_emails``, and ``_email`` for a primary address, from the server."""
        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
        # Rebuild the list on every call: appending to the previous list
        # would duplicate every address each time ``emails`` is read.
        self._emails = [mail["email"] for mail in result]
        # report if the address changed by this
        for mail in result:
            if mail["primary"]:
                self._email = mail["email"]

    def delete(self):
        """Deletes this User. Also deletes all Repositories he owns."""
        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """
        Get this user's heatmap as ``(datetime, contributions)`` tuples.

        Each ``timestamp`` of the response is converted with
        ``datetime.fromtimestamp`` without a timezone argument.
        """
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
User(allspice_client)
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
active: bool
admin: Any
allow_create_organization: Any
allow_git_hook: Any
allow_import_local: Any
avatar_url: str
created: str
description: str
email: str
emails
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
followers_count: int
following_count: int
full_name: str
html_url: str
id: int
is_admin: bool
language: str
last_login: str
location: str
login: str
login_name: str
max_repo_creation: Any
must_change_password: Any
password: Any
prohibit_login: bool
restricted: bool
source_id: int
starred_repos_count: int
username: str
visibility: str
website: str
API_OBJECT = '/users/{name}'
USER_MAIL = '/user/emails?sudo=%s'
USER_PATCH = '/admin/users/%s'
ADMIN_DELETE_USER = '/admin/users/%s'
ADMIN_EDIT_USER = '/admin/users/{username}'
USER_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> User:
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
def commit(self, login_name: str, source_id: int = 0):
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}

Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.

def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)

Create a user Repository

Throws:
  • AlreadyExistsException: If the Repository exists already.
  • Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories owned by this User.

def get_orgs(self) -> List[Organization]:
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]

Get all Organizations this user is a member of.

def get_teams(self) -> List[Team]:
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
def get_accessible_repos(self) -> List[Repository]:
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories accessible by the logged in User.

def delete(self):
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True

Deletes this User. Also deletes all Repositories he owns.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results