allspice

A very simple API client for AllSpice Hub

Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.

Forked from https://github.com/Langenfeld/py-gitea.

Usage

Docs

See the documentation site.

Examples

Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.

Quickstart

First get an allspice_client object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.

from allspice import *

# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)

# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)

Operations like requesting the AllSpice version or authentication user can be requested directly from the allspice_client object:

print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)

Adding entities like Users, Organizations, ... also is done via the allspice_client object.

user = allspice_client.create_user("Test Testson", "test@test.test", "password")

All operations on entities in allspice are then accomplished via the according wrapper objects for those entities. Each of those objects has a .request method that creates an entity according to your allspice_client instance.

other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)

Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during divelopment. Refer to the AllSpice API documentation for the fields names.

Fields that can not be altered via allspice-api, are read only. After altering a field, the .commit method of the according object must be called to synchronize the changed fields with your allspice_client instance.

org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()

An entity in allspice can be deleted by calling delete.

org.delete()

All entity objects do have methods to execute some of the requests possible though the AllSpice api:

org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
    repos = team.get_repos()
    for repo in repos:
        print(repo.name)

 1"""
 2.. include:: ../README.md
 3   :start-line: 1
 4   :end-before: Installation
 5"""
 6
 7from .allspice import (
 8    AllSpice,
 9)
10from .apiobject import (
11    Branch,
12    Comment,
13    Commit,
14    Content,
15    DesignReview,
16    Issue,
17    Milestone,
18    Organization,
19    Release,
20    Repository,
21    Team,
22    User,
23)
24from .exceptions import AlreadyExistsException, NotFoundException
25
26__version__ = "3.9.0"
27
28__all__ = [
29    "AllSpice",
30    "AlreadyExistsException",
31    "Branch",
32    "Comment",
33    "Commit",
34    "Content",
35    "DesignReview",
36    "Issue",
37    "Milestone",
38    "NotFoundException",
39    "Organization",
40    "Release",
41    "Repository",
42    "Team",
43    "User",
44]
class AllSpice:
 21class AllSpice:
 22    """Object to establish a session with AllSpice Hub."""
 23
 24    ADMIN_CREATE_USER = """/admin/users"""
 25    GET_USERS_ADMIN = """/admin/users"""
 26    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 27    ALLSPICE_HUB_VERSION = """/version"""
 28    GET_USER = """/user"""
 29    GET_REPOSITORY = """/repos/{owner}/{name}"""
 30    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 31    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 32
 33    def __init__(
 34        self,
 35        allspice_hub_url="https://hub.allspice.io",
 36        token_text=None,
 37        auth=None,
 38        verify=True,
 39        log_level="INFO",
 40        ratelimiting=(100, 60),
 41    ):
 42        """Initializing an instance of the AllSpice Hub Client
 43
 44        Args:
 45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 46                Defaults to `https://hub.allspice.io`.
 47
 48            token_text (str, None): The access token, by default None.
 49
 50            auth (tuple, None): The user credentials
 51                `(username, password)`, by default None.
 52
 53            verify (bool): If True, allow insecure server connections
 54                when using SSL.
 55
 56            log_level (str): The log level, by default `INFO`.
 57
 58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 59                If None, no rate limiting is applied. By default, 100 calls
 60                per minute are allowed.
 61        """
 62
 63        self.logger = logging.getLogger(__name__)
 64        handler = logging.StreamHandler(sys.stderr)
 65        handler.setFormatter(
 66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 67        )
 68        self.logger.addHandler(handler)
 69        self.logger.setLevel(log_level)
 70        self.headers = {
 71            "Content-type": "application/json",
 72        }
 73        self.url = allspice_hub_url
 74
 75        if ratelimiting is None:
 76            self.requests = requests.Session()
 77        else:
 78            (max_calls, period) = ratelimiting
 79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 80
 81        # Manage authentification
 82        if not token_text and not auth:
 83            raise ValueError("Please provide auth or token_text, but not both")
 84        if token_text:
 85            self.headers["Authorization"] = "token " + token_text
 86        if auth:
 87            self.logger.warning(
 88                "Using basic auth is not recommended. Prefer using a token instead."
 89            )
 90            self.requests.auth = auth
 91
 92        # Manage SSL certification verification
 93        self.requests.verify = verify
 94        if not verify:
 95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 96
 97    def __get_url(self, endpoint):
 98        url = self.url + "/api/v1" + endpoint
 99        self.logger.debug("Url: %s" % url)
100        return url
101
102    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
103        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
104        if request.status_code not in [200, 201]:
105            message = f"Received status code: {request.status_code} ({request.url})"
106            if request.status_code in [404]:
107                raise NotFoundException(message)
108            if request.status_code in [403]:
109                raise Exception(
110                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
111                )
112            if request.status_code in [409]:
113                raise ConflictException(message)
114            if request.status_code in [503]:
115                raise NotYetGeneratedException(message)
116            raise Exception(message)
117        return request
118
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}
125
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
132
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
139
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
155
156            if not result:
157                return aggregated_result
158
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
176
177            page += 1
178
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
189
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
198
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
208
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see requests.post. Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
216
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # `requests.post`.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
229
230        request = self.requests.post(self.__get_url(endpoint), **args)
231
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)
243
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
253
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
257
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
262
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
266
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
270
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
274
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.email == email or email in user.emails:
279                return user
280        return None
281
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
288
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
293
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
324
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
328            self.logger.info(
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user
340
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
355
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
359
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382            self.logger.info("Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)
387
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409            self.logger.info("Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
415
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "repo.wiki",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
436
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
447
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
460
461        if "id" in result:
462            self.logger.info("Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
33    def __init__(
34        self,
35        allspice_hub_url="https://hub.allspice.io",
36        token_text=None,
37        auth=None,
38        verify=True,
39        log_level="INFO",
40        ratelimiting=(100, 60),
41    ):
42        """Initializing an instance of the AllSpice Hub Client
43
44        Args:
45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
46                Defaults to `https://hub.allspice.io`.
47
48            token_text (str, None): The access token, by default None.
49
50            auth (tuple, None): The user credentials
51                `(username, password)`, by default None.
52
53            verify (bool): If True, allow insecure server connections
54                when using SSL.
55
56            log_level (str): The log level, by default `INFO`.
57
58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
59                If None, no rate limiting is applied. By default, 100 calls
60                per minute are allowed.
61        """
62
63        self.logger = logging.getLogger(__name__)
64        handler = logging.StreamHandler(sys.stderr)
65        handler.setFormatter(
66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
67        )
68        self.logger.addHandler(handler)
69        self.logger.setLevel(log_level)
70        self.headers = {
71            "Content-type": "application/json",
72        }
73        self.url = allspice_hub_url
74
75        if ratelimiting is None:
76            self.requests = requests.Session()
77        else:
78            (max_calls, period) = ratelimiting
79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
80
81        # Manage authentification
82        if not token_text and not auth:
83            raise ValueError("Please provide auth or token_text, but not both")
84        if token_text:
85            self.headers["Authorization"] = "token " + token_text
86        if auth:
87            self.logger.warning(
88                "Using basic auth is not recommended. Prefer using a token instead."
89            )
90            self.requests.auth = auth
91
92        # Manage SSL certification verification
93        self.requests.verify = verify
94        if not verify:
95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True, allow insecure server connections
    when using SSL.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
@staticmethod
def parse_result(result) -> Dict:
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
155
156            if not result:
157                return aggregated_result
158
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
176
177            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
208
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see requests.post. Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
216
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # `requests.post`.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
229
230        request = self.requests.post(self.__get_url(endpoint), **args)
231
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
def get_orgs(self):
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
def get_version(self) -> str:
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
def get_users(self) -> List[User]:
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[User]:
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.email == email or email in user.emails:
279                return user
280        return None
def get_user_by_name(self, username: str) -> Optional[User]:
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
def get_repository(self, owner: str, name: str) -> Repository:
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
324
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
328            self.logger.info(
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user

Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[User, Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
355
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
359
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382            self.logger.info("Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users can not use this method. Please use instead allspice.User.create_repo or allspice.Organization.create_repo.

def create_org( self, owner: User, orgName: str, description: str, location='', website='', full_name=''):
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409            self.logger.info("Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
def create_team( self, org: Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "repo.wiki",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
436
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
447
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
460
461        if "id" in result:
462            self.logger.info("Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The Name of the Team to be created. description (str): Optional, None, short description of the new Team. permission (str): Optional, 'read', What permissions the members units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the permission permission will be applied to all units. Note: When both units and units_map are given, units_map will be preferred.

class AlreadyExistsException(builtins.Exception):
2class AlreadyExistsException(Exception):
3    pass

Common base class for all non-exit exceptions.

class Branch(allspice.baseapiobject.ReadonlyApiObject):
419class Branch(ReadonlyApiObject):
420    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
421    effective_branch_protection_name: str
422    enable_status_check: bool
423    name: str
424    protected: bool
425    required_approvals: int
426    status_check_contexts: List[Any]
427    user_can_merge: bool
428    user_can_push: bool
429
430    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
431
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
434
435    def __eq__(self, other):
436        if not isinstance(other, Branch):
437            return False
438        return self.commit == other.commit and self.name == other.name
439
440    def __hash__(self):
441        return hash(self.commit["id"]) ^ hash(self.name)
442
443    _fields_to_parsers: ClassVar[dict] = {
444        # This is not a commit object
445        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
446    }
447
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Branch(allspice_client)
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]], NoneType]]
effective_branch_protection_name: str
enable_status_check: bool
name: str
protected: bool
required_approvals: int
status_check_contexts: List[Any]
user_can_merge: bool
user_can_push: bool
API_OBJECT = '/repos/{owner}/{repo}/branches/{branch}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, branch: str):
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
class Comment(allspice.baseapiobject.ApiObject):
1590class Comment(ApiObject):
1591    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1592    body: str
1593    created_at: datetime
1594    html_url: str
1595    id: int
1596    issue_url: str
1597    original_author: str
1598    original_author_id: int
1599    pull_request_url: str
1600    updated_at: datetime
1601    user: User
1602
1603    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1604    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1605    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1606
1607    def __init__(self, allspice_client):
1608        super().__init__(allspice_client)
1609
1610    def __eq__(self, other):
1611        if not isinstance(other, Comment):
1612            return False
1613        return self.repository == other.repository and self.id == other.id
1614
1615    def __hash__(self):
1616        return hash(self.repository) ^ hash(self.id)
1617
1618    @classmethod
1619    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1620        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1621
1622    _fields_to_parsers: ClassVar[dict] = {
1623        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1624        "created_at": lambda _, t: Util.convert_time(t),
1625        "updated_at": lambda _, t: Util.convert_time(t),
1626    }
1627
1628    _patchable_fields: ClassVar[set[str]] = {"body"}
1629
1630    @property
1631    def parent_url(self) -> str:
1632        """URL of the parent of this comment (the issue or the pull request)"""
1633
1634        if self.issue_url is not None and self.issue_url != "":
1635            return self.issue_url
1636        else:
1637            return self.pull_request_url
1638
1639    @cached_property
1640    def repository(self) -> Repository:
1641        """The repository this comment was posted on."""
1642
1643        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1644        return Repository.request(self.allspice_client, owner_name, repo_name)
1645
1646    def __fields_for_path(self):
1647        return {
1648            "owner": self.repository.owner.username,
1649            "repo": self.repository.name,
1650            "id": self.id,
1651        }
1652
1653    def commit(self):
1654        self._commit(self.__fields_for_path())
1655
1656    def delete(self):
1657        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1658        self.deleted = True
1659
1660    def get_attachments(self) -> List[Attachment]:
1661        """
1662        Get all attachments on this comment. This returns Attachment objects, which
1663        contain a link to download the attachment.
1664
1665        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1666        """
1667
1668        results = self.allspice_client.requests_get(
1669            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1670        )
1671        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1672
1673    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1674        """
1675        Create an attachment on this comment.
1676
1677        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1678
1679        :param file: The file to attach. This should be a file-like object.
1680        :param name: The name of the file. If not provided, the name of the file will be
1681                     used.
1682        :return: The created attachment.
1683        """
1684
1685        args: dict[str, Any] = {
1686            "files": {"attachment": file},
1687        }
1688        if name is not None:
1689            args["params"] = {"name": name}
1690
1691        result = self.allspice_client.requests_post(
1692            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1693            **args,
1694        )
1695        return Attachment.parse_response(self.allspice_client, result)
1696
1697    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1698        """
1699        Edit an attachment.
1700
1701        The list of params that can be edited is available at
1702        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1703
1704        :param attachment: The attachment to be edited
1705        :param data: The data parameter should be a dictionary of the fields to edit.
1706        :return: The edited attachment
1707        """
1708
1709        args = {
1710            **self.__fields_for_path(),
1711            "attachment_id": attachment.id,
1712        }
1713        result = self.allspice_client.requests_patch(
1714            self.ATTACHMENT_PATH.format(**args),
1715            data=data,
1716        )
1717        return Attachment.parse_response(self.allspice_client, result)
1718
1719    def delete_attachment(self, attachment: Attachment):
1720        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1721
1722        args = {
1723            **self.__fields_for_path(),
1724            "attachment_id": attachment.id,
1725        }
1726        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1727        attachment.deleted = True
Comment(allspice_client)
1607    def __init__(self, allspice_client):
1608        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]]]]
body: str
created_at: datetime.datetime
html_url: str
id: int
issue_url: str
original_author: str
original_author_id: int
pull_request_url: str
updated_at: datetime.datetime
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/comments/{id}'
GET_ATTACHMENTS_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets'
ATTACHMENT_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}'
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: str) -> Comment:
1618    @classmethod
1619    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1620        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
parent_url: str
1630    @property
1631    def parent_url(self) -> str:
1632        """URL of the parent of this comment (the issue or the pull request)"""
1633
1634        if self.issue_url is not None and self.issue_url != "":
1635            return self.issue_url
1636        else:
1637            return self.pull_request_url

URL of the parent of this comment (the issue or the pull request)

repository: Repository
1639    @cached_property
1640    def repository(self) -> Repository:
1641        """The repository this comment was posted on."""
1642
1643        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1644        return Repository.request(self.allspice_client, owner_name, repo_name)

The repository this comment was posted on.

def commit(self):
1653    def commit(self):
1654        self._commit(self.__fields_for_path())
def delete(self):
1656    def delete(self):
1657        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1658        self.deleted = True
def get_attachments(self) -> List[allspice.apiobject.Attachment]:
1660    def get_attachments(self) -> List[Attachment]:
1661        """
1662        Get all attachments on this comment. This returns Attachment objects, which
1663        contain a link to download the attachment.
1664
1665        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1666        """
1667
1668        results = self.allspice_client.requests_get(
1669            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1670        )
1671        return [Attachment.parse_response(self.allspice_client, result) for result in results]

Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.

allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> allspice.apiobject.Attachment:
1673    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1674        """
1675        Create an attachment on this comment.
1676
1677        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1678
1679        :param file: The file to attach. This should be a file-like object.
1680        :param name: The name of the file. If not provided, the name of the file will be
1681                     used.
1682        :return: The created attachment.
1683        """
1684
1685        args: dict[str, Any] = {
1686            "files": {"attachment": file},
1687        }
1688        if name is not None:
1689            args["params"] = {"name": name}
1690
1691        result = self.allspice_client.requests_post(
1692            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1693            **args,
1694        )
1695        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this comment.

allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

def edit_attachment( self, attachment: allspice.apiobject.Attachment, data: dict) -> allspice.apiobject.Attachment:
1697    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1698        """
1699        Edit an attachment.
1700
1701        The list of params that can be edited is available at
1702        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1703
1704        :param attachment: The attachment to be edited
1705        :param data: The data parameter should be a dictionary of the fields to edit.
1706        :return: The edited attachment
1707        """
1708
1709        args = {
1710            **self.__fields_for_path(),
1711            "attachment_id": attachment.id,
1712        }
1713        result = self.allspice_client.requests_patch(
1714            self.ATTACHMENT_PATH.format(**args),
1715            data=data,
1716        )
1717        return Attachment.parse_response(self.allspice_client, result)

Edit an attachment.

The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

Parameters
  • attachment: The attachment to be edited
  • data: The data parameter should be a dictionary of the fields to edit.
Returns

The edited attachment

def delete_attachment(self, attachment: allspice.apiobject.Attachment):
1719    def delete_attachment(self, attachment: Attachment):
1720        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1721
1722        args = {
1723            **self.__fields_for_path(),
1724            "attachment_id": attachment.id,
1725        }
1726        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1727        attachment.deleted = True

allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

class Commit(allspice.baseapiobject.ReadonlyApiObject):
1730class Commit(ReadonlyApiObject):
1731    author: User
1732    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1733    committer: Dict[str, Union[int, str, bool]]
1734    created: str
1735    files: List[Dict[str, str]]
1736    html_url: str
1737    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1738    parents: List[Union[Dict[str, str], Any]]
1739    sha: str
1740    stats: Dict[str, int]
1741    url: str
1742
1743    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1744    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1745    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1746
1747    # Regex to extract owner and repo names from the url property
1748    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1749
1750    def __init__(self, allspice_client):
1751        super().__init__(allspice_client)
1752
1753    _fields_to_parsers: ClassVar[dict] = {
1754        # NOTE: api may return None for commiters that are no allspice users
1755        "author": lambda allspice_client, u: (
1756            User.parse_response(allspice_client, u) if u else None
1757        )
1758    }
1759
1760    def __eq__(self, other):
1761        if not isinstance(other, Commit):
1762            return False
1763        return self.sha == other.sha
1764
1765    def __hash__(self):
1766        return hash(self.sha)
1767
1768    @classmethod
1769    def parse_response(cls, allspice_client, result) -> "Commit":
1770        commit_cache = result["commit"]
1771        api_object = cls(allspice_client)
1772        cls._initialize(allspice_client, api_object, result)
1773        # inner_commit for legacy reasons
1774        Commit._add_read_property("inner_commit", commit_cache, api_object)
1775        return api_object
1776
1777    def get_status(self) -> CommitCombinedStatus:
1778        """
1779        Get a combined status consisting of all statues on this commit.
1780
1781        Note that the returned object is a CommitCombinedStatus object, which
1782        also contains a list of all statuses on the commit.
1783
1784        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1785        """
1786
1787        result = self.allspice_client.requests_get(
1788            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1789        )
1790        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1791
1792    def get_statuses(self) -> List[CommitStatus]:
1793        """
1794        Get a list of all statuses on this commit.
1795
1796        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1797        """
1798
1799        results = self.allspice_client.requests_get(
1800            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1801        )
1802        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1803
1804    @cached_property
1805    def _fields_for_path(self) -> dict[str, str]:
1806        matches = self.URL_REGEXP.search(self.url)
1807        if not matches:
1808            raise ValueError(f"Invalid commit URL: {self.url}")
1809
1810        return {
1811            "owner": matches.group(1),
1812            "repo": matches.group(2),
1813            "sha": self.sha,
1814        }
Commit(allspice_client)
1750    def __init__(self, allspice_client):
1751        super().__init__(allspice_client)
author: User
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
committer: Dict[str, Union[int, str, bool]]
created: str
files: List[Dict[str, str]]
html_url: str
inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
parents: List[Union[Dict[str, str], Any]]
sha: str
stats: Dict[str, int]
url: str
API_OBJECT = '/repos/{owner}/{repo}/commits/{sha}'
COMMIT_GET_STATUS = '/repos/{owner}/{repo}/commits/{sha}/status'
COMMIT_GET_STATUSES = '/repos/{owner}/{repo}/commits/{sha}/statuses'
URL_REGEXP = re.compile('/repos/([^/]+)/([^/]+)/git/commits')
@classmethod
def parse_response(cls, allspice_client, result) -> Commit:
1768    @classmethod
1769    def parse_response(cls, allspice_client, result) -> "Commit":
1770        commit_cache = result["commit"]
1771        api_object = cls(allspice_client)
1772        cls._initialize(allspice_client, api_object, result)
1773        # inner_commit for legacy reasons
1774        Commit._add_read_property("inner_commit", commit_cache, api_object)
1775        return api_object
def get_status(self) -> allspice.apiobject.CommitCombinedStatus:
1777    def get_status(self) -> CommitCombinedStatus:
1778        """
1779        Get a combined status consisting of all statues on this commit.
1780
1781        Note that the returned object is a CommitCombinedStatus object, which
1782        also contains a list of all statuses on the commit.
1783
1784        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1785        """
1786
1787        result = self.allspice_client.requests_get(
1788            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1789        )
1790        return CommitCombinedStatus.parse_response(self.allspice_client, result)

Get a combined status consisting of all statues on this commit.

Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.

allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus

def get_statuses(self) -> List[allspice.apiobject.CommitStatus]:
1792    def get_statuses(self) -> List[CommitStatus]:
1793        """
1794        Get a list of all statuses on this commit.
1795
1796        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1797        """
1798
1799        results = self.allspice_client.requests_get(
1800            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1801        )
1802        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]

Get a list of all statuses on this commit.

allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses

class Content(allspice.baseapiobject.ReadonlyApiObject):
2624class Content(ReadonlyApiObject):
2625    content: Any
2626    download_url: str
2627    encoding: Any
2628    git_url: str
2629    html_url: str
2630    last_commit_sha: str
2631    name: str
2632    path: str
2633    sha: str
2634    size: int
2635    submodule_git_url: Any
2636    target: Any
2637    type: str
2638    url: str
2639
2640    FILE = "file"
2641
2642    def __init__(self, allspice_client):
2643        super().__init__(allspice_client)
2644
2645    def __eq__(self, other):
2646        if not isinstance(other, Content):
2647            return False
2648
2649        return self.sha == other.sha and self.name == other.name
2650
2651    def __hash__(self):
2652        return hash(self.sha) ^ hash(self.name)
Content(allspice_client)
2642    def __init__(self, allspice_client):
2643        super().__init__(allspice_client)
content: Any
download_url: str
encoding: Any
git_url: str
html_url: str
last_commit_sha: str
name: str
path: str
sha: str
size: int
submodule_git_url: Any
target: Any
type: str
url: str
FILE = 'file'
class DesignReview(allspice.baseapiobject.ApiObject):
2108class DesignReview(ApiObject):
2109    """
2110    A Design Review. See
2111    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2112
2113    Note: The base and head fields are not `Branch` objects - they are plain strings
2114    referring to the branch names. This is because DRs can exist for branches that have
2115    been deleted, which don't have an associated `Branch` object from the API. You can use
2116    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2117    it exists.
2118    """
2119
2120    additions: int
2121    allow_maintainer_edit: bool
2122    allow_maintainer_edits: Any
2123    assignee: User
2124    assignees: List["User"]
2125    base: str
2126    body: str
2127    changed_files: int
2128    closed_at: Optional[str]
2129    comments: int
2130    created_at: str
2131    deletions: int
2132    diff_url: str
2133    draft: bool
2134    due_date: Optional[str]
2135    head: str
2136    html_url: str
2137    id: int
2138    is_locked: bool
2139    labels: List[Any]
2140    merge_base: str
2141    merge_commit_sha: Optional[str]
2142    mergeable: bool
2143    merged: bool
2144    merged_at: Optional[str]
2145    merged_by: Optional["User"]
2146    milestone: Any
2147    number: int
2148    patch_url: str
2149    pin_order: int
2150    repository: Optional["Repository"]
2151    requested_reviewers: Any
2152    review_comments: int
2153    state: str
2154    title: str
2155    updated_at: str
2156    url: str
2157    user: User
2158
2159    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2160    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2161    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2162
2163    OPEN = "open"
2164    CLOSED = "closed"
2165
2166    class MergeType(Enum):
2167        MERGE = "merge"
2168        REBASE = "rebase"
2169        REBASE_MERGE = "rebase-merge"
2170        SQUASH = "squash"
2171        MANUALLY_MERGED = "manually-merged"
2172
2173    def __init__(self, allspice_client):
2174        super().__init__(allspice_client)
2175
2176    def __eq__(self, other):
2177        if not isinstance(other, DesignReview):
2178            return False
2179        return self.repository == other.repository and self.id == other.id
2180
2181    def __hash__(self):
2182        return hash(self.repository) ^ hash(self.id)
2183
2184    @classmethod
2185    def parse_response(cls, allspice_client, result) -> "DesignReview":
2186        api_object = super().parse_response(allspice_client, result)
2187        cls._add_read_property(
2188            "repository",
2189            Repository.parse_response(allspice_client, result["base"]["repo"]),
2190            api_object,
2191        )
2192
2193        return api_object
2194
2195    @classmethod
2196    def request(cls, allspice_client, owner: str, repo: str, number: str):
2197        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2198        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2199
2200    _fields_to_parsers: ClassVar[dict] = {
2201        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2202        "assignees": lambda allspice_client, us: [
2203            User.parse_response(allspice_client, u) for u in us
2204        ],
2205        "base": lambda _, b: b["ref"],
2206        "head": lambda _, h: h["ref"],
2207        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2208        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2209        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2210    }
2211
2212    _patchable_fields: ClassVar[set[str]] = {
2213        "allow_maintainer_edits",
2214        "assignee",
2215        "assignees",
2216        "base",
2217        "body",
2218        "due_date",
2219        "milestone",
2220        "state",
2221        "title",
2222    }
2223
2224    _parsers_to_fields: ClassVar[dict] = {
2225        "assignee": lambda u: u.username,
2226        "assignees": lambda us: [u.username for u in us],
2227        "base": lambda b: b.name if isinstance(b, Branch) else b,
2228        "milestone": lambda m: m.id,
2229    }
2230
2231    def commit(self):
2232        data = self.get_dirty_fields()
2233        if "due_date" in data and data["due_date"] is None:
2234            data["unset_due_date"] = True
2235
2236        args = {
2237            "owner": self.repository.owner.username,
2238            "repo": self.repository.name,
2239            "index": self.number,
2240        }
2241        self._commit(args, data)
2242
2243    def merge(self, merge_type: MergeType):
2244        """
2245        Merge the pull request. See
2246        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2247
2248        :param merge_type: The type of merge to perform. See the MergeType enum.
2249        """
2250
2251        self.allspice_client.requests_post(
2252            self.MERGE_DESIGN_REVIEW.format(
2253                owner=self.repository.owner.username,
2254                repo=self.repository.name,
2255                index=self.number,
2256            ),
2257            data={"Do": merge_type.value},
2258        )
2259
2260    def get_comments(self) -> List[Comment]:
2261        """
2262        Get the comments on this pull request, but not specifically on a review.
2263
2264        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2265
2266        :return: A list of comments on this pull request.
2267        """
2268
2269        results = self.allspice_client.requests_get(
2270            self.GET_COMMENTS.format(
2271                owner=self.repository.owner.username,
2272                repo=self.repository.name,
2273                index=self.number,
2274            )
2275        )
2276        return [Comment.parse_response(self.allspice_client, result) for result in results]
2277
2278    def create_comment(self, body: str):
2279        """
2280        Create a comment on this pull request. This uses the same endpoint as the
2281        comments on issues, and will not be associated with any reviews.
2282
2283        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2284
2285        :param body: The body of the comment.
2286        :return: The comment that was created.
2287        """
2288
2289        result = self.allspice_client.requests_post(
2290            self.GET_COMMENTS.format(
2291                owner=self.repository.owner.username,
2292                repo=self.repository.name,
2293                index=self.number,
2294            ),
2295            data={"body": body},
2296        )
2297        return Comment.parse_response(self.allspice_client, result)

A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.

Note: The base and head fields are not Branch objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated Branch object from the API. You can use the Repository.get_branch method to get a Branch object for a branch if you know it exists.

DesignReview(allspice_client)
2173    def __init__(self, allspice_client):
2174        super().__init__(allspice_client)
additions: int
allow_maintainer_edit: bool
allow_maintainer_edits: Any
assignee: User
assignees: List[User]
base: str
body: str
changed_files: int
closed_at: Optional[str]
comments: int
created_at: str
deletions: int
diff_url: str
draft: bool
due_date: Optional[str]
head: str
html_url: str
id: int
is_locked: bool
labels: List[Any]
merge_base: str
merge_commit_sha: Optional[str]
mergeable: bool
merged: bool
merged_at: Optional[str]
merged_by: Optional[User]
milestone: Any
number: int
patch_url: str
pin_order: int
repository: Optional[Repository]
requested_reviewers: Any
review_comments: int
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/pulls/{index}'
MERGE_DESIGN_REVIEW = '/repos/{owner}/{repo}/pulls/{index}/merge'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
OPEN = 'open'
CLOSED = 'closed'
@classmethod
def parse_response(cls, allspice_client, result) -> DesignReview:
2184    @classmethod
2185    def parse_response(cls, allspice_client, result) -> "DesignReview":
2186        api_object = super().parse_response(allspice_client, result)
2187        cls._add_read_property(
2188            "repository",
2189            Repository.parse_response(allspice_client, result["base"]["repo"]),
2190            api_object,
2191        )
2192
2193        return api_object
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
2195    @classmethod
2196    def request(cls, allspice_client, owner: str, repo: str, number: str):
2197        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2198        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})

See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest

def commit(self):
2231    def commit(self):
2232        data = self.get_dirty_fields()
2233        if "due_date" in data and data["due_date"] is None:
2234            data["unset_due_date"] = True
2235
2236        args = {
2237            "owner": self.repository.owner.username,
2238            "repo": self.repository.name,
2239            "index": self.number,
2240        }
2241        self._commit(args, data)
def merge(self, merge_type: DesignReview.MergeType):
2243    def merge(self, merge_type: MergeType):
2244        """
2245        Merge the pull request. See
2246        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2247
2248        :param merge_type: The type of merge to perform. See the MergeType enum.
2249        """
2250
2251        self.allspice_client.requests_post(
2252            self.MERGE_DESIGN_REVIEW.format(
2253                owner=self.repository.owner.username,
2254                repo=self.repository.name,
2255                index=self.number,
2256            ),
2257            data={"Do": merge_type.value},
2258        )

Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest

Parameters
  • merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
2260    def get_comments(self) -> List[Comment]:
2261        """
2262        Get the comments on this pull request, but not specifically on a review.
2263
2264        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2265
2266        :return: A list of comments on this pull request.
2267        """
2268
2269        results = self.allspice_client.requests_get(
2270            self.GET_COMMENTS.format(
2271                owner=self.repository.owner.username,
2272                repo=self.repository.name,
2273                index=self.number,
2274            )
2275        )
2276        return [Comment.parse_response(self.allspice_client, result) for result in results]

Get the comments on this pull request, but not specifically on a review.

allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments

Returns

A list of comments on this pull request.

def create_comment(self, body: str):
2278    def create_comment(self, body: str):
2279        """
2280        Create a comment on this pull request. This uses the same endpoint as the
2281        comments on issues, and will not be associated with any reviews.
2282
2283        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2284
2285        :param body: The body of the comment.
2286        :return: The comment that was created.
2287        """
2288
2289        result = self.allspice_client.requests_post(
2290            self.GET_COMMENTS.format(
2291                owner=self.repository.owner.username,
2292                repo=self.repository.name,
2293                index=self.number,
2294            ),
2295            data={"body": body},
2296        )
2297        return Comment.parse_response(self.allspice_client, result)

Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.

allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment

Parameters
  • body: The body of the comment.
Returns

The comment that was created.

class DesignReview.MergeType(enum.Enum):
2166    class MergeType(Enum):
2167        MERGE = "merge"
2168        REBASE = "rebase"
2169        REBASE_MERGE = "rebase-merge"
2170        SQUASH = "squash"
2171        MANUALLY_MERGED = "manually-merged"
MERGE = <MergeType.MERGE: 'merge'>
REBASE = <MergeType.REBASE: 'rebase'>
REBASE_MERGE = <MergeType.REBASE_MERGE: 'rebase-merge'>
SQUASH = <MergeType.SQUASH: 'squash'>
MANUALLY_MERGED = <MergeType.MANUALLY_MERGED: 'manually-merged'>
class Issue(allspice.baseapiobject.ApiObject):
1905class Issue(ApiObject):
1906    """
1907    An issue on a repository.
1908
1909    Note: `Issue.assets` may not have any entries even if the issue has
1910    attachments. This happens when an issue is fetched via a bulk method like
1911    `Repository.get_issues`. In most cases, prefer using
1912    `Issue.get_attachments` to get the attachments on an issue.
1913    """
1914
1915    assets: List[Union[Any, "Attachment"]]
1916    assignee: Any
1917    assignees: Any
1918    body: str
1919    closed_at: Any
1920    comments: int
1921    created_at: str
1922    due_date: Any
1923    html_url: str
1924    id: int
1925    is_locked: bool
1926    labels: List[Any]
1927    milestone: Optional["Milestone"]
1928    number: int
1929    original_author: str
1930    original_author_id: int
1931    pin_order: int
1932    pull_request: Any
1933    ref: str
1934    repository: Dict[str, Union[int, str]]
1935    state: str
1936    title: str
1937    updated_at: str
1938    url: str
1939    user: User
1940
1941    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1942    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1943    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1944    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1945    GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets"""
1946
1947    OPENED = "open"
1948    CLOSED = "closed"
1949
1950    def __init__(self, allspice_client):
1951        super().__init__(allspice_client)
1952
1953    def __eq__(self, other):
1954        if not isinstance(other, Issue):
1955            return False
1956        return self.repository == other.repository and self.id == other.id
1957
1958    def __hash__(self):
1959        return hash(self.repository) ^ hash(self.id)
1960
1961    _fields_to_parsers: ClassVar[dict] = {
1962        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1963        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1964        "assets": lambda allspice_client, assets: [
1965            Attachment.parse_response(allspice_client, a) for a in assets
1966        ],
1967        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1968        "assignees": lambda allspice_client, us: [
1969            User.parse_response(allspice_client, u) for u in us
1970        ],
1971        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1972    }
1973
1974    _parsers_to_fields: ClassVar[dict] = {
1975        "milestone": lambda m: m.id,
1976    }
1977
1978    _patchable_fields: ClassVar[set[str]] = {
1979        "assignee",
1980        "assignees",
1981        "body",
1982        "due_date",
1983        "milestone",
1984        "state",
1985        "title",
1986    }
1987
1988    def commit(self):
1989        args = {
1990            "owner": self.repository.owner.username,
1991            "repo": self.repository.name,
1992            "index": self.number,
1993        }
1994        self._commit(args)
1995
1996    @classmethod
1997    def request(cls, allspice_client, owner: str, repo: str, number: str):
1998        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1999        # The repository in the response is a RepositoryMeta object, so request
2000        # the full repository object and add it to the issue object.
2001        repository = Repository.request(allspice_client, owner, repo)
2002        setattr(api_object, "_repository", repository)
2003        # For legacy reasons
2004        cls._add_read_property("repo", repository, api_object)
2005        return api_object
2006
2007    @classmethod
2008    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
2009        args = {"owner": repo.owner.username, "repo": repo.name}
2010        data = {"title": title, "body": body}
2011        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
2012        issue = Issue.parse_response(allspice_client, result)
2013        setattr(issue, "_repository", repo)
2014        cls._add_read_property("repo", repo, issue)
2015        return issue
2016
2017    @property
2018    def owner(self) -> Organization | User:
2019        return self.repository.owner
2020
2021    def get_time_sum(self, user: User) -> int:
2022        results = self.allspice_client.requests_get(
2023            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2024        )
2025        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
2026
2027    def get_times(self) -> Optional[Dict]:
2028        return self.allspice_client.requests_get(
2029            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2030        )
2031
2032    def delete_time(self, time_id: str):
2033        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2034        self.allspice_client.requests_delete(path)
2035
2036    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2037        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2038        self.allspice_client.requests_post(
2039            path, data={"created": created, "time": int(time), "user_name": user_name}
2040        )
2041
2042    def get_comments(self) -> List[Comment]:
2043        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2044
2045        results = self.allspice_client.requests_get(
2046            self.GET_COMMENTS.format(
2047                owner=self.owner.username, repo=self.repository.name, index=self.number
2048            )
2049        )
2050
2051        return [Comment.parse_response(self.allspice_client, result) for result in results]
2052
2053    def create_comment(self, body: str) -> Comment:
2054        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2055
2056        path = self.GET_COMMENTS.format(
2057            owner=self.owner.username, repo=self.repository.name, index=self.number
2058        )
2059
2060        response = self.allspice_client.requests_post(path, data={"body": body})
2061        return Comment.parse_response(self.allspice_client, response)
2062
2063    def get_attachments(self) -> List[Attachment]:
2064        """
2065        Fetch all attachments on this issue.
2066
2067        Unlike the assets field, this will always fetch all attachments from the
2068        API.
2069
2070        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2071        """
2072
2073        path = self.GET_ATTACHMENTS.format(
2074            owner=self.owner.username, repo=self.repository.name, index=self.number
2075        )
2076        response = self.allspice_client.requests_get(path)
2077
2078        return [Attachment.parse_response(self.allspice_client, result) for result in response]
2079
2080    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2081        """
2082        Create an attachment on this issue.
2083
2084        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2085
2086        :param file: The file to attach. This should be a file-like object.
2087        :param name: The name of the file. If not provided, the name of the file will be
2088                     used.
2089        :return: The created attachment.
2090        """
2091
2092        args: dict[str, Any] = {
2093            "files": {"attachment": file},
2094        }
2095        if name is not None:
2096            args["params"] = {"name": name}
2097
2098        result = self.allspice_client.requests_post(
2099            self.GET_ATTACHMENTS.format(
2100                owner=self.owner.username, repo=self.repository.name, index=self.number
2101            ),
2102            **args,
2103        )
2104
2105        return Attachment.parse_response(self.allspice_client, result)

An issue on a repository.

Note: Issue.assets may not have any entries even if the issue has attachments. This happens when an issue is fetched via a bulk method like Repository.get_issues. In most cases, prefer using Issue.get_attachments to get the attachments on an issue.

Issue(allspice_client)
1950    def __init__(self, allspice_client):
1951        super().__init__(allspice_client)
assets: List[Union[Any, allspice.apiobject.Attachment]]
assignee: Any
assignees: Any
body: str
closed_at: Any
comments: int
created_at: str
due_date: Any
html_url: str
id: int
is_locked: bool
labels: List[Any]
milestone: Optional[Milestone]
number: int
original_author: str
original_author_id: int
pin_order: int
pull_request: Any
ref: str
repository: Dict[str, Union[int, str]]
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/{index}'
GET_TIME = '/repos/%s/%s/issues/%s/times'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
CREATE_ISSUE = '/repos/{owner}/{repo}/issues'
GET_ATTACHMENTS = '/repos/{owner}/{repo}/issues/{index}/assets'
OPENED = 'open'
CLOSED = 'closed'
def commit(self):
1988    def commit(self):
1989        args = {
1990            "owner": self.repository.owner.username,
1991            "repo": self.repository.name,
1992            "index": self.number,
1993        }
1994        self._commit(args)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1996    @classmethod
1997    def request(cls, allspice_client, owner: str, repo: str, number: str):
1998        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1999        # The repository in the response is a RepositoryMeta object, so request
2000        # the full repository object and add it to the issue object.
2001        repository = Repository.request(allspice_client, owner, repo)
2002        setattr(api_object, "_repository", repository)
2003        # For legacy reasons
2004        cls._add_read_property("repo", repository, api_object)
2005        return api_object
@classmethod
def create_issue( cls, allspice_client, repo: Repository, title: str, body: str = ''):
2007    @classmethod
2008    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
2009        args = {"owner": repo.owner.username, "repo": repo.name}
2010        data = {"title": title, "body": body}
2011        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
2012        issue = Issue.parse_response(allspice_client, result)
2013        setattr(issue, "_repository", repo)
2014        cls._add_read_property("repo", repo, issue)
2015        return issue
owner: Organization | User
2017    @property
2018    def owner(self) -> Organization | User:
2019        return self.repository.owner
def get_time_sum(self, user: User) -> int:
2021    def get_time_sum(self, user: User) -> int:
2022        results = self.allspice_client.requests_get(
2023            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2024        )
2025        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
def get_times(self) -> Optional[Dict]:
2027    def get_times(self) -> Optional[Dict]:
2028        return self.allspice_client.requests_get(
2029            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2030        )
def delete_time(self, time_id: str):
2032    def delete_time(self, time_id: str):
2033        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2034        self.allspice_client.requests_delete(path)
def add_time( self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2036    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2037        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2038        self.allspice_client.requests_post(
2039            path, data={"created": created, "time": int(time), "user_name": user_name}
2040        )
def get_comments(self) -> List[Comment]:
2042    def get_comments(self) -> List[Comment]:
2043        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2044
2045        results = self.allspice_client.requests_get(
2046            self.GET_COMMENTS.format(
2047                owner=self.owner.username, repo=self.repository.name, index=self.number
2048            )
2049        )
2050
2051        return [Comment.parse_response(self.allspice_client, result) for result in results]

allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments

def create_comment(self, body: str) -> Comment:
2053    def create_comment(self, body: str) -> Comment:
2054        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2055
2056        path = self.GET_COMMENTS.format(
2057            owner=self.owner.username, repo=self.repository.name, index=self.number
2058        )
2059
2060        response = self.allspice_client.requests_post(path, data={"body": body})
2061        return Comment.parse_response(self.allspice_client, response)

allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment

def get_attachments(self) -> List[allspice.apiobject.Attachment]:
2063    def get_attachments(self) -> List[Attachment]:
2064        """
2065        Fetch all attachments on this issue.
2066
2067        Unlike the assets field, this will always fetch all attachments from the
2068        API.
2069
2070        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2071        """
2072
2073        path = self.GET_ATTACHMENTS.format(
2074            owner=self.owner.username, repo=self.repository.name, index=self.number
2075        )
2076        response = self.allspice_client.requests_get(path)
2077
2078        return [Attachment.parse_response(self.allspice_client, result) for result in response]

Fetch all attachments on this issue.

Unlike the assets field, this will always fetch all attachments from the API.

See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> allspice.apiobject.Attachment:
2080    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2081        """
2082        Create an attachment on this issue.
2083
2084        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2085
2086        :param file: The file to attach. This should be a file-like object.
2087        :param name: The name of the file. If not provided, the name of the file will be
2088                     used.
2089        :return: The created attachment.
2090        """
2091
2092        args: dict[str, Any] = {
2093            "files": {"attachment": file},
2094        }
2095        if name is not None:
2096            args["params"] = {"name": name}
2097
2098        result = self.allspice_client.requests_post(
2099            self.GET_ATTACHMENTS.format(
2100                owner=self.owner.username, repo=self.repository.name, index=self.number
2101            ),
2102            **args,
2103        )
2104
2105        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this issue.

allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

class Milestone(allspice.baseapiobject.ApiObject):
1474class Milestone(ApiObject):
1475    allow_merge_commits: Any
1476    allow_rebase: Any
1477    allow_rebase_explicit: Any
1478    allow_squash_merge: Any
1479    archived: Any
1480    closed_at: Any
1481    closed_issues: int
1482    created_at: str
1483    default_branch: Any
1484    description: str
1485    due_on: Any
1486    has_issues: Any
1487    has_pull_requests: Any
1488    has_wiki: Any
1489    id: int
1490    ignore_whitespace_conflicts: Any
1491    name: Any
1492    open_issues: int
1493    private: Any
1494    state: str
1495    title: str
1496    updated_at: str
1497    website: Any
1498
1499    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1500
1501    def __init__(self, allspice_client):
1502        super().__init__(allspice_client)
1503
1504    def __eq__(self, other):
1505        if not isinstance(other, Milestone):
1506            return False
1507        return self.allspice_client == other.allspice_client and self.id == other.id
1508
1509    def __hash__(self):
1510        return hash(self.allspice_client) ^ hash(self.id)
1511
1512    _fields_to_parsers: ClassVar[dict] = {
1513        "closed_at": lambda _, t: Util.convert_time(t),
1514        "due_on": lambda _, t: Util.convert_time(t),
1515    }
1516
1517    _patchable_fields: ClassVar[set[str]] = {
1518        "allow_merge_commits",
1519        "allow_rebase",
1520        "allow_rebase_explicit",
1521        "allow_squash_merge",
1522        "archived",
1523        "default_branch",
1524        "description",
1525        "has_issues",
1526        "has_pull_requests",
1527        "has_wiki",
1528        "ignore_whitespace_conflicts",
1529        "name",
1530        "private",
1531        "website",
1532    }
1533
1534    @classmethod
1535    def request(cls, allspice_client, owner: str, repo: str, number: str):
1536        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Milestone(allspice_client)
1501    def __init__(self, allspice_client):
1502        super().__init__(allspice_client)
allow_merge_commits: Any
allow_rebase: Any
allow_rebase_explicit: Any
allow_squash_merge: Any
archived: Any
closed_at: Any
closed_issues: int
created_at: str
default_branch: Any
description: str
due_on: Any
has_issues: Any
has_pull_requests: Any
has_wiki: Any
id: int
ignore_whitespace_conflicts: Any
name: Any
open_issues: int
private: Any
state: str
title: str
updated_at: str
website: Any
API_OBJECT = '/repos/{owner}/{repo}/milestones/{number}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1534    @classmethod
1535    def request(cls, allspice_client, owner: str, repo: str, number: str):
1536        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
class NotFoundException(builtins.Exception):
6class NotFoundException(Exception):
7    pass

Common base class for all non-exit exceptions.

class Organization(allspice.baseapiobject.ApiObject):
 33class Organization(ApiObject):
 34    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
 35
 36    active: Optional[bool]
 37    avatar_url: str
 38    created: Optional[str]
 39    description: str
 40    email: str
 41    followers_count: Optional[int]
 42    following_count: Optional[int]
 43    full_name: str
 44    html_url: Optional[str]
 45    id: int
 46    is_admin: Optional[bool]
 47    language: Optional[str]
 48    last_login: Optional[str]
 49    location: str
 50    login: Optional[str]
 51    login_name: Optional[str]
 52    name: Optional[str]
 53    prohibit_login: Optional[bool]
 54    repo_admin_change_team_access: Optional[bool]
 55    restricted: Optional[bool]
 56    source_id: Optional[int]
 57    starred_repos_count: Optional[int]
 58    username: str
 59    visibility: str
 60    website: str
 61
 62    API_OBJECT = """/orgs/{name}"""  # <org>
 63    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
 64    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
 65    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
 66    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
 67    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
 68    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
 69
 70    def __init__(self, allspice_client):
 71        super().__init__(allspice_client)
 72
 73    def __eq__(self, other):
 74        if not isinstance(other, Organization):
 75            return False
 76        return self.allspice_client == other.allspice_client and self.name == other.name
 77
 78    def __hash__(self):
 79        return hash(self.allspice_client) ^ hash(self.name)
 80
 81    @classmethod
 82    def request(cls, allspice_client, name: str) -> Self:
 83        return cls._request(allspice_client, {"name": name})
 84
 85    @classmethod
 86    def parse_response(cls, allspice_client, result) -> "Organization":
 87        api_object = super().parse_response(allspice_client, result)
 88        # add "name" field to make this behave similar to users for gitea < 1.18
 89        # also necessary for repository-owner when org is repo owner
 90        if not hasattr(api_object, "name"):
 91            Organization._add_read_property("name", result["username"], api_object)
 92        return api_object
 93
 94    _patchable_fields: ClassVar[set[str]] = {
 95        "description",
 96        "full_name",
 97        "location",
 98        "visibility",
 99        "website",
100    }
101
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
105
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)
144
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
150
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
157
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
165
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
172
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )
204
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
208
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
220
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
224
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True
231
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results

see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll

Organization(allspice_client)
70    def __init__(self, allspice_client):
71        super().__init__(allspice_client)
active: Optional[bool]
avatar_url: str
created: Optional[str]
description: str
email: str
followers_count: Optional[int]
following_count: Optional[int]
full_name: str
html_url: Optional[str]
id: int
is_admin: Optional[bool]
language: Optional[str]
last_login: Optional[str]
location: str
login: Optional[str]
login_name: Optional[str]
name: Optional[str]
prohibit_login: Optional[bool]
repo_admin_change_team_access: Optional[bool]
restricted: Optional[bool]
source_id: Optional[int]
starred_repos_count: Optional[int]
username: str
visibility: str
website: str
API_OBJECT = '/orgs/{name}'
ORG_REPOS_REQUEST = '/orgs/%s/repos'
ORG_TEAMS_REQUEST = '/orgs/%s/teams'
ORG_TEAMS_CREATE = '/orgs/%s/teams'
ORG_GET_MEMBERS = '/orgs/%s/members'
ORG_IS_MEMBER = '/orgs/%s/members/%s'
ORG_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> Self:
81    @classmethod
82    def request(cls, allspice_client, name: str) -> Self:
83        return cls._request(allspice_client, {"name": name})
@classmethod
def parse_response(cls, allspice_client, result) -> Organization:
85    @classmethod
86    def parse_response(cls, allspice_client, result) -> "Organization":
87        api_object = super().parse_response(allspice_client, result)
88        # add "name" field to make this behave similar to users for gitea < 1.18
89        # also necessary for repository-owner when org is repo owner
90        if not hasattr(api_object, "name"):
91            Organization._add_read_property("name", result["username"], api_object)
92        return api_object
def commit(self):
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)

Create an organization Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
def get_repository(self, name) -> Repository:
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List[Team]:
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
def get_team(self, name) -> Team:
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
def create_team( self, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}) -> Team:
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )

Alias for AllSpice#create_team

def get_members(self) -> List[User]:
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
def is_member(self, username) -> bool:
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
def remove_member(self, user: User):
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
def delete(self):
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True

Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results
class Release(allspice.baseapiobject.ApiObject):
2383class Release(ApiObject):
2384    """
2385    A release on a repo.
2386    """
2387
2388    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2389    author: User
2390    body: str
2391    created_at: str
2392    draft: bool
2393    html_url: str
2394    id: int
2395    name: str
2396    prerelease: bool
2397    published_at: str
2398    repo: Optional["Repository"]
2399    repository: Optional["Repository"]
2400    tag_name: str
2401    tarball_url: str
2402    target_commitish: str
2403    upload_url: str
2404    url: str
2405    zipball_url: str
2406
2407    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2408    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2409    # Note that we don't strictly need the get_assets route, as the release
2410    # object already contains the assets.
2411
2412    def __init__(self, allspice_client):
2413        super().__init__(allspice_client)
2414
2415    def __eq__(self, other):
2416        if not isinstance(other, Release):
2417            return False
2418        return self.repo == other.repo and self.id == other.id
2419
2420    def __hash__(self):
2421        return hash(self.repo) ^ hash(self.id)
2422
2423    _fields_to_parsers: ClassVar[dict] = {
2424        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2425    }
2426    _patchable_fields: ClassVar[set[str]] = {
2427        "body",
2428        "draft",
2429        "name",
2430        "prerelease",
2431        "tag_name",
2432        "target_commitish",
2433    }
2434
2435    @classmethod
2436    def parse_response(cls, allspice_client, result, repo) -> Release:
2437        release = super().parse_response(allspice_client, result)
2438        Release._add_read_property("repository", repo, release)
2439        # For legacy reasons
2440        Release._add_read_property("repo", repo, release)
2441        setattr(
2442            release,
2443            "_assets",
2444            [
2445                ReleaseAsset.parse_response(allspice_client, asset, release)
2446                for asset in result["assets"]
2447            ],
2448        )
2449        return release
2450
2451    @classmethod
2452    def request(
2453        cls,
2454        allspice_client,
2455        owner: str,
2456        repo: str,
2457        id: Optional[int] = None,
2458    ) -> Release:
2459        args = {"owner": owner, "repo": repo, "id": id}
2460        release_response = cls._get_gitea_api_object(allspice_client, args)
2461        repository = Repository.request(allspice_client, owner, repo)
2462        release = cls.parse_response(allspice_client, release_response, repository)
2463        return release
2464
2465    def commit(self):
2466        if self.repo is None:
2467            raise ValueError("Cannot commit a release without a repository.")
2468
2469        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2470        self._commit(args)
2471
2472    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2473        """
2474        Create an asset for this release.
2475
2476        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2477
2478        :param file: The file to upload. This should be a file-like object.
2479        :param name: The name of the file.
2480        :return: The created asset.
2481        """
2482
2483        if self.repo is None:
2484            raise ValueError("Cannot commit a release without a repository.")
2485
2486        args: dict[str, Any] = {"files": {"attachment": file}}
2487        if name is not None:
2488            args["params"] = {"name": name}
2489
2490        result = self.allspice_client.requests_post(
2491            self.RELEASE_CREATE_ASSET.format(
2492                owner=self.repo.owner.username,
2493                repo=self.repo.name,
2494                id=self.id,
2495            ),
2496            **args,
2497        )
2498        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2499
2500    def delete(self):
2501        if self.repo is None:
2502            raise ValueError("Cannot commit a release without a repository.")
2503
2504        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2505        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2506        self.deleted = True

A release on a repo.

Release(allspice_client)
2412    def __init__(self, allspice_client):
2413        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]], allspice.apiobject.ReleaseAsset]]
author: User
body: str
created_at: str
draft: bool
html_url: str
id: int
name: str
prerelease: bool
published_at: str
repo: Optional[Repository]
repository: Optional[Repository]
tag_name: str
tarball_url: str
target_commitish: str
upload_url: str
url: str
zipball_url: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{id}'
RELEASE_CREATE_ASSET = '/repos/{owner}/{repo}/releases/{id}/assets'
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
2435    @classmethod
2436    def parse_response(cls, allspice_client, result, repo) -> Release:
2437        release = super().parse_response(allspice_client, result)
2438        Release._add_read_property("repository", repo, release)
2439        # For legacy reasons
2440        Release._add_read_property("repo", repo, release)
2441        setattr(
2442            release,
2443            "_assets",
2444            [
2445                ReleaseAsset.parse_response(allspice_client, asset, release)
2446                for asset in result["assets"]
2447            ],
2448        )
2449        return release
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: Optional[int] = None) -> Release:
2451    @classmethod
2452    def request(
2453        cls,
2454        allspice_client,
2455        owner: str,
2456        repo: str,
2457        id: Optional[int] = None,
2458    ) -> Release:
2459        args = {"owner": owner, "repo": repo, "id": id}
2460        release_response = cls._get_gitea_api_object(allspice_client, args)
2461        repository = Repository.request(allspice_client, owner, repo)
2462        release = cls.parse_response(allspice_client, release_response, repository)
2463        return release
def commit(self):
2465    def commit(self):
2466        if self.repo is None:
2467            raise ValueError("Cannot commit a release without a repository.")
2468
2469        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2470        self._commit(args)
def create_asset( self, file: <class 'IO'>, name: Optional[str] = None) -> allspice.apiobject.ReleaseAsset:
2472    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2473        """
2474        Create an asset for this release.
2475
2476        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2477
2478        :param file: The file to upload. This should be a file-like object.
2479        :param name: The name of the file.
2480        :return: The created asset.
2481        """
2482
2483        if self.repo is None:
2484            raise ValueError("Cannot commit a release without a repository.")
2485
2486        args: dict[str, Any] = {"files": {"attachment": file}}
2487        if name is not None:
2488            args["params"] = {"name": name}
2489
2490        result = self.allspice_client.requests_post(
2491            self.RELEASE_CREATE_ASSET.format(
2492                owner=self.repo.owner.username,
2493                repo=self.repo.name,
2494                id=self.id,
2495            ),
2496            **args,
2497        )
2498        return ReleaseAsset.parse_response(self.allspice_client, result, self)

Create an asset for this release.

allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

Parameters
  • file: The file to upload. This should be a file-like object.
  • name: The name of the file.
Returns

The created asset.

def delete(self):
2500    def delete(self):
2501        if self.repo is None:
2502            raise ValueError("Cannot commit a release without a repository.")
2503
2504        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2505        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2506        self.deleted = True
class Repository(allspice.baseapiobject.ApiObject):
 477class Repository(ApiObject):
 478    allow_fast_forward_only_merge: bool
 479    allow_manual_merge: Any
 480    allow_merge_commits: bool
 481    allow_rebase: bool
 482    allow_rebase_explicit: bool
 483    allow_rebase_update: bool
 484    allow_squash_merge: bool
 485    archived: bool
 486    archived_at: str
 487    autodetect_manual_merge: Any
 488    avatar_url: str
 489    clone_url: str
 490    created_at: str
 491    default_allow_maintainer_edit: bool
 492    default_branch: str
 493    default_delete_branch_after_merge: bool
 494    default_merge_style: str
 495    description: str
 496    empty: bool
 497    enable_prune: Any
 498    external_tracker: Any
 499    external_wiki: Any
 500    fork: bool
 501    forks_count: int
 502    full_name: str
 503    has_actions: bool
 504    has_issues: bool
 505    has_packages: bool
 506    has_projects: bool
 507    has_pull_requests: bool
 508    has_releases: bool
 509    has_wiki: bool
 510    html_url: str
 511    id: int
 512    ignore_whitespace_conflicts: bool
 513    internal: bool
 514    internal_tracker: Dict[str, bool]
 515    language: str
 516    languages_url: str
 517    link: str
 518    mirror: bool
 519    mirror_interval: str
 520    mirror_updated: str
 521    name: str
 522    object_format_name: str
 523    open_issues_count: int
 524    open_pr_counter: int
 525    original_url: str
 526    owner: Union["User", "Organization"]
 527    parent: Any
 528    permissions: Dict[str, bool]
 529    private: bool
 530    projects_mode: str
 531    release_counter: int
 532    repo_transfer: Any
 533    size: int
 534    ssh_url: str
 535    stars_count: int
 536    template: bool
 537    updated_at: datetime
 538    url: str
 539    watchers_count: int
 540    website: str
 541
 542    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 543    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 544    REPO_SEARCH = """/repos/search/"""
 545    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 546    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 547    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 548    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 549    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 550    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 551    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 552    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 553    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 554    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 555    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 556    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 557    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 558    REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}"
 559    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 560    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 561    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 562    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 563    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 564    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 565    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 566    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 567
 568    class ArchiveFormat(Enum):
 569        """
 570        Archive formats for Repository.get_archive
 571        """
 572
 573        TAR = "tar.gz"
 574        ZIP = "zip"
 575
 576    class CommitStatusSort(Enum):
 577        """
 578        Sort order for Repository.get_commit_status
 579        """
 580
 581        OLDEST = "oldest"
 582        RECENT_UPDATE = "recentupdate"
 583        LEAST_UPDATE = "leastupdate"
 584        LEAST_INDEX = "leastindex"
 585        HIGHEST_INDEX = "highestindex"
 586
 587    def __init__(self, allspice_client):
 588        super().__init__(allspice_client)
 589
 590    def __eq__(self, other):
 591        if not isinstance(other, Repository):
 592            return False
 593        return self.owner == other.owner and self.name == other.name
 594
 595    def __hash__(self):
 596        return hash(self.owner) ^ hash(self.name)
 597
 598    _fields_to_parsers: ClassVar[dict] = {
 599        # dont know how to tell apart user and org as owner except form email being empty.
 600        "owner": lambda allspice_client, r: (
 601            Organization.parse_response(allspice_client, r)
 602            if r["email"] == ""
 603            else User.parse_response(allspice_client, r)
 604        ),
 605        "updated_at": lambda _, t: Util.convert_time(t),
 606    }
 607
 608    @classmethod
 609    def request(
 610        cls,
 611        allspice_client,
 612        owner: str,
 613        name: str,
 614    ) -> Repository:
 615        return cls._request(allspice_client, {"owner": owner, "name": name})
 616
 617    @classmethod
 618    def search(
 619        cls,
 620        allspice_client,
 621        query: Optional[str] = None,
 622        topic: bool = False,
 623        include_description: bool = False,
 624        user: Optional[User] = None,
 625        owner_to_prioritize: Union[User, Organization, None] = None,
 626    ) -> list[Repository]:
 627        """
 628        Search for repositories.
 629
 630        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 631
 632        :param query: The query string to search for
 633        :param topic: If true, the query string will only be matched against the
 634            repository's topic.
 635        :param include_description: If true, the query string will be matched
 636            against the repository's description as well.
 637        :param user: If specified, only repositories that this user owns or
 638            contributes to will be searched.
 639        :param owner_to_prioritize: If specified, repositories owned by the
 640            given entity will be prioritized in the search.
 641        :returns: All repositories matching the query. If there are many
 642            repositories matching this query, this may take some time.
 643        """
 644
 645        params = {}
 646
 647        if query is not None:
 648            params["q"] = query
 649        if topic:
 650            params["topic"] = topic
 651        if include_description:
 652            params["include_description"] = include_description
 653        if user is not None:
 654            params["user"] = user.id
 655        if owner_to_prioritize is not None:
 656            params["owner_to_prioritize"] = owner_to_prioritize.id
 657
 658        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 659
 660        return [Repository.parse_response(allspice_client, response) for response in responses]
 661
 662    _patchable_fields: ClassVar[set[str]] = {
 663        "allow_manual_merge",
 664        "allow_merge_commits",
 665        "allow_rebase",
 666        "allow_rebase_explicit",
 667        "allow_rebase_update",
 668        "allow_squash_merge",
 669        "archived",
 670        "autodetect_manual_merge",
 671        "default_branch",
 672        "default_delete_branch_after_merge",
 673        "default_merge_style",
 674        "description",
 675        "enable_prune",
 676        "external_tracker",
 677        "external_wiki",
 678        "has_actions",
 679        "has_issues",
 680        "has_projects",
 681        "has_pull_requests",
 682        "has_wiki",
 683        "ignore_whitespace_conflicts",
 684        "internal_tracker",
 685        "mirror_interval",
 686        "name",
 687        "private",
 688        "template",
 689        "website",
 690    }
 691
 692    def commit(self):
 693        args = {"owner": self.owner.username, "name": self.name}
 694        self._commit(args)
 695
 696    def get_branches(self) -> List["Branch"]:
 697        """Get all the Branches of this Repository."""
 698
 699        results = self.allspice_client.requests_get_paginated(
 700            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 701        )
 702        return [Branch.parse_response(self.allspice_client, result) for result in results]
 703
 704    def get_branch(self, name: str) -> "Branch":
 705        """Get a specific Branch of this Repository."""
 706        result = self.allspice_client.requests_get(
 707            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 708        )
 709        return Branch.parse_response(self.allspice_client, result)
 710
 711    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 712        """Add a branch to the repository"""
 713        # Note: will only work with gitea 1.13 or higher!
 714
 715        ref_name = Util.data_params_for_ref(create_from)
 716        if "ref" not in ref_name:
 717            raise ValueError("create_from must be a Branch, Commit or string")
 718        ref_name = ref_name["ref"]
 719
 720        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 721        result = self.allspice_client.requests_post(
 722            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 723        )
 724        return Branch.parse_response(self.allspice_client, result)
 725
 726    def get_issues(
 727        self,
 728        state: Literal["open", "closed", "all"] = "all",
 729        search_query: Optional[str] = None,
 730        labels: Optional[List[str]] = None,
 731        milestones: Optional[List[Union[Milestone, str]]] = None,
 732        assignee: Optional[Union[User, str]] = None,
 733        since: Optional[datetime] = None,
 734        before: Optional[datetime] = None,
 735    ) -> List["Issue"]:
 736        """
 737        Get all Issues of this Repository (open and closed)
 738
 739        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 740
 741        All params of this method are optional filters. If you don't specify a filter, it
 742        will not be applied.
 743
 744        :param state: The state of the Issues to get. If None, all Issues are returned.
 745        :param search_query: Filter issues by text. This is equivalent to searching for
 746                             `search_query` in the Issues on the web interface.
 747        :param labels: Filter issues by labels.
 748        :param milestones: Filter issues by milestones.
 749        :param assignee: Filter issues by the assigned user.
 750        :param since: Filter issues by the date they were created.
 751        :param before: Filter issues by the date they were created.
 752        :return: A list of Issues.
 753        """
 754
 755        data = {
 756            "state": state,
 757        }
 758        if search_query:
 759            data["q"] = search_query
 760        if labels:
 761            data["labels"] = ",".join(labels)
 762        if milestones:
 763            data["milestone"] = ",".join(
 764                [
 765                    milestone.name if isinstance(milestone, Milestone) else milestone
 766                    for milestone in milestones
 767                ]
 768            )
 769        if assignee:
 770            if isinstance(assignee, User):
 771                data["assignee"] = assignee.username
 772            else:
 773                data["assignee"] = assignee
 774        if since:
 775            data["since"] = Util.format_time(since)
 776        if before:
 777            data["before"] = Util.format_time(before)
 778
 779        results = self.allspice_client.requests_get_paginated(
 780            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 781            params=data,
 782        )
 783
 784        issues = []
 785        for result in results:
 786            issue = Issue.parse_response(self.allspice_client, result)
 787            # See Issue.request
 788            setattr(issue, "_repository", self)
 789            # This is mostly for compatibility with an older implementation
 790            Issue._add_read_property("repo", self, issue)
 791            issues.append(issue)
 792
 793        return issues
 794
 795    def get_design_reviews(
 796        self,
 797        state: Literal["open", "closed", "all"] = "all",
 798        milestone: Optional[Union[Milestone, str]] = None,
 799        labels: Optional[List[str]] = None,
 800    ) -> List["DesignReview"]:
 801        """
 802        Get all Design Reviews of this Repository.
 803
 804        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 805
 806        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 807                      are returned.
 808        :param milestone: The milestone of the Design Reviews to get.
 809        :param labels: A list of label IDs to filter DRs by.
 810        :return: A list of Design Reviews.
 811        """
 812
 813        params = {
 814            "state": state,
 815        }
 816        if milestone:
 817            if isinstance(milestone, Milestone):
 818                params["milestone"] = milestone.name
 819            else:
 820                params["milestone"] = milestone
 821        if labels:
 822            params["labels"] = ",".join(labels)
 823
 824        results = self.allspice_client.requests_get_paginated(
 825            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 826            params=params,
 827        )
 828        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 829
 830    def get_commits(
 831        self,
 832        sha: Optional[str] = None,
 833        path: Optional[str] = None,
 834        stat: bool = True,
 835    ) -> List["Commit"]:
 836        """
 837        Get all the Commits of this Repository.
 838
 839        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 840
 841        :param sha: The SHA of the commit to start listing commits from.
 842        :param path: filepath of a file/dir.
 843        :param stat: Include the number of additions and deletions in the response.
 844                     Disable for speedup.
 845        :return: A list of Commits.
 846        """
 847
 848        data = {}
 849        if sha:
 850            data["sha"] = sha
 851        if path:
 852            data["path"] = path
 853        if not stat:
 854            data["stat"] = False
 855
 856        try:
 857            results = self.allspice_client.requests_get_paginated(
 858                Repository.REPO_COMMITS % (self.owner.username, self.name),
 859                params=data,
 860            )
 861        except ConflictException as err:
 862            logging.warning(err)
 863            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 864            results = []
 865        return [Commit.parse_response(self.allspice_client, result) for result in results]
 866
 867    def get_issues_state(self, state) -> List["Issue"]:
 868        """
 869        DEPRECATED: Use get_issues() instead.
 870
 871        Get issues of state Issue.open or Issue.closed of a repository.
 872        """
 873
 874        assert state in [Issue.OPENED, Issue.CLOSED]
 875        issues = []
 876        data = {"state": state}
 877        results = self.allspice_client.requests_get_paginated(
 878            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 879            params=data,
 880        )
 881        for result in results:
 882            issue = Issue.parse_response(self.allspice_client, result)
 883            # adding data not contained in the issue response
 884            # See Issue.request()
 885            setattr(issue, "_repository", self)
 886            Issue._add_read_property("repo", self, issue)
 887            Issue._add_read_property("owner", self.owner, issue)
 888            issues.append(issue)
 889        return issues
 890
 891    def get_times(self):
 892        results = self.allspice_client.requests_get(
 893            Repository.REPO_TIMES % (self.owner.username, self.name)
 894        )
 895        return results
 896
 897    def get_user_time(self, username) -> float:
 898        if isinstance(username, User):
 899            username = username.username
 900        results = self.allspice_client.requests_get(
 901            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 902        )
 903        time = sum(r["time"] for r in results)
 904        return time
 905
 906    def get_full_name(self) -> str:
 907        return self.owner.username + "/" + self.name
 908
 909    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 910        data = {
 911            "assignees": assignees,
 912            "body": description,
 913            "closed": False,
 914            "title": title,
 915        }
 916        result = self.allspice_client.requests_post(
 917            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 918            data=data,
 919        )
 920
 921        issue = Issue.parse_response(self.allspice_client, result)
 922        setattr(issue, "_repository", self)
 923        Issue._add_read_property("repo", self, issue)
 924        return issue
 925
 926    def create_design_review(
 927        self,
 928        title: str,
 929        head: Union[Branch, str],
 930        base: Union[Branch, str],
 931        assignees: Optional[Set[Union[User, str]]] = None,
 932        body: Optional[str] = None,
 933        due_date: Optional[datetime] = None,
 934        milestone: Optional["Milestone"] = None,
 935    ) -> "DesignReview":
 936        """
 937        Create a new Design Review.
 938
 939        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 940
 941        :param title: Title of the Design Review
 942        :param head: Branch or name of the branch to merge into the base branch
 943        :param base: Branch or name of the branch to merge into
 944        :param assignees: Optional. A list of users to assign this review. List can be of
 945                          User objects or of usernames.
 946        :param body: An Optional Description for the Design Review.
 947        :param due_date: An Optional Due date for the Design Review.
 948        :param milestone: An Optional Milestone for the Design Review
 949        :return: The created Design Review
 950        """
 951
 952        data: dict[str, Any] = {
 953            "title": title,
 954        }
 955
 956        if isinstance(head, Branch):
 957            data["head"] = head.name
 958        else:
 959            data["head"] = head
 960        if isinstance(base, Branch):
 961            data["base"] = base.name
 962        else:
 963            data["base"] = base
 964        if assignees:
 965            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 966        if body:
 967            data["body"] = body
 968        if due_date:
 969            data["due_date"] = Util.format_time(due_date)
 970        if milestone:
 971            data["milestone"] = milestone.id
 972
 973        result = self.allspice_client.requests_post(
 974            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 975            data=data,
 976        )
 977
 978        return DesignReview.parse_response(self.allspice_client, result)
 979
 980    def create_milestone(
 981        self,
 982        title: str,
 983        description: str,
 984        due_date: Optional[str] = None,
 985        state: str = "open",
 986    ) -> "Milestone":
 987        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 988        data = {"title": title, "description": description, "state": state}
 989        if due_date:
 990            data["due_date"] = due_date
 991        result = self.allspice_client.requests_post(url, data=data)
 992        return Milestone.parse_response(self.allspice_client, result)
 993
 994    def create_gitea_hook(self, hook_url: str, events: List[str]):
 995        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 996        data = {
 997            "type": "gitea",
 998            "config": {"content_type": "json", "url": hook_url},
 999            "events": events,
1000            "active": True,
1001        }
1002        return self.allspice_client.requests_post(url, data=data)
1003
1004    def list_hooks(self):
1005        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1006        return self.allspice_client.requests_get(url)
1007
1008    def delete_hook(self, id: str):
1009        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1010        self.allspice_client.requests_delete(url)
1011
1012    def is_collaborator(self, username) -> bool:
1013        if isinstance(username, User):
1014            username = username.username
1015        try:
1016            # returns 204 if its ok, 404 if its not
1017            self.allspice_client.requests_get(
1018                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1019            )
1020            return True
1021        except Exception:
1022            return False
1023
1024    def get_users_with_access(self) -> Sequence[User]:
1025        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1026        response = self.allspice_client.requests_get(url)
1027        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1028        if isinstance(self.owner, User):
1029            return [*collabs, self.owner]
1030        else:
1031            # owner must be org
1032            teams = self.owner.get_teams()
1033            for team in teams:
1034                team_repos = team.get_repos()
1035                if self.name in [n.name for n in team_repos]:
1036                    collabs += team.get_members()
1037            return collabs
1038
1039    def remove_collaborator(self, user_name: str):
1040        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1041        self.allspice_client.requests_delete(url)
1042
1043    def transfer_ownership(
1044        self,
1045        new_owner: Union[User, Organization],
1046        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1047    ):
1048        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1049        data: dict[str, Any] = {"new_owner": new_owner.username}
1050        if isinstance(new_owner, Organization):
1051            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1052            data["team_ids"] = new_team_ids
1053        self.allspice_client.requests_post(url, data=data)
1054        # TODO: make sure this instance is either updated or discarded
1055
1056    def get_git_content(
1057        self,
1058        ref: Optional["Ref"] = None,
1059        commit: "Optional[Commit]" = None,
1060    ) -> List[Content]:
1061        """
1062        Get the metadata for all files in the root directory.
1063
1064        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1065
1066        :param ref: branch or commit to get content from
1067        :param commit: commit to get content from (deprecated)
1068        """
1069        url = f"/repos/{self.owner.username}/{self.name}/contents"
1070        data = Util.data_params_for_ref(ref or commit)
1071
1072        result = [
1073            Content.parse_response(self.allspice_client, f)
1074            for f in self.allspice_client.requests_get(url, data)
1075        ]
1076        return result
1077
1078    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1079        """
1080        Get the repository's tree on a given ref.
1081
1082        By default, this will only return the top-level entries in the tree. If you want
1083        to get the entire tree, set `recursive` to True.
1084
1085        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1086        :param recursive: Whether to get the entire tree or just the top-level entries.
1087        """
1088
1089        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1090        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1091        params = {"recursive": recursive}
1092        results = self.allspice_client.requests_get_paginated(url, params=params)
1093        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1094
1095    def get_file_content(
1096        self,
1097        content: Content,
1098        ref: Optional[Ref] = None,
1099        commit: Optional[Commit] = None,
1100    ) -> Union[str, List["Content"]]:
1101        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1102        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1103        data = Util.data_params_for_ref(ref or commit)
1104
1105        if content.type == Content.FILE:
1106            return self.allspice_client.requests_get(url, data)["content"]
1107        else:
1108            return [
1109                Content.parse_response(self.allspice_client, f)
1110                for f in self.allspice_client.requests_get(url, data)
1111            ]
1112
1113    def get_raw_file(
1114        self,
1115        file_path: str,
1116        ref: Optional[Ref] = None,
1117    ) -> bytes:
1118        """
1119        Get the raw, binary data of a single file.
1120
1121        Note 1: if the file you are requesting is a text file, you might want to
1122        use .decode() on the result to get a string. For example:
1123
1124            content = repo.get_raw_file("file.txt").decode("utf-8")
1125
1126        Note 2: this method will store the entire file in memory. If you want
1127        to download a large file, you might want to use `download_to_file`
1128        instead.
1129
1130        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1131
1132        :param file_path: The path to the file to get.
1133        :param ref: The branch or commit to get the file from.  If not provided,
1134            the default branch is used.
1135        """
1136
1137        url = self.REPO_GET_RAW_FILE.format(
1138            owner=self.owner.username,
1139            repo=self.name,
1140            path=file_path,
1141        )
1142        params = Util.data_params_for_ref(ref)
1143        return self.allspice_client.requests_get_raw(url, params=params)
1144
1145    def download_to_file(
1146        self,
1147        file_path: str,
1148        io: IO,
1149        ref: Optional[Ref] = None,
1150    ) -> None:
1151        """
1152        Download the binary data of a file to a file-like object.
1153
1154        Example:
1155
1156            with open("schematic.DSN", "wb") as f:
1157                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1158
1159        :param file_path: The path to the file in the repository from the root
1160            of the repository.
1161        :param io: The file-like object to write the data to.
1162        """
1163
1164        url = self.allspice_client._AllSpice__get_url(
1165            self.REPO_GET_RAW_FILE.format(
1166                owner=self.owner.username,
1167                repo=self.name,
1168                path=file_path,
1169            )
1170        )
1171        params = Util.data_params_for_ref(ref)
1172        response = self.allspice_client.requests.get(
1173            url,
1174            params=params,
1175            headers=self.allspice_client.headers,
1176            stream=True,
1177        )
1178
1179        for chunk in response.iter_content(chunk_size=4096):
1180            if chunk:
1181                io.write(chunk)
1182
1183    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1184        """
1185        Get the json blob for a cad file if it exists, otherwise enqueue
1186        a new job and return a 503 status.
1187
1188        WARNING: This is still experimental and not recommended for critical
1189        applications. The structure and content of the returned dictionary can
1190        change at any time.
1191
1192        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1193        """
1194
1195        if isinstance(content, Content):
1196            content = content.path
1197
1198        url = self.REPO_GET_ALLSPICE_JSON.format(
1199            owner=self.owner.username,
1200            repo=self.name,
1201            content=content,
1202        )
1203        data = Util.data_params_for_ref(ref)
1204        return self.allspice_client.requests_get(url, data)
1205
1206    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1207        """
1208        Get the svg blob for a cad file if it exists, otherwise enqueue
1209        a new job and return a 503 status.
1210
1211        WARNING: This is still experimental and not yet recommended for
1212        critical applications. The content of the returned svg can change
1213        at any time.
1214
1215        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1216        """
1217
1218        if isinstance(content, Content):
1219            content = content.path
1220
1221        url = self.REPO_GET_ALLSPICE_SVG.format(
1222            owner=self.owner.username,
1223            repo=self.name,
1224            content=content,
1225        )
1226        data = Util.data_params_for_ref(ref)
1227        return self.allspice_client.requests_get_raw(url, data)
1228
1229    def get_generated_projectdata(
1230        self, content: Union[Content, str], ref: Optional[Ref] = None
1231    ) -> dict:
1232        """
1233        Get the json project data based on the cad file provided
1234
1235        WARNING: This is still experimental and not yet recommended for
1236        critical applications. The content of the returned dictionary can change
1237        at any time.
1238
1239        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
1240        """
1241        if isinstance(content, Content):
1242            content = content.path
1243
1244        url = self.REPO_GET_ALLSPICE_PROJECT.format(
1245            owner=self.owner.username,
1246            repo=self.name,
1247            content=content,
1248        )
1249        data = Util.data_params_for_ref(ref)
1250        return self.allspice_client.requests_get(url, data)
1251
1252    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1253        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1254        if not data:
1255            data = {}
1256        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1257        data.update({"content": content})
1258        return self.allspice_client.requests_post(url, data)
1259
1260    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1261        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1262        if not data:
1263            data = {}
1264        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1265        data.update({"sha": file_sha, "content": content})
1266        return self.allspice_client.requests_put(url, data)
1267
1268    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1269        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1270        if not data:
1271            data = {}
1272        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1273        data.update({"sha": file_sha})
1274        return self.allspice_client.requests_delete(url, data)
1275
1276    def get_archive(
1277        self,
1278        ref: Ref = "main",
1279        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1280    ) -> bytes:
1281        """
1282        Download all the files in a specific ref of a repository as a zip or tarball
1283        archive.
1284
1285        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1286
1287        :param ref: branch or commit to get content from, defaults to the "main" branch
1288        :param archive_format: zip or tar, defaults to zip
1289        """
1290
1291        ref_string = Util.data_params_for_ref(ref)["ref"]
1292        url = self.REPO_GET_ARCHIVE.format(
1293            owner=self.owner.username,
1294            repo=self.name,
1295            ref=ref_string,
1296            format=archive_format.value,
1297        )
1298        return self.allspice_client.requests_get_raw(url)
1299
1300    def get_topics(self) -> list[str]:
1301        """
1302        Gets the list of topics on this repository.
1303
1304        See http://localhost:3000/api/swagger#/repository/repoListTopics
1305        """
1306
1307        url = self.REPO_GET_TOPICS.format(
1308            owner=self.owner.username,
1309            repo=self.name,
1310        )
1311        return self.allspice_client.requests_get(url)["topics"]
1312
1313    def add_topic(self, topic: str):
1314        """
1315        Adds a topic to the repository.
1316
1317        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1318
1319        :param topic: The topic to add. Topic names must consist only of
1320            lowercase letters, numnbers and dashes (-), and cannot start with
1321            dashes. Topic names also must be under 35 characters long.
1322        """
1323
1324        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1325        self.allspice_client.requests_put(url)
1326
1327    def create_release(
1328        self,
1329        tag_name: str,
1330        name: Optional[str] = None,
1331        body: Optional[str] = None,
1332        draft: bool = False,
1333    ):
1334        """
1335        Create a release for this repository. The release will be created for
1336        the tag with the given name. If there is no tag with this name, create
1337        the tag first.
1338
1339        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1340        """
1341
1342        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1343        data = {
1344            "tag_name": tag_name,
1345            "draft": draft,
1346        }
1347        if name is not None:
1348            data["name"] = name
1349        if body is not None:
1350            data["body"] = body
1351        response = self.allspice_client.requests_post(url, data)
1352        return Release.parse_response(self.allspice_client, response, self)
1353
1354    def get_releases(
1355        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1356    ) -> List[Release]:
1357        """
1358        Get the list of releases for this repository.
1359
1360        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1361        """
1362
1363        data = {}
1364
1365        if draft is not None:
1366            data["draft"] = draft
1367        if pre_release is not None:
1368            data["pre-release"] = pre_release
1369
1370        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1371        responses = self.allspice_client.requests_get_paginated(url, params=data)
1372
1373        return [
1374            Release.parse_response(self.allspice_client, response, self) for response in responses
1375        ]
1376
1377    def get_latest_release(self) -> Release:
1378        """
1379        Get the latest release for this repository.
1380
1381        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1382        """
1383
1384        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1385        response = self.allspice_client.requests_get(url)
1386        release = Release.parse_response(self.allspice_client, response, self)
1387        return release
1388
1389    def get_release_by_tag(self, tag: str) -> Release:
1390        """
1391        Get a release by its tag.
1392
1393        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1394        """
1395
1396        url = self.REPO_GET_RELEASE_BY_TAG.format(
1397            owner=self.owner.username, repo=self.name, tag=tag
1398        )
1399        response = self.allspice_client.requests_get(url)
1400        release = Release.parse_response(self.allspice_client, response, self)
1401        return release
1402
1403    def get_commit_statuses(
1404        self,
1405        commit: Union[str, Commit],
1406        sort: Optional[CommitStatusSort] = None,
1407        state: Optional[CommitStatusState] = None,
1408    ) -> List[CommitStatus]:
1409        """
1410        Get a list of statuses for a commit.
1411
1412        This is roughly equivalent to the Commit.get_statuses method, but this
1413        method allows you to sort and filter commits and is more convenient if
1414        you have a commit SHA and don't need to get the commit itself.
1415
1416        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1417        """
1418
1419        if isinstance(commit, Commit):
1420            commit = commit.sha
1421
1422        params = {}
1423        if sort is not None:
1424            params["sort"] = sort.value
1425        if state is not None:
1426            params["state"] = state.value
1427
1428        url = self.REPO_GET_COMMIT_STATUS.format(
1429            owner=self.owner.username, repo=self.name, sha=commit
1430        )
1431        response = self.allspice_client.requests_get_paginated(url, params=params)
1432        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1433
1434    def create_commit_status(
1435        self,
1436        commit: Union[str, Commit],
1437        context: Optional[str] = None,
1438        description: Optional[str] = None,
1439        state: Optional[CommitStatusState] = None,
1440        target_url: Optional[str] = None,
1441    ) -> CommitStatus:
1442        """
1443        Create a status on a commit.
1444
1445        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1446        """
1447
1448        if isinstance(commit, Commit):
1449            commit = commit.sha
1450
1451        data = {}
1452        if context is not None:
1453            data["context"] = context
1454        if description is not None:
1455            data["description"] = description
1456        if state is not None:
1457            data["state"] = state.value
1458        if target_url is not None:
1459            data["target_url"] = target_url
1460
1461        url = self.REPO_GET_COMMIT_STATUS.format(
1462            owner=self.owner.username, repo=self.name, sha=commit
1463        )
1464        response = self.allspice_client.requests_post(url, data=data)
1465        return CommitStatus.parse_response(self.allspice_client, response)
1466
1467    def delete(self):
1468        self.allspice_client.requests_delete(
1469            Repository.REPO_DELETE % (self.owner.username, self.name)
1470        )
1471        self.deleted = True
Repository(allspice_client)
587    def __init__(self, allspice_client):
588        super().__init__(allspice_client)
allow_fast_forward_only_merge: bool
allow_manual_merge: Any
allow_merge_commits: bool
allow_rebase: bool
allow_rebase_explicit: bool
allow_rebase_update: bool
allow_squash_merge: bool
archived: bool
archived_at: str
autodetect_manual_merge: Any
avatar_url: str
clone_url: str
created_at: str
default_allow_maintainer_edit: bool
default_branch: str
default_delete_branch_after_merge: bool
default_merge_style: str
description: str
empty: bool
enable_prune: Any
external_tracker: Any
external_wiki: Any
fork: bool
forks_count: int
full_name: str
has_actions: bool
has_issues: bool
has_packages: bool
has_projects: bool
has_pull_requests: bool
has_releases: bool
has_wiki: bool
html_url: str
id: int
ignore_whitespace_conflicts: bool
internal: bool
internal_tracker: Dict[str, bool]
language: str
languages_url: str
mirror: bool
mirror_interval: str
mirror_updated: str
name: str
object_format_name: str
open_issues_count: int
open_pr_counter: int
original_url: str
owner: Union[User, Organization]
parent: Any
permissions: Dict[str, bool]
private: bool
projects_mode: str
release_counter: int
repo_transfer: Any
size: int
ssh_url: str
stars_count: int
template: bool
updated_at: datetime.datetime
url: str
watchers_count: int
website: str
API_OBJECT = '/repos/{owner}/{name}'
REPO_IS_COLLABORATOR = '/repos/%s/%s/collaborators/%s'
REPO_BRANCHES = '/repos/%s/%s/branches'
REPO_BRANCH = '/repos/{owner}/{repo}/branches/{branch}'
REPO_ISSUES = '/repos/{owner}/{repo}/issues'
REPO_DESIGN_REVIEWS = '/repos/{owner}/{repo}/pulls'
REPO_DELETE = '/repos/%s/%s'
REPO_TIMES = '/repos/%s/%s/times'
REPO_USER_TIME = '/repos/%s/%s/times/%s'
REPO_COMMITS = '/repos/%s/%s/commits'
REPO_TRANSFER = '/repos/{owner}/{repo}/transfer'
REPO_MILESTONES = '/repos/{owner}/{repo}/milestones'
REPO_GET_ARCHIVE = '/repos/{owner}/{repo}/archive/{ref}.{format}'
REPO_GET_ALLSPICE_JSON = '/repos/{owner}/{repo}/allspice_generated/json/{content}'
REPO_GET_ALLSPICE_SVG = '/repos/{owner}/{repo}/allspice_generated/svg/{content}'
REPO_GET_ALLSPICE_PROJECT = '/repos/{owner}/{repo}/allspice_generated/project/{content}'
REPO_GET_TOPICS = '/repos/{owner}/{repo}/topics'
REPO_ADD_TOPIC = '/repos/{owner}/{repo}/topics/{topic}'
REPO_GET_RELEASES = '/repos/{owner}/{repo}/releases'
REPO_GET_LATEST_RELEASE = '/repos/{owner}/{repo}/releases/latest'
REPO_GET_RELEASE_BY_TAG = '/repos/{owner}/{repo}/releases/tags/{tag}'
REPO_GET_COMMIT_STATUS = '/repos/{owner}/{repo}/statuses/{sha}'
REPO_GET_RAW_FILE = '/repos/{owner}/{repo}/raw/{path}'
REPO_GET_TREE = '/repos/{owner}/{repo}/git/trees/{ref}'
@classmethod
def request( cls, allspice_client, owner: str, name: str) -> Repository:
608    @classmethod
609    def request(
610        cls,
611        allspice_client,
612        owner: str,
613        name: str,
614    ) -> Repository:
615        return cls._request(allspice_client, {"owner": owner, "name": name})
@classmethod
def search( cls, allspice_client, query: Optional[str] = None, topic: bool = False, include_description: bool = False, user: Optional[User] = None, owner_to_prioritize: Union[User, Organization, NoneType] = None) -> list[Repository]:
617    @classmethod
618    def search(
619        cls,
620        allspice_client,
621        query: Optional[str] = None,
622        topic: bool = False,
623        include_description: bool = False,
624        user: Optional[User] = None,
625        owner_to_prioritize: Union[User, Organization, None] = None,
626    ) -> list[Repository]:
627        """
628        Search for repositories.
629
630        See https://hub.allspice.io/api/swagger#/repository/repoSearch
631
632        :param query: The query string to search for
633        :param topic: If true, the query string will only be matched against the
634            repository's topic.
635        :param include_description: If true, the query string will be matched
636            against the repository's description as well.
637        :param user: If specified, only repositories that this user owns or
638            contributes to will be searched.
639        :param owner_to_prioritize: If specified, repositories owned by the
640            given entity will be prioritized in the search.
641        :returns: All repositories matching the query. If there are many
642            repositories matching this query, this may take some time.
643        """
644
645        params = {}
646
647        if query is not None:
648            params["q"] = query
649        if topic:
650            params["topic"] = topic
651        if include_description:
652            params["include_description"] = include_description
653        if user is not None:
654            params["user"] = user.id
655        if owner_to_prioritize is not None:
656            params["owner_to_prioritize"] = owner_to_prioritize.id
657
658        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
659
660        return [Repository.parse_response(allspice_client, response) for response in responses]

Search for repositories.

See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch

Parameters
  • query: The query string to search for
  • topic: If true, the query string will only be matched against the repository's topic.
  • include_description: If true, the query string will be matched against the repository's description as well.
  • user: If specified, only repositories that this user owns or contributes to will be searched.
  • owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
def commit(self):
692    def commit(self):
693        args = {"owner": self.owner.username, "name": self.name}
694        self._commit(args)
def get_branches(self) -> List[Branch]:
696    def get_branches(self) -> List["Branch"]:
697        """Get all the Branches of this Repository."""
698
699        results = self.allspice_client.requests_get_paginated(
700            Repository.REPO_BRANCHES % (self.owner.username, self.name)
701        )
702        return [Branch.parse_response(self.allspice_client, result) for result in results]

Get all the Branches of this Repository.

def get_branch(self, name: str) -> Branch:
704    def get_branch(self, name: str) -> "Branch":
705        """Get a specific Branch of this Repository."""
706        result = self.allspice_client.requests_get(
707            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
708        )
709        return Branch.parse_response(self.allspice_client, result)

Get a specific Branch of this Repository.

def add_branch( self, create_from: Union[Branch, Commit, str], newname: str) -> Branch:
711    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
712        """Add a branch to the repository"""
713        # Note: will only work with gitea 1.13 or higher!
714
715        ref_name = Util.data_params_for_ref(create_from)
716        if "ref" not in ref_name:
717            raise ValueError("create_from must be a Branch, Commit or string")
718        ref_name = ref_name["ref"]
719
720        data = {"new_branch_name": newname, "old_ref_name": ref_name}
721        result = self.allspice_client.requests_post(
722            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
723        )
724        return Branch.parse_response(self.allspice_client, result)

Add a branch to the repository

def get_issues( self, state: Literal['open', 'closed', 'all'] = 'all', search_query: Optional[str] = None, labels: Optional[List[str]] = None, milestones: Optional[List[Union[Milestone, str]]] = None, assignee: Union[User, str, NoneType] = None, since: Optional[datetime.datetime] = None, before: Optional[datetime.datetime] = None) -> List[Issue]:
726    def get_issues(
727        self,
728        state: Literal["open", "closed", "all"] = "all",
729        search_query: Optional[str] = None,
730        labels: Optional[List[str]] = None,
731        milestones: Optional[List[Union[Milestone, str]]] = None,
732        assignee: Optional[Union[User, str]] = None,
733        since: Optional[datetime] = None,
734        before: Optional[datetime] = None,
735    ) -> List["Issue"]:
736        """
737        Get all Issues of this Repository (open and closed)
738
739        https://hub.allspice.io/api/swagger#/repository/repoListIssues
740
741        All params of this method are optional filters. If you don't specify a filter, it
742        will not be applied.
743
744        :param state: The state of the Issues to get. If None, all Issues are returned.
745        :param search_query: Filter issues by text. This is equivalent to searching for
746                             `search_query` in the Issues on the web interface.
747        :param labels: Filter issues by labels.
748        :param milestones: Filter issues by milestones.
749        :param assignee: Filter issues by the assigned user.
750        :param since: Filter issues by the date they were created.
751        :param before: Filter issues by the date they were created.
752        :return: A list of Issues.
753        """
754
755        data = {
756            "state": state,
757        }
758        if search_query:
759            data["q"] = search_query
760        if labels:
761            data["labels"] = ",".join(labels)
762        if milestones:
763            data["milestone"] = ",".join(
764                [
765                    milestone.name if isinstance(milestone, Milestone) else milestone
766                    for milestone in milestones
767                ]
768            )
769        if assignee:
770            if isinstance(assignee, User):
771                data["assignee"] = assignee.username
772            else:
773                data["assignee"] = assignee
774        if since:
775            data["since"] = Util.format_time(since)
776        if before:
777            data["before"] = Util.format_time(before)
778
779        results = self.allspice_client.requests_get_paginated(
780            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
781            params=data,
782        )
783
784        issues = []
785        for result in results:
786            issue = Issue.parse_response(self.allspice_client, result)
787            # See Issue.request
788            setattr(issue, "_repository", self)
789            # This is mostly for compatibility with an older implementation
790            Issue._add_read_property("repo", self, issue)
791            issues.append(issue)
792
793        return issues

Get all Issues of this Repository (open and closed)

allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues

All params of this method are optional filters. If you don't specify a filter, it will not be applied.

Parameters
  • state: The state of the Issues to get. If None, all Issues are returned.
  • search_query: Filter issues by text. This is equivalent to searching for search_query in the Issues on the web interface.
  • labels: Filter issues by labels.
  • milestones: Filter issues by milestones.
  • assignee: Filter issues by the assigned user.
  • since: Filter issues by the date they were created.
  • before: Filter issues by the date they were created.
Returns

A list of Issues.

def get_design_reviews( self, state: Literal['open', 'closed', 'all'] = 'all', milestone: Union[Milestone, str, NoneType] = None, labels: Optional[List[str]] = None) -> List[DesignReview]:
795    def get_design_reviews(
796        self,
797        state: Literal["open", "closed", "all"] = "all",
798        milestone: Optional[Union[Milestone, str]] = None,
799        labels: Optional[List[str]] = None,
800    ) -> List["DesignReview"]:
801        """
802        Get all Design Reviews of this Repository.
803
804        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
805
806        :param state: The state of the Design Reviews to get. If None, all Design Reviews
807                      are returned.
808        :param milestone: The milestone of the Design Reviews to get.
809        :param labels: A list of label IDs to filter DRs by.
810        :return: A list of Design Reviews.
811        """
812
813        params = {
814            "state": state,
815        }
816        if milestone:
817            if isinstance(milestone, Milestone):
818                params["milestone"] = milestone.name
819            else:
820                params["milestone"] = milestone
821        if labels:
822            params["labels"] = ",".join(labels)
823
824        results = self.allspice_client.requests_get_paginated(
825            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
826            params=params,
827        )
828        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

Get all Design Reviews of this Repository.

allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests

Parameters
  • state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
  • milestone: The milestone of the Design Reviews to get.
  • labels: A list of label IDs to filter DRs by.
Returns

A list of Design Reviews.

def get_commits( self, sha: Optional[str] = None, path: Optional[str] = None, stat: bool = True) -> List[Commit]:
830    def get_commits(
831        self,
832        sha: Optional[str] = None,
833        path: Optional[str] = None,
834        stat: bool = True,
835    ) -> List["Commit"]:
836        """
837        Get all the Commits of this Repository.
838
839        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
840
841        :param sha: The SHA of the commit to start listing commits from.
842        :param path: filepath of a file/dir.
843        :param stat: Include the number of additions and deletions in the response.
844                     Disable for speedup.
845        :return: A list of Commits.
846        """
847
848        data = {}
849        if sha:
850            data["sha"] = sha
851        if path:
852            data["path"] = path
853        if not stat:
854            data["stat"] = False
855
856        try:
857            results = self.allspice_client.requests_get_paginated(
858                Repository.REPO_COMMITS % (self.owner.username, self.name),
859                params=data,
860            )
861        except ConflictException as err:
862            logging.warning(err)
863            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
864            results = []
865        return [Commit.parse_response(self.allspice_client, result) for result in results]

Get all the Commits of this Repository.

allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits

Parameters
  • sha: The SHA of the commit to start listing commits from.
  • path: filepath of a file/dir.
  • stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns

A list of Commits.

def get_issues_state(self, state) -> List[Issue]:
867    def get_issues_state(self, state) -> List["Issue"]:
868        """
869        DEPRECATED: Use get_issues() instead.
870
871        Get issues of state Issue.open or Issue.closed of a repository.
872        """
873
874        assert state in [Issue.OPENED, Issue.CLOSED]
875        issues = []
876        data = {"state": state}
877        results = self.allspice_client.requests_get_paginated(
878            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
879            params=data,
880        )
881        for result in results:
882            issue = Issue.parse_response(self.allspice_client, result)
883            # adding data not contained in the issue response
884            # See Issue.request()
885            setattr(issue, "_repository", self)
886            Issue._add_read_property("repo", self, issue)
887            Issue._add_read_property("owner", self.owner, issue)
888            issues.append(issue)
889        return issues

DEPRECATED: Use get_issues() instead.

Get issues of state Issue.open or Issue.closed of a repository.

def get_times(self):
891    def get_times(self):
892        results = self.allspice_client.requests_get(
893            Repository.REPO_TIMES % (self.owner.username, self.name)
894        )
895        return results
def get_user_time(self, username) -> float:
897    def get_user_time(self, username) -> float:
898        if isinstance(username, User):
899            username = username.username
900        results = self.allspice_client.requests_get(
901            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
902        )
903        time = sum(r["time"] for r in results)
904        return time
def get_full_name(self) -> str:
906    def get_full_name(self) -> str:
907        return self.owner.username + "/" + self.name
def create_issue( self, title, assignees=frozenset(), description='') -> allspice.baseapiobject.ApiObject:
909    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
910        data = {
911            "assignees": assignees,
912            "body": description,
913            "closed": False,
914            "title": title,
915        }
916        result = self.allspice_client.requests_post(
917            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
918            data=data,
919        )
920
921        issue = Issue.parse_response(self.allspice_client, result)
922        setattr(issue, "_repository", self)
923        Issue._add_read_property("repo", self, issue)
924        return issue
def create_design_review( self, title: str, head: Union[Branch, str], base: Union[Branch, str], assignees: Optional[Set[Union[User, str]]] = None, body: Optional[str] = None, due_date: Optional[datetime.datetime] = None, milestone: Optional[Milestone] = None) -> DesignReview:
926    def create_design_review(
927        self,
928        title: str,
929        head: Union[Branch, str],
930        base: Union[Branch, str],
931        assignees: Optional[Set[Union[User, str]]] = None,
932        body: Optional[str] = None,
933        due_date: Optional[datetime] = None,
934        milestone: Optional["Milestone"] = None,
935    ) -> "DesignReview":
936        """
937        Create a new Design Review.
938
939        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
940
941        :param title: Title of the Design Review
942        :param head: Branch or name of the branch to merge into the base branch
943        :param base: Branch or name of the branch to merge into
944        :param assignees: Optional. A list of users to assign this review. List can be of
945                          User objects or of usernames.
946        :param body: An Optional Description for the Design Review.
947        :param due_date: An Optional Due date for the Design Review.
948        :param milestone: An Optional Milestone for the Design Review
949        :return: The created Design Review
950        """
951
952        data: dict[str, Any] = {
953            "title": title,
954        }
955
956        if isinstance(head, Branch):
957            data["head"] = head.name
958        else:
959            data["head"] = head
960        if isinstance(base, Branch):
961            data["base"] = base.name
962        else:
963            data["base"] = base
964        if assignees:
965            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
966        if body:
967            data["body"] = body
968        if due_date:
969            data["due_date"] = Util.format_time(due_date)
970        if milestone:
971            data["milestone"] = milestone.id
972
973        result = self.allspice_client.requests_post(
974            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
975            data=data,
976        )
977
978        return DesignReview.parse_response(self.allspice_client, result)

Create a new Design Review.

See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest

Parameters
  • title: Title of the Design Review
  • head: Branch or name of the branch to merge into the base branch
  • base: Branch or name of the branch to merge into
  • assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
  • body: An Optional Description for the Design Review.
  • due_date: An Optional Due date for the Design Review.
  • milestone: An Optional Milestone for the Design Review
Returns

The created Design Review

def create_milestone( self, title: str, description: str, due_date: Optional[str] = None, state: str = 'open') -> Milestone:
980    def create_milestone(
981        self,
982        title: str,
983        description: str,
984        due_date: Optional[str] = None,
985        state: str = "open",
986    ) -> "Milestone":
987        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
988        data = {"title": title, "description": description, "state": state}
989        if due_date:
990            data["due_date"] = due_date
991        result = self.allspice_client.requests_post(url, data=data)
992        return Milestone.parse_response(self.allspice_client, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
 994    def create_gitea_hook(self, hook_url: str, events: List[str]):
 995        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 996        data = {
 997            "type": "gitea",
 998            "config": {"content_type": "json", "url": hook_url},
 999            "events": events,
1000            "active": True,
1001        }
1002        return self.allspice_client.requests_post(url, data=data)
def list_hooks(self):
1004    def list_hooks(self):
1005        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1006        return self.allspice_client.requests_get(url)
def delete_hook(self, id: str):
1008    def delete_hook(self, id: str):
1009        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1010        self.allspice_client.requests_delete(url)
def is_collaborator(self, username) -> bool:
1012    def is_collaborator(self, username) -> bool:
1013        if isinstance(username, User):
1014            username = username.username
1015        try:
1016            # returns 204 if its ok, 404 if its not
1017            self.allspice_client.requests_get(
1018                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1019            )
1020            return True
1021        except Exception:
1022            return False
def get_users_with_access(self) -> Sequence[User]:
1024    def get_users_with_access(self) -> Sequence[User]:
1025        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1026        response = self.allspice_client.requests_get(url)
1027        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1028        if isinstance(self.owner, User):
1029            return [*collabs, self.owner]
1030        else:
1031            # owner must be org
1032            teams = self.owner.get_teams()
1033            for team in teams:
1034                team_repos = team.get_repos()
1035                if self.name in [n.name for n in team_repos]:
1036                    collabs += team.get_members()
1037            return collabs
def remove_collaborator(self, user_name: str):
1039    def remove_collaborator(self, user_name: str):
1040        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1041        self.allspice_client.requests_delete(url)
def transfer_ownership( self, new_owner: Union[User, Organization], new_teams: Union[Set[Team], FrozenSet[Team]] = frozenset()):
1043    def transfer_ownership(
1044        self,
1045        new_owner: Union[User, Organization],
1046        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1047    ):
1048        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1049        data: dict[str, Any] = {"new_owner": new_owner.username}
1050        if isinstance(new_owner, Organization):
1051            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1052            data["team_ids"] = new_team_ids
1053        self.allspice_client.requests_post(url, data=data)
1054        # TODO: make sure this instance is either updated or discarded
def get_git_content( self, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> List[Content]:
1056    def get_git_content(
1057        self,
1058        ref: Optional["Ref"] = None,
1059        commit: "Optional[Commit]" = None,
1060    ) -> List[Content]:
1061        """
1062        Get the metadata for all files in the root directory.
1063
1064        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1065
1066        :param ref: branch or commit to get content from
1067        :param commit: commit to get content from (deprecated)
1068        """
1069        url = f"/repos/{self.owner.username}/{self.name}/contents"
1070        data = Util.data_params_for_ref(ref or commit)
1071
1072        result = [
1073            Content.parse_response(self.allspice_client, f)
1074            for f in self.allspice_client.requests_get(url, data)
1075        ]
1076        return result

Get the metadata for all files in the root directory.

allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList

Parameters
  • ref: branch or commit to get content from
  • commit: commit to get content from (deprecated)
def get_tree( self, ref: Union[Branch, Commit, str, NoneType] = None, recursive: bool = False) -> List[allspice.apiobject.GitEntry]:
1078    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1079        """
1080        Get the repository's tree on a given ref.
1081
1082        By default, this will only return the top-level entries in the tree. If you want
1083        to get the entire tree, set `recursive` to True.
1084
1085        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1086        :param recursive: Whether to get the entire tree or just the top-level entries.
1087        """
1088
1089        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1090        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1091        params = {"recursive": recursive}
1092        results = self.allspice_client.requests_get_paginated(url, params=params)
1093        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

Get the repository's tree on a given ref.

By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set recursive to True.

Parameters
  • ref: The ref to get the tree from. If not provided, the default branch is used.
  • recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content( self, content: Content, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> Union[str, List[Content]]:
1095    def get_file_content(
1096        self,
1097        content: Content,
1098        ref: Optional[Ref] = None,
1099        commit: Optional[Commit] = None,
1100    ) -> Union[str, List["Content"]]:
1101        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1102        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1103        data = Util.data_params_for_ref(ref or commit)
1104
1105        if content.type == Content.FILE:
1106            return self.allspice_client.requests_get(url, data)["content"]
1107        else:
1108            return [
1109                Content.parse_response(self.allspice_client, f)
1110                for f in self.allspice_client.requests_get(url, data)
1111            ]

allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents

def get_raw_file( self, file_path: str, ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1113    def get_raw_file(
1114        self,
1115        file_path: str,
1116        ref: Optional[Ref] = None,
1117    ) -> bytes:
1118        """
1119        Get the raw, binary data of a single file.
1120
1121        Note 1: if the file you are requesting is a text file, you might want to
1122        use .decode() on the result to get a string. For example:
1123
1124            content = repo.get_raw_file("file.txt").decode("utf-8")
1125
1126        Note 2: this method will store the entire file in memory. If you want
1127        to download a large file, you might want to use `download_to_file`
1128        instead.
1129
1130        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1131
1132        :param file_path: The path to the file to get.
1133        :param ref: The branch or commit to get the file from.  If not provided,
1134            the default branch is used.
1135        """
1136
1137        url = self.REPO_GET_RAW_FILE.format(
1138            owner=self.owner.username,
1139            repo=self.name,
1140            path=file_path,
1141        )
1142        params = Util.data_params_for_ref(ref)
1143        return self.allspice_client.requests_get_raw(url, params=params)

Get the raw, binary data of a single file.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

content = repo.get_raw_file("file.txt").decode("utf-8")

Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use download_to_file instead.

See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile

Parameters
  • file_path: The path to the file to get.
  • ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file( self, file_path: str, io: <class 'IO'>, ref: Union[Branch, Commit, str, NoneType] = None) -> None:
1145    def download_to_file(
1146        self,
1147        file_path: str,
1148        io: IO,
1149        ref: Optional[Ref] = None,
1150    ) -> None:
1151        """
1152        Download the binary data of a file to a file-like object.
1153
1154        Example:
1155
1156            with open("schematic.DSN", "wb") as f:
1157                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1158
1159        :param file_path: The path to the file in the repository from the root
1160            of the repository.
1161        :param io: The file-like object to write the data to.
1162        """
1163
1164        url = self.allspice_client._AllSpice__get_url(
1165            self.REPO_GET_RAW_FILE.format(
1166                owner=self.owner.username,
1167                repo=self.name,
1168                path=file_path,
1169            )
1170        )
1171        params = Util.data_params_for_ref(ref)
1172        response = self.allspice_client.requests.get(
1173            url,
1174            params=params,
1175            headers=self.allspice_client.headers,
1176            stream=True,
1177        )
1178
1179        for chunk in response.iter_content(chunk_size=4096):
1180            if chunk:
1181                io.write(chunk)

Download the binary data of a file to a file-like object.

Example:

with open("schematic.DSN", "wb") as f:
    Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
  • file_path: The path to the file in the repository from the root of the repository.
  • io: The file-like object to write the data to.
def get_generated_json( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1183    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1184        """
1185        Get the json blob for a cad file if it exists, otherwise enqueue
1186        a new job and return a 503 status.
1187
1188        WARNING: This is still experimental and not recommended for critical
1189        applications. The structure and content of the returned dictionary can
1190        change at any time.
1191
1192        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1193        """
1194
1195        if isinstance(content, Content):
1196            content = content.path
1197
1198        url = self.REPO_GET_ALLSPICE_JSON.format(
1199            owner=self.owner.username,
1200            repo=self.name,
1201            content=content,
1202        )
1203        data = Util.data_params_for_ref(ref)
1204        return self.allspice_client.requests_get(url, data)

Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

def get_generated_svg( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1206    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1207        """
1208        Get the svg blob for a cad file if it exists, otherwise enqueue
1209        a new job and return a 503 status.
1210
1211        WARNING: This is still experimental and not yet recommended for
1212        critical applications. The content of the returned svg can change
1213        at any time.
1214
1215        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1216        """
1217
1218        if isinstance(content, Content):
1219            content = content.path
1220
1221        url = self.REPO_GET_ALLSPICE_SVG.format(
1222            owner=self.owner.username,
1223            repo=self.name,
1224            content=content,
1225        )
1226        data = Util.data_params_for_ref(ref)
1227        return self.allspice_client.requests_get_raw(url, data)

Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

def get_generated_projectdata( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1229    def get_generated_projectdata(
1230        self, content: Union[Content, str], ref: Optional[Ref] = None
1231    ) -> dict:
1232        """
1233        Get the json project data based on the cad file provided
1234
1235        WARNING: This is still experimental and not yet recommended for
1236        critical applications. The content of the returned dictionary can change
1237        at any time.
1238
1239        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
1240        """
1241        if isinstance(content, Content):
1242            content = content.path
1243
1244        url = self.REPO_GET_ALLSPICE_PROJECT.format(
1245            owner=self.owner.username,
1246            repo=self.name,
1247            content=content,
1248        )
1249        data = Util.data_params_for_ref(ref)
1250        return self.allspice_client.requests_get(url, data)

Get the json project data based on the cad file provided

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject

def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1252    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1253        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1254        if not data:
1255            data = {}
1256        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1257        data.update({"content": content})
1258        return self.allspice_client.requests_post(url, data)

allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile

def change_file( self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1260    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1261        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1262        if not data:
1263            data = {}
1264        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1265        data.update({"sha": file_sha, "content": content})
1266        return self.allspice_client.requests_put(url, data)

allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile

def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1268    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1269        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1270        if not data:
1271            data = {}
1272        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1273        data.update({"sha": file_sha})
1274        return self.allspice_client.requests_delete(url, data)

allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile

def get_archive( self, ref: Union[Branch, Commit, str] = 'main', archive_format: Repository.ArchiveFormat = <ArchiveFormat.ZIP: 'zip'>) -> bytes:
1276    def get_archive(
1277        self,
1278        ref: Ref = "main",
1279        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1280    ) -> bytes:
1281        """
1282        Download all the files in a specific ref of a repository as a zip or tarball
1283        archive.
1284
1285        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1286
1287        :param ref: branch or commit to get content from, defaults to the "main" branch
1288        :param archive_format: zip or tar, defaults to zip
1289        """
1290
1291        ref_string = Util.data_params_for_ref(ref)["ref"]
1292        url = self.REPO_GET_ARCHIVE.format(
1293            owner=self.owner.username,
1294            repo=self.name,
1295            ref=ref_string,
1296            format=archive_format.value,
1297        )
1298        return self.allspice_client.requests_get_raw(url)

Download all the files in a specific ref of a repository as a zip or tarball archive.

allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive

Parameters
  • ref: branch or commit to get content from, defaults to the "main" branch
  • archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
1300    def get_topics(self) -> list[str]:
1301        """
1302        Gets the list of topics on this repository.
1303
1304        See http://localhost:3000/api/swagger#/repository/repoListTopics
1305        """
1306
1307        url = self.REPO_GET_TOPICS.format(
1308            owner=self.owner.username,
1309            repo=self.name,
1310        )
1311        return self.allspice_client.requests_get(url)["topics"]

Gets the list of topics on this repository.

See http://localhost:3000/api/swagger#/repository/repoListTopics

def add_topic(self, topic: str):
1313    def add_topic(self, topic: str):
1314        """
1315        Adds a topic to the repository.
1316
1317        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1318
1319        :param topic: The topic to add. Topic names must consist only of
1320            lowercase letters, numnbers and dashes (-), and cannot start with
1321            dashes. Topic names also must be under 35 characters long.
1322        """
1323
1324        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1325        self.allspice_client.requests_put(url)

Adds a topic to the repository.

See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic

Parameters
  • topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release( self, tag_name: str, name: Optional[str] = None, body: Optional[str] = None, draft: bool = False):
1327    def create_release(
1328        self,
1329        tag_name: str,
1330        name: Optional[str] = None,
1331        body: Optional[str] = None,
1332        draft: bool = False,
1333    ):
1334        """
1335        Create a release for this repository. The release will be created for
1336        the tag with the given name. If there is no tag with this name, create
1337        the tag first.
1338
1339        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1340        """
1341
1342        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1343        data = {
1344            "tag_name": tag_name,
1345            "draft": draft,
1346        }
1347        if name is not None:
1348            data["name"] = name
1349        if body is not None:
1350            data["body"] = body
1351        response = self.allspice_client.requests_post(url, data)
1352        return Release.parse_response(self.allspice_client, response, self)

Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.

See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease

def get_releases( self, draft: Optional[bool] = None, pre_release: Optional[bool] = None) -> List[Release]:
1354    def get_releases(
1355        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1356    ) -> List[Release]:
1357        """
1358        Get the list of releases for this repository.
1359
1360        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1361        """
1362
1363        data = {}
1364
1365        if draft is not None:
1366            data["draft"] = draft
1367        if pre_release is not None:
1368            data["pre-release"] = pre_release
1369
1370        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1371        responses = self.allspice_client.requests_get_paginated(url, params=data)
1372
1373        return [
1374            Release.parse_response(self.allspice_client, response, self) for response in responses
1375        ]

Get the list of releases for this repository.

See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases

def get_latest_release(self) -> Release:
1377    def get_latest_release(self) -> Release:
1378        """
1379        Get the latest release for this repository.
1380
1381        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1382        """
1383
1384        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1385        response = self.allspice_client.requests_get(url)
1386        release = Release.parse_response(self.allspice_client, response, self)
1387        return release

Get the latest release for this repository.

See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease

def get_release_by_tag(self, tag: str) -> Release:
1389    def get_release_by_tag(self, tag: str) -> Release:
1390        """
1391        Get a release by its tag.
1392
1393        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1394        """
1395
1396        url = self.REPO_GET_RELEASE_BY_TAG.format(
1397            owner=self.owner.username, repo=self.name, tag=tag
1398        )
1399        response = self.allspice_client.requests_get(url)
1400        release = Release.parse_response(self.allspice_client, response, self)
1401        return release

Get a release by its tag.

See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag

def get_commit_statuses( self, commit: Union[str, Commit], sort: Optional[Repository.CommitStatusSort] = None, state: Optional[allspice.apiobject.CommitStatusState] = None) -> List[allspice.apiobject.CommitStatus]:
1403    def get_commit_statuses(
1404        self,
1405        commit: Union[str, Commit],
1406        sort: Optional[CommitStatusSort] = None,
1407        state: Optional[CommitStatusState] = None,
1408    ) -> List[CommitStatus]:
1409        """
1410        Get a list of statuses for a commit.
1411
1412        This is roughly equivalent to the Commit.get_statuses method, but this
1413        method allows you to sort and filter commits and is more convenient if
1414        you have a commit SHA and don't need to get the commit itself.
1415
1416        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1417        """
1418
1419        if isinstance(commit, Commit):
1420            commit = commit.sha
1421
1422        params = {}
1423        if sort is not None:
1424            params["sort"] = sort.value
1425        if state is not None:
1426            params["state"] = state.value
1427
1428        url = self.REPO_GET_COMMIT_STATUS.format(
1429            owner=self.owner.username, repo=self.name, sha=commit
1430        )
1431        response = self.allspice_client.requests_get_paginated(url, params=params)
1432        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]

Get a list of statuses for a commit.

This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.

See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses

def create_commit_status( self, commit: Union[str, Commit], context: Optional[str] = None, description: Optional[str] = None, state: Optional[allspice.apiobject.CommitStatusState] = None, target_url: Optional[str] = None) -> allspice.apiobject.CommitStatus:
1434    def create_commit_status(
1435        self,
1436        commit: Union[str, Commit],
1437        context: Optional[str] = None,
1438        description: Optional[str] = None,
1439        state: Optional[CommitStatusState] = None,
1440        target_url: Optional[str] = None,
1441    ) -> CommitStatus:
1442        """
1443        Create a status on a commit.
1444
1445        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1446        """
1447
1448        if isinstance(commit, Commit):
1449            commit = commit.sha
1450
1451        data = {}
1452        if context is not None:
1453            data["context"] = context
1454        if description is not None:
1455            data["description"] = description
1456        if state is not None:
1457            data["state"] = state.value
1458        if target_url is not None:
1459            data["target_url"] = target_url
1460
1461        url = self.REPO_GET_COMMIT_STATUS.format(
1462            owner=self.owner.username, repo=self.name, sha=commit
1463        )
1464        response = self.allspice_client.requests_post(url, data=data)
1465        return CommitStatus.parse_response(self.allspice_client, response)

Create a status on a commit.

See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus

def delete(self):
1467    def delete(self):
1468        self.allspice_client.requests_delete(
1469            Repository.REPO_DELETE % (self.owner.username, self.name)
1470        )
1471        self.deleted = True
class Repository.ArchiveFormat(enum.Enum):
568    class ArchiveFormat(Enum):
569        """
570        Archive formats for Repository.get_archive
571        """
572
573        TAR = "tar.gz"
574        ZIP = "zip"

Archive formats for Repository.get_archive

TAR = <ArchiveFormat.TAR: 'tar.gz'>
ZIP = <ArchiveFormat.ZIP: 'zip'>
class Repository.CommitStatusSort(enum.Enum):
576    class CommitStatusSort(Enum):
577        """
578        Sort order for Repository.get_commit_status
579        """
580
581        OLDEST = "oldest"
582        RECENT_UPDATE = "recentupdate"
583        LEAST_UPDATE = "leastupdate"
584        LEAST_INDEX = "leastindex"
585        HIGHEST_INDEX = "highestindex"

Sort order for Repository.get_commit_status

OLDEST = <CommitStatusSort.OLDEST: 'oldest'>
RECENT_UPDATE = <CommitStatusSort.RECENT_UPDATE: 'recentupdate'>
LEAST_UPDATE = <CommitStatusSort.LEAST_UPDATE: 'leastupdate'>
LEAST_INDEX = <CommitStatusSort.LEAST_INDEX: 'leastindex'>
HIGHEST_INDEX = <CommitStatusSort.HIGHEST_INDEX: 'highestindex'>
class Team(allspice.baseapiobject.ApiObject):
2300class Team(ApiObject):
2301    can_create_org_repo: bool
2302    description: str
2303    id: int
2304    includes_all_repositories: bool
2305    name: str
2306    organization: Optional["Organization"]
2307    permission: str
2308    units: List[str]
2309    units_map: Dict[str, str]
2310
2311    API_OBJECT = """/teams/{id}"""  # <id>
2312    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2313    TEAM_DELETE = """/teams/%s"""  # <id>
2314    GET_MEMBERS = """/teams/%s/members"""  # <id>
2315    GET_REPOS = """/teams/%s/repos"""  # <id>
2316
2317    def __init__(self, allspice_client):
2318        super().__init__(allspice_client)
2319
2320    def __eq__(self, other):
2321        if not isinstance(other, Team):
2322            return False
2323        return self.organization == other.organization and self.id == other.id
2324
2325    def __hash__(self):
2326        return hash(self.organization) ^ hash(self.id)
2327
2328    _fields_to_parsers: ClassVar[dict] = {
2329        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2330    }
2331
2332    _patchable_fields: ClassVar[set[str]] = {
2333        "can_create_org_repo",
2334        "description",
2335        "includes_all_repositories",
2336        "name",
2337        "permission",
2338        "units",
2339        "units_map",
2340    }
2341
2342    @classmethod
2343    def request(cls, allspice_client, id: int):
2344        return cls._request(allspice_client, {"id": id})
2345
2346    def commit(self):
2347        args = {"id": self.id}
2348        self._commit(args)
2349
2350    def add_user(self, user: User):
2351        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2352        url = f"/teams/{self.id}/members/{user.login}"
2353        self.allspice_client.requests_put(url)
2354
2355    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2356        if isinstance(repo, Repository):
2357            repo_name = repo.name
2358        else:
2359            repo_name = repo
2360        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2361
2362    def get_members(self):
2363        """Get all users assigned to the team."""
2364        results = self.allspice_client.requests_get_paginated(
2365            Team.GET_MEMBERS % self.id,
2366        )
2367        return [User.parse_response(self.allspice_client, result) for result in results]
2368
2369    def get_repos(self):
2370        """Get all repos of this Team."""
2371        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2372        return [Repository.parse_response(self.allspice_client, result) for result in results]
2373
2374    def delete(self):
2375        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2376        self.deleted = True
2377
2378    def remove_team_member(self, user_name: str):
2379        url = f"/teams/{self.id}/members/{user_name}"
2380        self.allspice_client.requests_delete(url)
Team(allspice_client)
2317    def __init__(self, allspice_client):
2318        super().__init__(allspice_client)
can_create_org_repo: bool
description: str
id: int
includes_all_repositories: bool
name: str
organization: Optional[Organization]
permission: str
units: List[str]
units_map: Dict[str, str]
API_OBJECT = '/teams/{id}'
ADD_REPO = '/teams/%s/repos/%s/%s'
TEAM_DELETE = '/teams/%s'
GET_MEMBERS = '/teams/%s/members'
GET_REPOS = '/teams/%s/repos'
@classmethod
def request(cls, allspice_client, id: int):
2342    @classmethod
2343    def request(cls, allspice_client, id: int):
2344        return cls._request(allspice_client, {"id": id})
def commit(self):
2346    def commit(self):
2347        args = {"id": self.id}
2348        self._commit(args)
def add_user(self, user: User):
2350    def add_user(self, user: User):
2351        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2352        url = f"/teams/{self.id}/members/{user.login}"
2353        self.allspice_client.requests_put(url)

allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember

def add_repo( self, org: Organization, repo: Union[Repository, str]):
2355    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2356        if isinstance(repo, Repository):
2357            repo_name = repo.name
2358        else:
2359            repo_name = repo
2360        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
def get_members(self):
2362    def get_members(self):
2363        """Get all users assigned to the team."""
2364        results = self.allspice_client.requests_get_paginated(
2365            Team.GET_MEMBERS % self.id,
2366        )
2367        return [User.parse_response(self.allspice_client, result) for result in results]

Get all users assigned to the team.

def get_repos(self):
2369    def get_repos(self):
2370        """Get all repos of this Team."""
2371        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2372        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all repos of this Team.

def delete(self):
2374    def delete(self):
2375        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2376        self.deleted = True
def remove_team_member(self, user_name: str):
2378    def remove_team_member(self, user_name: str):
2379        url = f"/teams/{self.id}/members/{user_name}"
2380        self.allspice_client.requests_delete(url)
class User(allspice.baseapiobject.ApiObject):
241class User(ApiObject):
242    active: bool
243    admin: Any
244    allow_create_organization: Any
245    allow_git_hook: Any
246    allow_import_local: Any
247    avatar_url: str
248    created: str
249    description: str
250    email: str
251    emails: List[Any]
252    followers_count: int
253    following_count: int
254    full_name: str
255    html_url: str
256    id: int
257    is_admin: bool
258    language: str
259    last_login: str
260    location: str
261    login: str
262    login_name: str
263    max_repo_creation: Any
264    must_change_password: Any
265    password: Any
266    prohibit_login: bool
267    restricted: bool
268    source_id: int
269    starred_repos_count: int
270    username: str
271    visibility: str
272    website: str
273
274    API_OBJECT = """/users/{name}"""  # <org>
275    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
276    USER_PATCH = """/admin/users/%s"""  # <username>
277    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
278    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
279    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
280
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
284
285    def __eq__(self, other):
286        if not isinstance(other, User):
287            return False
288        return self.allspice_client == other.allspice_client and self.id == other.id
289
290    def __hash__(self):
291        return hash(self.allspice_client) ^ hash(self.id)
292
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
297
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
302
303    _patchable_fields: ClassVar[set[str]] = {
304        "active",
305        "admin",
306        "allow_create_organization",
307        "allow_git_hook",
308        "allow_import_local",
309        "email",
310        "full_name",
311        "location",
312        "login_name",
313        "max_repo_creation",
314        "must_change_password",
315        "password",
316        "prohibit_login",
317        "website",
318    }
319
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}
335
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)
374
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]
380
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]
386
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
391
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]
396
397    def __request_emails(self):
398        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
399        # report if the adress changed by this
400        for mail in result:
401            self._emails.append(mail["email"])
402            if mail["primary"]:
403                self._email = mail["email"]
404
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True
409
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results
User(allspice_client)
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
active: bool
admin: Any
allow_create_organization: Any
allow_git_hook: Any
allow_import_local: Any
avatar_url: str
created: str
description: str
email: str
emails
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
followers_count: int
following_count: int
full_name: str
html_url: str
id: int
is_admin: bool
language: str
last_login: str
location: str
login: str
login_name: str
max_repo_creation: Any
must_change_password: Any
password: Any
prohibit_login: bool
restricted: bool
source_id: int
starred_repos_count: int
username: str
visibility: str
website: str
API_OBJECT = '/users/{name}'
USER_MAIL = '/user/emails?sudo=%s'
USER_PATCH = '/admin/users/%s'
ADMIN_DELETE_USER = '/admin/users/%s'
ADMIN_EDIT_USER = '/admin/users/{username}'
USER_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> User:
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
def commit(self, login_name: str, source_id: int = 0):
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}

Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.

def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)

Create a user Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories owned by this User.

def get_orgs(self) -> List[Organization]:
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]

Get all Organizations this user is a member of.

def get_teams(self) -> List[Team]:
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
def get_accessible_repos(self) -> List[Repository]:
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories accessible by the logged in User.

def delete(self):
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True

Deletes this User. Also deletes all Repositories he owns.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results