allspice

A very simple API client for AllSpice Hub

Note that the full Swagger API is not accessible. The implementation focuses on making it as painless as possible to access and work with Organizations, Teams, Repositories and Users.

Forked from https://github.com/Langenfeld/py-gitea.

Usage

Docs

See the documentation site.

Examples

Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.

Quickstart

First, get an allspice_client object that wraps access and authentication (via an API token) for your instance of AllSpice Hub.

from allspice import *

# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)

# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)

Operations like requesting the AllSpice version or the authenticated user can be performed directly on the allspice_client object:

print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)

Adding entities like Users, Organizations, ... is also done via the allspice_client object.

user = allspice_client.create_user("Test Testson", "test@test.test", "password")

All operations on entities in allspice are then accomplished via the corresponding wrapper objects for those entities. Each of those objects has a .request method that creates an entity according to your allspice_client instance.

other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)

Note that the fields of the User, Organization, ... classes are dynamically created at runtime, and thus not visible during development. Refer to the AllSpice API documentation for the field names.

Fields that cannot be altered via the AllSpice API are read-only. After altering a field, the .commit method of the corresponding object must be called to synchronize the changed fields with your allspice_client instance.

org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()

An entity in allspice can be deleted by calling delete.

org.delete()

All entity objects have methods to execute some of the requests possible through the AllSpice API:

org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
    repos = team.get_repos()
    for repo in repos:
        print(repo.name)

 1"""
 2.. include:: ../README.md
 3   :start-line: 1
 4   :end-before: Installation
 5"""
 6
 7from .allspice import (
 8    AllSpice,
 9)
10from .apiobject import (
11    Branch,
12    Comment,
13    Commit,
14    Content,
15    DesignReview,
16    Issue,
17    Milestone,
18    Organization,
19    Release,
20    Repository,
21    Team,
22    User,
23)
24from .exceptions import AlreadyExistsException, NotFoundException
25
26__version__ = "3.7.0"
27
28__all__ = [
29    "AllSpice",
30    "AlreadyExistsException",
31    "Branch",
32    "Comment",
33    "Commit",
34    "Content",
35    "DesignReview",
36    "Issue",
37    "Milestone",
38    "NotFoundException",
39    "Organization",
40    "Release",
41    "Repository",
42    "Team",
43    "User",
44]
class AllSpice:
 20class AllSpice:
 21    """Object to establish a session with AllSpice Hub."""
 22
 23    ADMIN_CREATE_USER = """/admin/users"""
 24    GET_USERS_ADMIN = """/admin/users"""
 25    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 26    ALLSPICE_HUB_VERSION = """/version"""
 27    GET_USER = """/user"""
 28    GET_REPOSITORY = """/repos/{owner}/{name}"""
 29    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 30    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 31
 32    def __init__(
 33        self,
 34        allspice_hub_url="https://hub.allspice.io",
 35        token_text=None,
 36        auth=None,
 37        verify=True,
 38        log_level="INFO",
 39        ratelimiting=(100, 60),
 40    ):
 41        """Initializing an instance of the AllSpice Hub Client
 42
 43        Args:
 44            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 45                Defaults to `https://hub.allspice.io`.
 46
 47            token_text (str, None): The access token, by default None.
 48
 49            auth (tuple, None): The user credentials
 50                `(username, password)`, by default None.
 51
 52            verify (bool): If False, allow insecure server connections
 53                when using SSL.
 54
 55            log_level (str): The log level, by default `INFO`.
 56
 57            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 58                If None, no rate limiting is applied. By default, 100 calls
 59                per minute are allowed.
 60        """
 61
 62        self.logger = logging.getLogger(__name__)
 63        self.logger.setLevel(log_level)
 64        self.headers = {
 65            "Content-type": "application/json",
 66        }
 67        self.url = allspice_hub_url
 68
 69        if ratelimiting is None:
 70            self.requests = requests.Session()
 71        else:
 72            (max_calls, period) = ratelimiting
 73            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 74
 75        # Manage authentication
 76        if not token_text and not auth:
 77            raise ValueError("Please provide auth or token_text, but not both")
 78        if token_text:
 79            self.headers["Authorization"] = "token " + token_text
 80        if auth:
 81            self.logger.warning(
 82                "Using basic auth is not recommended. Prefer using a token instead."
 83            )
 84            self.requests.auth = auth
 85
 86        # Manage SSL certification verification
 87        self.requests.verify = verify
 88        if not verify:
 89            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 90
 91    def __get_url(self, endpoint):
 92        url = self.url + "/api/v1" + endpoint
 93        self.logger.debug("Url: %s" % url)
 94        return url
 95
 96    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
 97        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
 98        if request.status_code not in [200, 201]:
 99            message = f"Received status code: {request.status_code} ({request.url})"
100            if request.status_code in [404]:
101                raise NotFoundException(message)
102            if request.status_code in [403]:
103                raise Exception(
104                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
105                )
106            if request.status_code in [409]:
107                raise ConflictException(message)
108            if request.status_code in [503]:
109                raise NotYetGeneratedException(message)
110            raise Exception(message)
111        return request
112
113    @staticmethod
114    def parse_result(result) -> Dict:
115        """Parses the result-JSON to a dict."""
116        if result.text and len(result.text) > 3:
117            return json.loads(result.text)
118        return {}
119
120    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
121        combined_params = {}
122        combined_params.update(params)
123        if sudo:
124            combined_params["sudo"] = sudo.username
125        return self.parse_result(self.__get(endpoint, combined_params))
126
127    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
128        combined_params = {}
129        combined_params.update(params)
130        if sudo:
131            combined_params["sudo"] = sudo.username
132        return self.__get(endpoint, combined_params).content
133
134    def requests_get_paginated(
135        self,
136        endpoint: str,
137        params=frozendict(),
138        sudo=None,
139        page_key: str = "page",
140        first_page: int = 1,
141    ):
142        page = first_page
143        combined_params = {}
144        combined_params.update(params)
145        aggregated_result = []
146        while True:
147            combined_params[page_key] = page
148            result = self.requests_get(endpoint, combined_params, sudo)
149
150            if not result:
151                return aggregated_result
152
153            if isinstance(result, dict):
154                if "data" in result:
155                    data = result["data"]
156                    if len(data) == 0:
157                        return aggregated_result
158                    aggregated_result.extend(data)
159                elif "tree" in result:
160                    data = result["tree"]
161                    if data is None or len(data) == 0:
162                        return aggregated_result
163                    aggregated_result.extend(data)
164                else:
165                    raise NotImplementedError(
166                        "requests_get_paginated does not know how to handle responses of this type."
167                    )
168            else:
169                aggregated_result.extend(result)
170
171            page += 1
172
173    def requests_put(self, endpoint: str, data: Optional[dict] = None):
174        if not data:
175            data = {}
176        request = self.requests.put(
177            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
178        )
179        if request.status_code not in [200, 204]:
180            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
181            self.logger.error(message)
182            raise Exception(message)
183
184    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
185        request = self.requests.delete(
186            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
187        )
188        if request.status_code not in [200, 204]:
189            message = f"Received status code: {request.status_code} ({request.url})"
190            self.logger.error(message)
191            raise Exception(message)
192
193    def requests_post(
194        self,
195        endpoint: str,
196        data: Optional[dict] = None,
197        params: Optional[dict] = None,
198        files: Optional[dict] = None,
199    ):
200        """
201        Make a POST call to the endpoint.
202
203        :param endpoint: The path to the endpoint
204        :param data: A dictionary for JSON data
205        :param params: A dictionary of query params
206        :param files: A dictionary of files, see requests.post. Using both files and data
207                      can lead to unexpected results!
208        :return: The JSON response parsed as a dict
209        """
210
211        # This should ideally be a TypedDict of the type of arguments taken by
212        # `requests.post`.
213        args: dict[str, Any] = {
214            "headers": self.headers.copy(),
215        }
216        if data is not None:
217            args["data"] = json.dumps(data)
218        if params is not None:
219            args["params"] = params
220        if files is not None:
221            args["headers"].pop("Content-type")
222            args["files"] = files
223
224        request = self.requests.post(self.__get_url(endpoint), **args)
225
226        if request.status_code not in [200, 201, 202]:
227            if "already exists" in request.text or "e-mail already in use" in request.text:
228                self.logger.warning(request.text)
229                raise AlreadyExistsException()
230            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
231            self.logger.error(f"With info: {data} ({self.headers})")
232            self.logger.error(f"Answer: {request.text}")
233            raise Exception(
234                f"Received status code: {request.status_code} ({request.url}), {request.text}"
235            )
236        return self.parse_result(request)
237
238    def requests_patch(self, endpoint: str, data: dict):
239        request = self.requests.patch(
240            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
241        )
242        if request.status_code not in [200, 201]:
243            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
244            self.logger.error(error_message)
245            raise Exception(error_message)
246        return self.parse_result(request)
247
248    def get_orgs_public_members_all(self, orgname):
249        path = "/orgs/" + orgname + "/public_members"
250        return self.requests_get(path)
251
252    def get_orgs(self):
253        path = "/admin/orgs"
254        results = self.requests_get(path)
255        return [Organization.parse_response(self, result) for result in results]
256
257    def get_user(self):
258        result = self.requests_get(AllSpice.GET_USER)
259        return User.parse_response(self, result)
260
261    def get_version(self) -> str:
262        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
263        return result["version"]
264
265    def get_users(self) -> List[User]:
266        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
267        return [User.parse_response(self, result) for result in results]
268
269    def get_user_by_email(self, email: str) -> Optional[User]:
270        users = self.get_users()
271        for user in users:
272            if user.email == email or email in user.emails:
273                return user
274        return None
275
276    def get_user_by_name(self, username: str) -> Optional[User]:
277        users = self.get_users()
278        for user in users:
279            if user.username == username:
280                return user
281        return None
282
283    def get_repository(self, owner: str, name: str) -> Repository:
284        path = self.GET_REPOSITORY.format(owner=owner, name=name)
285        result = self.requests_get(path)
286        return Repository.parse_response(self, result)
287
288    def create_user(
289        self,
290        user_name: str,
291        email: str,
292        password: str,
293        full_name: Optional[str] = None,
294        login_name: Optional[str] = None,
295        change_pw=True,
296        send_notify=True,
297        source_id=0,
298    ):
299        """Create User.
300        Throws:
301            AlreadyExistsException, if the User exists already
302            Exception, if something else went wrong.
303        """
304        if not login_name:
305            login_name = user_name
306        if not full_name:
307            full_name = user_name
308        request_data = {
309            "source_id": source_id,
310            "login_name": login_name,
311            "full_name": full_name,
312            "username": user_name,
313            "email": email,
314            "password": password,
315            "send_notify": send_notify,
316            "must_change_password": change_pw,
317        }
318
319        self.logger.debug("Gitea post payload: %s", request_data)
320        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
321        if "id" in result:
322            self.logger.info(
323                "Successfully created User %s <%s> (id %s)",
324                result["login"],
325                result["email"],
326                result["id"],
327            )
328            self.logger.debug("Gitea response: %s", result)
329        else:
330            self.logger.error(result["message"])
331            raise Exception("User not created... (gitea: %s)" % result["message"])
332        user = User.parse_response(self, result)
333        return user
334
335    def create_repo(
336        self,
337        repoOwner: Union[User, Organization],
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a Repository as the administrator
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353
354        Note:
355            Non-admin users can not use this method. Please use instead
356            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
357        """
358        # although this only says user in the api, this also works for
359        # organizations
360        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
361        result = self.requests_post(
362            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
363            data={
364                "name": repoName,
365                "description": description,
366                "private": private,
367                "auto_init": autoInit,
368                "gitignores": gitignores,
369                "license": license,
370                "issue_labels": issue_labels,
371                "readme": readme,
372                "default_branch": default_branch,
373            },
374        )
375        if "id" in result:
376            self.logger.info("Successfully created Repository %s " % result["name"])
377        else:
378            self.logger.error(result["message"])
379            raise Exception("Repository not created... (gitea: %s)" % result["message"])
380        return Repository.parse_response(self, result)
381
382    def create_org(
383        self,
384        owner: User,
385        orgName: str,
386        description: str,
387        location="",
388        website="",
389        full_name="",
390    ):
391        assert isinstance(owner, User)
392        result = self.requests_post(
393            AllSpice.CREATE_ORG % owner.username,
394            data={
395                "username": orgName,
396                "description": description,
397                "location": location,
398                "website": website,
399                "full_name": full_name,
400            },
401        )
402        if "id" in result:
403            self.logger.info("Successfully created Organization %s" % result["username"])
404        else:
405            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
406            self.logger.error(result["message"])
407            raise Exception("Organization not created... (gitea: %s)" % result["message"])
408        return Organization.parse_response(self, result)
409
410    def create_team(
411        self,
412        org: Organization,
413        name: str,
414        description: str = "",
415        permission: str = "read",
416        can_create_org_repo: bool = False,
417        includes_all_repositories: bool = False,
418        units=(
419            "repo.code",
420            "repo.issues",
421            "repo.ext_issues",
422            "repo.wiki",
423            "repo.pulls",
424            "repo.releases",
425            "repo.ext_wiki",
426        ),
427        units_map={},
428    ):
429        """Creates a Team.
430
431        Args:
432            org (Organization): Organization the Team will be part of.
433            name (str): The Name of the Team to be created.
434            description (str): Optional, None, short description of the new Team.
435            permission (str): Optional, 'read', What permissions the members
436            units_map (dict): Optional, {}, a mapping of units to their
437                permissions. If None or empty, the `permission` permission will
438                be applied to all units. Note: When both `units` and `units_map`
439                are given, `units_map` will be preferred.
440        """
441
442        result = self.requests_post(
443            AllSpice.CREATE_TEAM % org.username,
444            data={
445                "name": name,
446                "description": description,
447                "permission": permission,
448                "can_create_org_repo": can_create_org_repo,
449                "includes_all_repositories": includes_all_repositories,
450                "units": units,
451                "units_map": units_map,
452            },
453        )
454
455        if "id" in result:
456            self.logger.info("Successfully created Team %s" % result["name"])
457        else:
458            self.logger.error("Team not created... (gitea: %s)" % result["message"])
459            self.logger.error(result["message"])
460            raise Exception("Team not created... (gitea: %s)" % result["message"])
461        api_object = Team.parse_response(self, result)
462        setattr(
463            api_object, "_organization", org
464        )  # fixes strange behaviour of gitea not returning a valid organization here.
465        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
32    def __init__(
33        self,
34        allspice_hub_url="https://hub.allspice.io",
35        token_text=None,
36        auth=None,
37        verify=True,
38        log_level="INFO",
39        ratelimiting=(100, 60),
40    ):
41        """Initializing an instance of the AllSpice Hub Client
42
43        Args:
44            allspice_hub_url (str): The URL for the AllSpice Hub instance.
45                Defaults to `https://hub.allspice.io`.
46
47            token_text (str, None): The access token, by default None.
48
49            auth (tuple, None): The user credentials
50                `(username, password)`, by default None.
51
 52            verify (bool): If False, allow insecure server connections
 53                when using SSL.
54
55            log_level (str): The log level, by default `INFO`.
56
57            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
58                If None, no rate limiting is applied. By default, 100 calls
59                per minute are allowed.
60        """
61
62        self.logger = logging.getLogger(__name__)
63        self.logger.setLevel(log_level)
64        self.headers = {
65            "Content-type": "application/json",
66        }
67        self.url = allspice_hub_url
68
69        if ratelimiting is None:
70            self.requests = requests.Session()
71        else:
72            (max_calls, period) = ratelimiting
73            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
74
 75        # Manage authentication
76        if not token_text and not auth:
77            raise ValueError("Please provide auth or token_text, but not both")
78        if token_text:
79            self.headers["Authorization"] = "token " + token_text
80        if auth:
81            self.logger.warning(
82                "Using basic auth is not recommended. Prefer using a token instead."
83            )
84            self.requests.auth = auth
85
86        # Manage SSL certification verification
87        self.requests.verify = verify
88        if not verify:
89            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If False, allow insecure server connections
    when using SSL.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
@staticmethod
def parse_result(result) -> Dict:
113    @staticmethod
114    def parse_result(result) -> Dict:
115        """Parses the result-JSON to a dict."""
116        if result.text and len(result.text) > 3:
117            return json.loads(result.text)
118        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
120    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
121        combined_params = {}
122        combined_params.update(params)
123        if sudo:
124            combined_params["sudo"] = sudo.username
125        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
127    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
128        combined_params = {}
129        combined_params.update(params)
130        if sudo:
131            combined_params["sudo"] = sudo.username
132        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
134    def requests_get_paginated(
135        self,
136        endpoint: str,
137        params=frozendict(),
138        sudo=None,
139        page_key: str = "page",
140        first_page: int = 1,
141    ):
142        page = first_page
143        combined_params = {}
144        combined_params.update(params)
145        aggregated_result = []
146        while True:
147            combined_params[page_key] = page
148            result = self.requests_get(endpoint, combined_params, sudo)
149
150            if not result:
151                return aggregated_result
152
153            if isinstance(result, dict):
154                if "data" in result:
155                    data = result["data"]
156                    if len(data) == 0:
157                        return aggregated_result
158                    aggregated_result.extend(data)
159                elif "tree" in result:
160                    data = result["tree"]
161                    if data is None or len(data) == 0:
162                        return aggregated_result
163                    aggregated_result.extend(data)
164                else:
165                    raise NotImplementedError(
166                        "requests_get_paginated does not know how to handle responses of this type."
167                    )
168            else:
169                aggregated_result.extend(result)
170
171            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
173    def requests_put(self, endpoint: str, data: Optional[dict] = None):
174        if not data:
175            data = {}
176        request = self.requests.put(
177            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
178        )
179        if request.status_code not in [200, 204]:
180            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
181            self.logger.error(message)
182            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
184    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
185        request = self.requests.delete(
186            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
187        )
188        if request.status_code not in [200, 204]:
189            message = f"Received status code: {request.status_code} ({request.url})"
190            self.logger.error(message)
191            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
193    def requests_post(
194        self,
195        endpoint: str,
196        data: Optional[dict] = None,
197        params: Optional[dict] = None,
198        files: Optional[dict] = None,
199    ):
200        """
201        Make a POST call to the endpoint.
202
203        :param endpoint: The path to the endpoint
204        :param data: A dictionary for JSON data
205        :param params: A dictionary of query params
206        :param files: A dictionary of files, see requests.post. Using both files and data
207                      can lead to unexpected results!
208        :return: The JSON response parsed as a dict
209        """
210
211        # This should ideally be a TypedDict of the type of arguments taken by
212        # `requests.post`.
213        args: dict[str, Any] = {
214            "headers": self.headers.copy(),
215        }
216        if data is not None:
217            args["data"] = json.dumps(data)
218        if params is not None:
219            args["params"] = params
220        if files is not None:
221            args["headers"].pop("Content-type")
222            args["files"] = files
223
224        request = self.requests.post(self.__get_url(endpoint), **args)
225
226        if request.status_code not in [200, 201, 202]:
227            if "already exists" in request.text or "e-mail already in use" in request.text:
228                self.logger.warning(request.text)
229                raise AlreadyExistsException()
230            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
231            self.logger.error(f"With info: {data} ({self.headers})")
232            self.logger.error(f"Answer: {request.text}")
233            raise Exception(
234                f"Received status code: {request.status_code} ({request.url}), {request.text}"
235            )
236        return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
238    def requests_patch(self, endpoint: str, data: dict):
239        request = self.requests.patch(
240            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
241        )
242        if request.status_code not in [200, 201]:
243            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
244            self.logger.error(error_message)
245            raise Exception(error_message)
246        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
248    def get_orgs_public_members_all(self, orgname):
249        path = "/orgs/" + orgname + "/public_members"
250        return self.requests_get(path)
def get_orgs(self):
252    def get_orgs(self):
253        path = "/admin/orgs"
254        results = self.requests_get(path)
255        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
257    def get_user(self):
258        result = self.requests_get(AllSpice.GET_USER)
259        return User.parse_response(self, result)
def get_version(self) -> str:
261    def get_version(self) -> str:
262        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
263        return result["version"]
def get_users(self) -> List[User]:
265    def get_users(self) -> List[User]:
266        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
267        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[User]:
269    def get_user_by_email(self, email: str) -> Optional[User]:
270        users = self.get_users()
271        for user in users:
272            if user.email == email or email in user.emails:
273                return user
274        return None
def get_user_by_name(self, username: str) -> Optional[User]:
276    def get_user_by_name(self, username: str) -> Optional[User]:
277        users = self.get_users()
278        for user in users:
279            if user.username == username:
280                return user
281        return None
def get_repository(self, owner: str, name: str) -> Repository:
283    def get_repository(self, owner: str, name: str) -> Repository:
284        path = self.GET_REPOSITORY.format(owner=owner, name=name)
285        result = self.requests_get(path)
286        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
288    def create_user(
289        self,
290        user_name: str,
291        email: str,
292        password: str,
293        full_name: Optional[str] = None,
294        login_name: Optional[str] = None,
295        change_pw=True,
296        send_notify=True,
297        source_id=0,
298    ):
299        """Create User.
300        Throws:
301            AlreadyExistsException, if the User exists already
302            Exception, if something else went wrong.
303        """
304        if not login_name:
305            login_name = user_name
306        if not full_name:
307            full_name = user_name
308        request_data = {
309            "source_id": source_id,
310            "login_name": login_name,
311            "full_name": full_name,
312            "username": user_name,
313            "email": email,
314            "password": password,
315            "send_notify": send_notify,
316            "must_change_password": change_pw,
317        }
318
319        self.logger.debug("Gitea post payload: %s", request_data)
320        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
321        if "id" in result:
322            self.logger.info(
323                "Successfully created User %s <%s> (id %s)",
324                result["login"],
325                result["email"],
326                result["id"],
327            )
328            self.logger.debug("Gitea response: %s", result)
329        else:
330            self.logger.error(result["message"])
331            raise Exception("User not created... (gitea: %s)" % result["message"])
332        user = User.parse_response(self, result)
333        return user

Create User. Throws: AlreadyExistsException, if the User already exists; Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[User, Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
335    def create_repo(
336        self,
337        repoOwner: Union[User, Organization],
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a Repository as the administrator
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353
354        Note:
355            Non-admin users can not use this method. Please use instead
356            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
357        """
358        # although this only says user in the api, this also works for
359        # organizations
360        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
361        result = self.requests_post(
362            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
363            data={
364                "name": repoName,
365                "description": description,
366                "private": private,
367                "auto_init": autoInit,
368                "gitignores": gitignores,
369                "license": license,
370                "issue_labels": issue_labels,
371                "readme": readme,
372                "default_branch": default_branch,
373            },
374        )
375        if "id" in result:
376            self.logger.info("Successfully created Repository %s " % result["name"])
377        else:
378            self.logger.error(result["message"])
379            raise Exception("Repository not created... (gitea: %s)" % result["message"])
380        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users cannot use this method. Use allspice.User.create_repo or allspice.Organization.create_repo instead.

def create_org( self, owner: User, orgName: str, description: str, location='', website='', full_name=''):
382    def create_org(
383        self,
384        owner: User,
385        orgName: str,
386        description: str,
387        location="",
388        website="",
389        full_name="",
390    ):
391        assert isinstance(owner, User)
392        result = self.requests_post(
393            AllSpice.CREATE_ORG % owner.username,
394            data={
395                "username": orgName,
396                "description": description,
397                "location": location,
398                "website": website,
399                "full_name": full_name,
400            },
401        )
402        if "id" in result:
403            self.logger.info("Successfully created Organization %s" % result["username"])
404        else:
405            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
406            self.logger.error(result["message"])
407            raise Exception("Organization not created... (gitea: %s)" % result["message"])
408        return Organization.parse_response(self, result)
def create_team( self, org: Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
410    def create_team(
411        self,
412        org: Organization,
413        name: str,
414        description: str = "",
415        permission: str = "read",
416        can_create_org_repo: bool = False,
417        includes_all_repositories: bool = False,
418        units=(
419            "repo.code",
420            "repo.issues",
421            "repo.ext_issues",
422            "repo.wiki",
423            "repo.pulls",
424            "repo.releases",
425            "repo.ext_wiki",
426        ),
427        units_map={},
428    ):
429        """Creates a Team.
430
431        Args:
432            org (Organization): Organization the Team will be part of.
433            name (str): The Name of the Team to be created.
434            description (str): Optional, None, short description of the new Team.
435            permission (str): Optional, 'read', What permissions the members
436            units_map (dict): Optional, {}, a mapping of units to their
437                permissions. If None or empty, the `permission` permission will
438                be applied to all units. Note: When both `units` and `units_map`
439                are given, `units_map` will be preferred.
440        """
441
442        result = self.requests_post(
443            AllSpice.CREATE_TEAM % org.username,
444            data={
445                "name": name,
446                "description": description,
447                "permission": permission,
448                "can_create_org_repo": can_create_org_repo,
449                "includes_all_repositories": includes_all_repositories,
450                "units": units,
451                "units_map": units_map,
452            },
453        )
454
455        if "id" in result:
456            self.logger.info("Successfully created Team %s" % result["name"])
457        else:
458            self.logger.error("Team not created... (gitea: %s)" % result["message"])
459            self.logger.error(result["message"])
460            raise Exception("Team not created... (gitea: %s)" % result["message"])
461        api_object = Team.parse_response(self, result)
462        setattr(
463            api_object, "_organization", org
464        )  # fixes strange behaviour of gitea not returning a valid organization here.
465        return api_object

Creates a Team.

Args:
  • org (Organization): Organization the Team will be part of.
  • name (str): The name of the Team to be created.
  • description (str): Optional, defaults to ''. Short description of the new Team.
  • permission (str): Optional, defaults to 'read'. The permission given to the members of the Team.
  • units_map (dict): Optional, defaults to {}. A mapping of units to their permissions. If None or empty, the value of permission will be applied to all units. Note: when both units and units_map are given, units_map will be preferred.

class AlreadyExistsException(builtins.Exception):
2class AlreadyExistsException(Exception):
3    pass

Common base class for all non-exit exceptions.

class Branch(allspice.baseapiobject.ReadonlyApiObject):
419class Branch(ReadonlyApiObject):
420    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
421    effective_branch_protection_name: str
422    enable_status_check: bool
423    name: str
424    protected: bool
425    required_approvals: int
426    status_check_contexts: List[Any]
427    user_can_merge: bool
428    user_can_push: bool
429
430    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
431
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
434
435    def __eq__(self, other):
436        if not isinstance(other, Branch):
437            return False
438        return self.commit == other.commit and self.name == other.name
439
440    def __hash__(self):
441        return hash(self.commit["id"]) ^ hash(self.name)
442
443    _fields_to_parsers: ClassVar[dict] = {
444        # This is not a commit object
445        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
446    }
447
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Branch(allspice_client)
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]], NoneType]]
effective_branch_protection_name: str
enable_status_check: bool
name: str
protected: bool
required_approvals: int
status_check_contexts: List[Any]
user_can_merge: bool
user_can_push: bool
API_OBJECT = '/repos/{owner}/{repo}/branches/{branch}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, branch: str):
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
class Comment(allspice.baseapiobject.ApiObject):
1565class Comment(ApiObject):
1566    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1567    body: str
1568    created_at: datetime
1569    html_url: str
1570    id: int
1571    issue_url: str
1572    original_author: str
1573    original_author_id: int
1574    pull_request_url: str
1575    updated_at: datetime
1576    user: User
1577
1578    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1579    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1580    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1581
1582    def __init__(self, allspice_client):
1583        super().__init__(allspice_client)
1584
1585    def __eq__(self, other):
1586        if not isinstance(other, Comment):
1587            return False
1588        return self.repository == other.repository and self.id == other.id
1589
1590    def __hash__(self):
1591        return hash(self.repository) ^ hash(self.id)
1592
1593    @classmethod
1594    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1595        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1596
1597    _fields_to_parsers: ClassVar[dict] = {
1598        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1599        "created_at": lambda _, t: Util.convert_time(t),
1600        "updated_at": lambda _, t: Util.convert_time(t),
1601    }
1602
1603    _patchable_fields: ClassVar[set[str]] = {"body"}
1604
1605    @property
1606    def parent_url(self) -> str:
1607        """URL of the parent of this comment (the issue or the pull request)"""
1608
1609        if self.issue_url is not None and self.issue_url != "":
1610            return self.issue_url
1611        else:
1612            return self.pull_request_url
1613
1614    @cached_property
1615    def repository(self) -> Repository:
1616        """The repository this comment was posted on."""
1617
1618        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1619        return Repository.request(self.allspice_client, owner_name, repo_name)
1620
1621    def __fields_for_path(self):
1622        return {
1623            "owner": self.repository.owner.username,
1624            "repo": self.repository.name,
1625            "id": self.id,
1626        }
1627
1628    def commit(self):
1629        self._commit(self.__fields_for_path())
1630
1631    def delete(self):
1632        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1633        self.deleted = True
1634
1635    def get_attachments(self) -> List[Attachment]:
1636        """
1637        Get all attachments on this comment. This returns Attachment objects, which
1638        contain a link to download the attachment.
1639
1640        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641        """
1642
1643        results = self.allspice_client.requests_get(
1644            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1645        )
1646        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1647
1648    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1649        """
1650        Create an attachment on this comment.
1651
1652        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1653
1654        :param file: The file to attach. This should be a file-like object.
1655        :param name: The name of the file. If not provided, the name of the file will be
1656                     used.
1657        :return: The created attachment.
1658        """
1659
1660        args: dict[str, Any] = {
1661            "files": {"attachment": file},
1662        }
1663        if name is not None:
1664            args["params"] = {"name": name}
1665
1666        result = self.allspice_client.requests_post(
1667            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1668            **args,
1669        )
1670        return Attachment.parse_response(self.allspice_client, result)
1671
1672    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1673        """
1674        Edit an attachment.
1675
1676        The list of params that can be edited is available at
1677        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1678
1679        :param attachment: The attachment to be edited
1680        :param data: The data parameter should be a dictionary of the fields to edit.
1681        :return: The edited attachment
1682        """
1683
1684        args = {
1685            **self.__fields_for_path(),
1686            "attachment_id": attachment.id,
1687        }
1688        result = self.allspice_client.requests_patch(
1689            self.ATTACHMENT_PATH.format(**args),
1690            data=data,
1691        )
1692        return Attachment.parse_response(self.allspice_client, result)
1693
1694    def delete_attachment(self, attachment: Attachment):
1695        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1696
1697        args = {
1698            **self.__fields_for_path(),
1699            "attachment_id": attachment.id,
1700        }
1701        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1702        attachment.deleted = True
Comment(allspice_client)
1582    def __init__(self, allspice_client):
1583        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]]]]
body: str
created_at: datetime.datetime
html_url: str
id: int
issue_url: str
original_author: str
original_author_id: int
pull_request_url: str
updated_at: datetime.datetime
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/comments/{id}'
GET_ATTACHMENTS_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets'
ATTACHMENT_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}'
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: str) -> Comment:
1593    @classmethod
1594    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1595        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
parent_url: str
1605    @property
1606    def parent_url(self) -> str:
1607        """URL of the parent of this comment (the issue or the pull request)"""
1608
1609        if self.issue_url is not None and self.issue_url != "":
1610            return self.issue_url
1611        else:
1612            return self.pull_request_url

URL of the parent of this comment (the issue or the pull request)

repository: Repository
1614    @cached_property
1615    def repository(self) -> Repository:
1616        """The repository this comment was posted on."""
1617
1618        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1619        return Repository.request(self.allspice_client, owner_name, repo_name)

The repository this comment was posted on.

def commit(self):
1628    def commit(self):
1629        self._commit(self.__fields_for_path())
def delete(self):
1631    def delete(self):
1632        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1633        self.deleted = True
def get_attachments(self) -> List[allspice.apiobject.Attachment]:
1635    def get_attachments(self) -> List[Attachment]:
1636        """
1637        Get all attachments on this comment. This returns Attachment objects, which
1638        contain a link to download the attachment.
1639
1640        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641        """
1642
1643        results = self.allspice_client.requests_get(
1644            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1645        )
1646        return [Attachment.parse_response(self.allspice_client, result) for result in results]

Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.

https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> allspice.apiobject.Attachment:
1648    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1649        """
1650        Create an attachment on this comment.
1651
1652        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1653
1654        :param file: The file to attach. This should be a file-like object.
1655        :param name: The name of the file. If not provided, the name of the file will be
1656                     used.
1657        :return: The created attachment.
1658        """
1659
1660        args: dict[str, Any] = {
1661            "files": {"attachment": file},
1662        }
1663        if name is not None:
1664            args["params"] = {"name": name}
1665
1666        result = self.allspice_client.requests_post(
1667            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1668            **args,
1669        )
1670        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this comment.

https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

def edit_attachment( self, attachment: allspice.apiobject.Attachment, data: dict) -> allspice.apiobject.Attachment:
1672    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1673        """
1674        Edit an attachment.
1675
1676        The list of params that can be edited is available at
1677        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1678
1679        :param attachment: The attachment to be edited
1680        :param data: The data parameter should be a dictionary of the fields to edit.
1681        :return: The edited attachment
1682        """
1683
1684        args = {
1685            **self.__fields_for_path(),
1686            "attachment_id": attachment.id,
1687        }
1688        result = self.allspice_client.requests_patch(
1689            self.ATTACHMENT_PATH.format(**args),
1690            data=data,
1691        )
1692        return Attachment.parse_response(self.allspice_client, result)

Edit an attachment.

The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

Parameters
  • attachment: The attachment to be edited
  • data: The data parameter should be a dictionary of the fields to edit.
Returns

The edited attachment

def delete_attachment(self, attachment: allspice.apiobject.Attachment):
1694    def delete_attachment(self, attachment: Attachment):
1695        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1696
1697        args = {
1698            **self.__fields_for_path(),
1699            "attachment_id": attachment.id,
1700        }
1701        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1702        attachment.deleted = True

https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

class Commit(allspice.baseapiobject.ReadonlyApiObject):
1705class Commit(ReadonlyApiObject):
1706    author: User
1707    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1708    committer: Dict[str, Union[int, str, bool]]
1709    created: str
1710    files: List[Dict[str, str]]
1711    html_url: str
1712    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1713    parents: List[Union[Dict[str, str], Any]]
1714    sha: str
1715    stats: Dict[str, int]
1716    url: str
1717
1718    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1719    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1720    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1721
1722    # Regex to extract owner and repo names from the url property
1723    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1724
1725    def __init__(self, allspice_client):
1726        super().__init__(allspice_client)
1727
1728    _fields_to_parsers: ClassVar[dict] = {
1729        # NOTE: api may return None for commiters that are no allspice users
1730        "author": lambda allspice_client, u: (
1731            User.parse_response(allspice_client, u) if u else None
1732        )
1733    }
1734
1735    def __eq__(self, other):
1736        if not isinstance(other, Commit):
1737            return False
1738        return self.sha == other.sha
1739
1740    def __hash__(self):
1741        return hash(self.sha)
1742
1743    @classmethod
1744    def parse_response(cls, allspice_client, result) -> "Commit":
1745        commit_cache = result["commit"]
1746        api_object = cls(allspice_client)
1747        cls._initialize(allspice_client, api_object, result)
1748        # inner_commit for legacy reasons
1749        Commit._add_read_property("inner_commit", commit_cache, api_object)
1750        return api_object
1751
1752    def get_status(self) -> CommitCombinedStatus:
1753        """
1754        Get a combined status consisting of all statues on this commit.
1755
1756        Note that the returned object is a CommitCombinedStatus object, which
1757        also contains a list of all statuses on the commit.
1758
1759        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760        """
1761
1762        result = self.allspice_client.requests_get(
1763            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1764        )
1765        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1766
1767    def get_statuses(self) -> List[CommitStatus]:
1768        """
1769        Get a list of all statuses on this commit.
1770
1771        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1772        """
1773
1774        results = self.allspice_client.requests_get(
1775            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1776        )
1777        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1778
1779    @cached_property
1780    def _fields_for_path(self) -> dict[str, str]:
1781        matches = self.URL_REGEXP.search(self.url)
1782        if not matches:
1783            raise ValueError(f"Invalid commit URL: {self.url}")
1784
1785        return {
1786            "owner": matches.group(1),
1787            "repo": matches.group(2),
1788            "sha": self.sha,
1789        }
Commit(allspice_client)
1725    def __init__(self, allspice_client):
1726        super().__init__(allspice_client)
author: User
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
committer: Dict[str, Union[int, str, bool]]
created: str
files: List[Dict[str, str]]
html_url: str
inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
parents: List[Union[Dict[str, str], Any]]
sha: str
stats: Dict[str, int]
url: str
API_OBJECT = '/repos/{owner}/{repo}/commits/{sha}'
COMMIT_GET_STATUS = '/repos/{owner}/{repo}/commits/{sha}/status'
COMMIT_GET_STATUSES = '/repos/{owner}/{repo}/commits/{sha}/statuses'
URL_REGEXP = re.compile('/repos/([^/]+)/([^/]+)/git/commits')
@classmethod
def parse_response(cls, allspice_client, result) -> Commit:
1743    @classmethod
1744    def parse_response(cls, allspice_client, result) -> "Commit":
1745        commit_cache = result["commit"]
1746        api_object = cls(allspice_client)
1747        cls._initialize(allspice_client, api_object, result)
1748        # inner_commit for legacy reasons
1749        Commit._add_read_property("inner_commit", commit_cache, api_object)
1750        return api_object
def get_status(self) -> allspice.apiobject.CommitCombinedStatus:
1752    def get_status(self) -> CommitCombinedStatus:
1753        """
1754        Get a combined status consisting of all statues on this commit.
1755
1756        Note that the returned object is a CommitCombinedStatus object, which
1757        also contains a list of all statuses on the commit.
1758
1759        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760        """
1761
1762        result = self.allspice_client.requests_get(
1763            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1764        )
1765        return CommitCombinedStatus.parse_response(self.allspice_client, result)

Get a combined status consisting of all statuses on this commit.

Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.

https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus

def get_statuses(self) -> List[allspice.apiobject.CommitStatus]:
1767    def get_statuses(self) -> List[CommitStatus]:
1768        """
1769        Get a list of all statuses on this commit.
1770
1771        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1772        """
1773
1774        results = self.allspice_client.requests_get(
1775            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1776        )
1777        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]

Get a list of all statuses on this commit.

https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses

class Content(allspice.baseapiobject.ReadonlyApiObject):
2527class Content(ReadonlyApiObject):
2528    content: Any
2529    download_url: str
2530    encoding: Any
2531    git_url: str
2532    html_url: str
2533    last_commit_sha: str
2534    name: str
2535    path: str
2536    sha: str
2537    size: int
2538    submodule_git_url: Any
2539    target: Any
2540    type: str
2541    url: str
2542
2543    FILE = "file"
2544
2545    def __init__(self, allspice_client):
2546        super().__init__(allspice_client)
2547
2548    def __eq__(self, other):
2549        if not isinstance(other, Content):
2550            return False
2551
2552        return self.sha == other.sha and self.name == other.name
2553
2554    def __hash__(self):
2555        return hash(self.sha) ^ hash(self.name)
Content(allspice_client)
2545    def __init__(self, allspice_client):
2546        super().__init__(allspice_client)
content: Any
download_url: str
encoding: Any
git_url: str
html_url: str
last_commit_sha: str
name: str
path: str
sha: str
size: int
submodule_git_url: Any
target: Any
type: str
url: str
FILE = 'file'
class DesignReview(allspice.baseapiobject.ApiObject):
2026class DesignReview(ApiObject):
2027    """
2028    A Design Review. See
2029    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2030
2031    Note: The base and head fields are not `Branch` objects - they are plain strings
2032    referring to the branch names. This is because DRs can exist for branches that have
2033    been deleted, which don't have an associated `Branch` object from the API. You can use
2034    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2035    it exists.
2036    """
2037
2038    additions: int
2039    allow_maintainer_edit: bool
2040    allow_maintainer_edits: Any
2041    assignee: User
2042    assignees: List["User"]
2043    base: str
2044    body: str
2045    changed_files: int
2046    closed_at: Any
2047    comments: int
2048    created_at: str
2049    deletions: int
2050    diff_url: str
2051    draft: bool
2052    due_date: Optional[str]
2053    head: str
2054    html_url: str
2055    id: int
2056    is_locked: bool
2057    labels: List[Any]
2058    merge_base: str
2059    merge_commit_sha: Any
2060    mergeable: bool
2061    merged: bool
2062    merged_at: Any
2063    merged_by: Any
2064    milestone: Any
2065    number: int
2066    patch_url: str
2067    pin_order: int
2068    repository: Optional["Repository"]
2069    requested_reviewers: Any
2070    review_comments: int
2071    state: str
2072    title: str
2073    updated_at: str
2074    url: str
2075    user: User
2076
2077    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2078    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2079    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2080
2081    OPEN = "open"
2082    CLOSED = "closed"
2083
2084    class MergeType(Enum):
2085        MERGE = "merge"
2086        REBASE = "rebase"
2087        REBASE_MERGE = "rebase-merge"
2088        SQUASH = "squash"
2089        MANUALLY_MERGED = "manually-merged"
2090
2091    def __init__(self, allspice_client):
2092        super().__init__(allspice_client)
2093
2094    def __eq__(self, other):
2095        if not isinstance(other, DesignReview):
2096            return False
2097        return self.repository == other.repository and self.id == other.id
2098
2099    def __hash__(self):
2100        return hash(self.repository) ^ hash(self.id)
2101
2102    @classmethod
2103    def parse_response(cls, allspice_client, result) -> "DesignReview":
2104        api_object = super().parse_response(allspice_client, result)
2105        cls._add_read_property(
2106            "repository",
2107            Repository.parse_response(allspice_client, result["base"]["repo"]),
2108            api_object,
2109        )
2110
2111        return api_object
2112
2113    @classmethod
2114    def request(cls, allspice_client, owner: str, repo: str, number: str):
2115        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2116        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2117
2118    _fields_to_parsers: ClassVar[dict] = {
2119        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2120        "assignees": lambda allspice_client, us: [
2121            User.parse_response(allspice_client, u) for u in us
2122        ],
2123        "base": lambda _, b: b["ref"],
2124        "head": lambda _, h: h["ref"],
2125        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2126        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2127        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2128    }
2129
2130    _patchable_fields: ClassVar[set[str]] = {
2131        "allow_maintainer_edits",
2132        "assignee",
2133        "assignees",
2134        "base",
2135        "body",
2136        "due_date",
2137        "milestone",
2138        "state",
2139        "title",
2140    }
2141
2142    _parsers_to_fields: ClassVar[dict] = {
2143        "assignee": lambda u: u.username,
2144        "assignees": lambda us: [u.username for u in us],
2145        "base": lambda b: b.name if isinstance(b, Branch) else b,
2146        "milestone": lambda m: m.id,
2147    }
2148
2149    def commit(self):
2150        data = self.get_dirty_fields()
2151        if "due_date" in data and data["due_date"] is None:
2152            data["unset_due_date"] = True
2153
2154        args = {
2155            "owner": self.repository.owner.username,
2156            "repo": self.repository.name,
2157            "index": self.number,
2158        }
2159        self._commit(args, data)
2160
2161    def merge(self, merge_type: MergeType):
2162        """
2163        Merge the pull request. See
2164        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2165
2166        :param merge_type: The type of merge to perform. See the MergeType enum.
2167        """
2168
2169        self.allspice_client.requests_put(
2170            self.MERGE_DESIGN_REVIEW.format(
2171                owner=self.repository.owner.username,
2172                repo=self.repository.name,
2173                index=self.number,
2174            ),
2175            data={"Do": merge_type.value},
2176        )
2177
2178    def get_comments(self) -> List[Comment]:
2179        """
2180        Get the comments on this pull request, but not specifically on a review.
2181
2182        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2183
2184        :return: A list of comments on this pull request.
2185        """
2186
2187        results = self.allspice_client.requests_get(
2188            self.GET_COMMENTS.format(
2189                owner=self.repository.owner.username,
2190                repo=self.repository.name,
2191                index=self.number,
2192            )
2193        )
2194        return [Comment.parse_response(self.allspice_client, result) for result in results]
2195
2196    def create_comment(self, body: str):
2197        """
2198        Create a comment on this pull request. This uses the same endpoint as the
2199        comments on issues, and will not be associated with any reviews.
2200
2201        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2202
2203        :param body: The body of the comment.
2204        :return: The comment that was created.
2205        """
2206
2207        result = self.allspice_client.requests_post(
2208            self.GET_COMMENTS.format(
2209                owner=self.repository.owner.username,
2210                repo=self.repository.name,
2211                index=self.number,
2212            ),
2213            data={"body": body},
2214        )
2215        return Comment.parse_response(self.allspice_client, result)

A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.

Note: The base and head fields are not Branch objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated Branch object from the API. You can use the Repository.get_branch method to get a Branch object for a branch if you know it exists.

DesignReview(allspice_client)
2091    def __init__(self, allspice_client):
2092        super().__init__(allspice_client)
additions: int
allow_maintainer_edit: bool
allow_maintainer_edits: Any
assignee: User
assignees: List[User]
base: str
body: str
changed_files: int
closed_at: Any
comments: int
created_at: str
deletions: int
diff_url: str
draft: bool
due_date: Optional[str]
head: str
html_url: str
id: int
is_locked: bool
labels: List[Any]
merge_base: str
merge_commit_sha: Any
mergeable: bool
merged: bool
merged_at: Any
merged_by: Any
milestone: Any
number: int
patch_url: str
pin_order: int
repository: Optional[Repository]
requested_reviewers: Any
review_comments: int
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/pulls/{index}'
MERGE_DESIGN_REVIEW = '/repos/{owner}/{repo}/pulls/{index}/merge'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
OPEN = 'open'
CLOSED = 'closed'
@classmethod
def parse_response(cls, allspice_client, result) -> DesignReview:
2102    @classmethod
2103    def parse_response(cls, allspice_client, result) -> "DesignReview":
2104        api_object = super().parse_response(allspice_client, result)
2105        cls._add_read_property(
2106            "repository",
2107            Repository.parse_response(allspice_client, result["base"]["repo"]),
2108            api_object,
2109        )
2110
2111        return api_object
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
2113    @classmethod
2114    def request(cls, allspice_client, owner: str, repo: str, number: str):
2115        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2116        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})

See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest

def commit(self):
2149    def commit(self):
2150        data = self.get_dirty_fields()
2151        if "due_date" in data and data["due_date"] is None:
2152            data["unset_due_date"] = True
2153
2154        args = {
2155            "owner": self.repository.owner.username,
2156            "repo": self.repository.name,
2157            "index": self.number,
2158        }
2159        self._commit(args, data)
def merge(self, merge_type: DesignReview.MergeType):
2161    def merge(self, merge_type: MergeType):
2162        """
2163        Merge the pull request. See
2164        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2165
2166        :param merge_type: The type of merge to perform. See the MergeType enum.
2167        """
2168
2169        self.allspice_client.requests_put(
2170            self.MERGE_DESIGN_REVIEW.format(
2171                owner=self.repository.owner.username,
2172                repo=self.repository.name,
2173                index=self.number,
2174            ),
2175            data={"Do": merge_type.value},
2176        )

Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

Parameters
  • merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
2178    def get_comments(self) -> List[Comment]:
2179        """
2180        Get the comments on this pull request, but not specifically on a review.
2181
2182        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2183
2184        :return: A list of comments on this pull request.
2185        """
2186
2187        results = self.allspice_client.requests_get(
2188            self.GET_COMMENTS.format(
2189                owner=self.repository.owner.username,
2190                repo=self.repository.name,
2191                index=self.number,
2192            )
2193        )
2194        return [Comment.parse_response(self.allspice_client, result) for result in results]

Get the comments on this pull request, but not specifically on a review.

https://hub.allspice.io/api/swagger#/issue/issueGetComments

Returns

A list of comments on this pull request.

def create_comment(self, body: str):
2196    def create_comment(self, body: str):
2197        """
2198        Create a comment on this pull request. This uses the same endpoint as the
2199        comments on issues, and will not be associated with any reviews.
2200
2201        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2202
2203        :param body: The body of the comment.
2204        :return: The comment that was created.
2205        """
2206
2207        result = self.allspice_client.requests_post(
2208            self.GET_COMMENTS.format(
2209                owner=self.repository.owner.username,
2210                repo=self.repository.name,
2211                index=self.number,
2212            ),
2213            data={"body": body},
2214        )
2215        return Comment.parse_response(self.allspice_client, result)

Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.

https://hub.allspice.io/api/swagger#/issue/issueCreateComment

Parameters
  • body: The body of the comment.
Returns

The comment that was created.

class DesignReview.MergeType(enum.Enum):
2084    class MergeType(Enum):
2085        MERGE = "merge"
2086        REBASE = "rebase"
2087        REBASE_MERGE = "rebase-merge"
2088        SQUASH = "squash"
2089        MANUALLY_MERGED = "manually-merged"
MERGE = <MergeType.MERGE: 'merge'>
REBASE = <MergeType.REBASE: 'rebase'>
REBASE_MERGE = <MergeType.REBASE_MERGE: 'rebase-merge'>
SQUASH = <MergeType.SQUASH: 'squash'>
MANUALLY_MERGED = <MergeType.MANUALLY_MERGED: 'manually-merged'>
class Issue(allspice.baseapiobject.ApiObject):
1880class Issue(ApiObject):
1881    assets: List[Any]
1882    assignee: Any
1883    assignees: Any
1884    body: str
1885    closed_at: Any
1886    comments: int
1887    created_at: str
1888    due_date: Any
1889    html_url: str
1890    id: int
1891    is_locked: bool
1892    labels: List[Any]
1893    milestone: Optional["Milestone"]
1894    number: int
1895    original_author: str
1896    original_author_id: int
1897    pin_order: int
1898    pull_request: Any
1899    ref: str
1900    repository: Dict[str, Union[int, str]]
1901    state: str
1902    title: str
1903    updated_at: str
1904    url: str
1905    user: User
1906
1907    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1908    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1909    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1910    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1911
1912    OPENED = "open"
1913    CLOSED = "closed"
1914
1915    def __init__(self, allspice_client):
1916        super().__init__(allspice_client)
1917
1918    def __eq__(self, other):
1919        if not isinstance(other, Issue):
1920            return False
1921        return self.repository == other.repository and self.id == other.id
1922
1923    def __hash__(self):
1924        return hash(self.repository) ^ hash(self.id)
1925
1926    _fields_to_parsers: ClassVar[dict] = {
1927        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1928        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1929        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1930        "assignees": lambda allspice_client, us: [
1931            User.parse_response(allspice_client, u) for u in us
1932        ],
1933        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1934    }
1935
1936    _parsers_to_fields: ClassVar[dict] = {
1937        "milestone": lambda m: m.id,
1938    }
1939
1940    _patchable_fields: ClassVar[set[str]] = {
1941        "assignee",
1942        "assignees",
1943        "body",
1944        "due_date",
1945        "milestone",
1946        "state",
1947        "title",
1948    }
1949
1950    def commit(self):
1951        args = {
1952            "owner": self.repository.owner.username,
1953            "repo": self.repository.name,
1954            "index": self.number,
1955        }
1956        self._commit(args)
1957
1958    @classmethod
1959    def request(cls, allspice_client, owner: str, repo: str, number: str):
1960        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1961        # The repository in the response is a RepositoryMeta object, so request
1962        # the full repository object and add it to the issue object.
1963        repository = Repository.request(allspice_client, owner, repo)
1964        setattr(api_object, "_repository", repository)
1965        # For legacy reasons
1966        cls._add_read_property("repo", repository, api_object)
1967        return api_object
1968
1969    @classmethod
1970    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1971        args = {"owner": repo.owner.username, "repo": repo.name}
1972        data = {"title": title, "body": body}
1973        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1974        issue = Issue.parse_response(allspice_client, result)
1975        setattr(issue, "_repository", repo)
1976        cls._add_read_property("repo", repo, issue)
1977        return issue
1978
1979    @property
1980    def owner(self) -> Organization | User:
1981        return self.repository.owner
1982
1983    def get_time_sum(self, user: User) -> int:
1984        results = self.allspice_client.requests_get(
1985            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1986        )
1987        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
1988
1989    def get_times(self) -> Optional[Dict]:
1990        return self.allspice_client.requests_get(
1991            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1992        )
1993
1994    def delete_time(self, time_id: str):
1995        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1996        self.allspice_client.requests_delete(path)
1997
1998    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1999        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2000        self.allspice_client.requests_post(
2001            path, data={"created": created, "time": int(time), "user_name": user_name}
2002        )
2003
2004    def get_comments(self) -> List[Comment]:
2005        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2006
2007        results = self.allspice_client.requests_get(
2008            self.GET_COMMENTS.format(
2009                owner=self.owner.username, repo=self.repository.name, index=self.number
2010            )
2011        )
2012
2013        return [Comment.parse_response(self.allspice_client, result) for result in results]
2014
2015    def create_comment(self, body: str) -> Comment:
2016        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2017
2018        path = self.GET_COMMENTS.format(
2019            owner=self.owner.username, repo=self.repository.name, index=self.number
2020        )
2021
2022        response = self.allspice_client.requests_post(path, data={"body": body})
2023        return Comment.parse_response(self.allspice_client, response)
Issue(allspice_client)
1915    def __init__(self, allspice_client):
1916        super().__init__(allspice_client)
assets: List[Any]
assignee: Any
assignees: Any
body: str
closed_at: Any
comments: int
created_at: str
due_date: Any
html_url: str
id: int
is_locked: bool
labels: List[Any]
milestone: Optional[Milestone]
number: int
original_author: str
original_author_id: int
pin_order: int
pull_request: Any
ref: str
repository: Dict[str, Union[int, str]]
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/{index}'
GET_TIME = '/repos/%s/%s/issues/%s/times'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
CREATE_ISSUE = '/repos/{owner}/{repo}/issues'
OPENED = 'open'
CLOSED = 'closed'
def commit(self):
1950    def commit(self):
1951        args = {
1952            "owner": self.repository.owner.username,
1953            "repo": self.repository.name,
1954            "index": self.number,
1955        }
1956        self._commit(args)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1958    @classmethod
1959    def request(cls, allspice_client, owner: str, repo: str, number: str):
1960        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1961        # The repository in the response is a RepositoryMeta object, so request
1962        # the full repository object and add it to the issue object.
1963        repository = Repository.request(allspice_client, owner, repo)
1964        setattr(api_object, "_repository", repository)
1965        # For legacy reasons
1966        cls._add_read_property("repo", repository, api_object)
1967        return api_object
@classmethod
def create_issue( cls, allspice_client, repo: Repository, title: str, body: str = ''):
1969    @classmethod
1970    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1971        args = {"owner": repo.owner.username, "repo": repo.name}
1972        data = {"title": title, "body": body}
1973        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1974        issue = Issue.parse_response(allspice_client, result)
1975        setattr(issue, "_repository", repo)
1976        cls._add_read_property("repo", repo, issue)
1977        return issue
owner: Organization | User
1979    @property
1980    def owner(self) -> Organization | User:
1981        return self.repository.owner
def get_time_sum(self, user: User) -> int:
1983    def get_time_sum(self, user: User) -> int:
1984        results = self.allspice_client.requests_get(
1985            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1986        )
1987        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
def get_times(self) -> Optional[Dict]:
1989    def get_times(self) -> Optional[Dict]:
1990        return self.allspice_client.requests_get(
1991            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1992        )
def delete_time(self, time_id: str):
1994    def delete_time(self, time_id: str):
1995        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1996        self.allspice_client.requests_delete(path)
def add_time( self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1998    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1999        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2000        self.allspice_client.requests_post(
2001            path, data={"created": created, "time": int(time), "user_name": user_name}
2002        )
def get_comments(self) -> List[Comment]:
2004    def get_comments(self) -> List[Comment]:
2005        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2006
2007        results = self.allspice_client.requests_get(
2008            self.GET_COMMENTS.format(
2009                owner=self.owner.username, repo=self.repository.name, index=self.number
2010            )
2011        )
2012
2013        return [Comment.parse_response(self.allspice_client, result) for result in results]

https://hub.allspice.io/api/swagger#/issue/issueGetComments

def create_comment(self, body: str) -> Comment:
2015    def create_comment(self, body: str) -> Comment:
2016        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2017
2018        path = self.GET_COMMENTS.format(
2019            owner=self.owner.username, repo=self.repository.name, index=self.number
2020        )
2021
2022        response = self.allspice_client.requests_post(path, data={"body": body})
2023        return Comment.parse_response(self.allspice_client, response)

https://hub.allspice.io/api/swagger#/issue/issueCreateComment

class Milestone(allspice.baseapiobject.ApiObject):
1449class Milestone(ApiObject):
1450    allow_merge_commits: Any
1451    allow_rebase: Any
1452    allow_rebase_explicit: Any
1453    allow_squash_merge: Any
1454    archived: Any
1455    closed_at: Any
1456    closed_issues: int
1457    created_at: str
1458    default_branch: Any
1459    description: str
1460    due_on: Any
1461    has_issues: Any
1462    has_pull_requests: Any
1463    has_wiki: Any
1464    id: int
1465    ignore_whitespace_conflicts: Any
1466    name: Any
1467    open_issues: int
1468    private: Any
1469    state: str
1470    title: str
1471    updated_at: str
1472    website: Any
1473
1474    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1475
1476    def __init__(self, allspice_client):
1477        super().__init__(allspice_client)
1478
1479    def __eq__(self, other):
1480        if not isinstance(other, Milestone):
1481            return False
1482        return self.allspice_client == other.allspice_client and self.id == other.id
1483
1484    def __hash__(self):
1485        return hash(self.allspice_client) ^ hash(self.id)
1486
1487    _fields_to_parsers: ClassVar[dict] = {
1488        "closed_at": lambda _, t: Util.convert_time(t),
1489        "due_on": lambda _, t: Util.convert_time(t),
1490    }
1491
1492    _patchable_fields: ClassVar[set[str]] = {
1493        "allow_merge_commits",
1494        "allow_rebase",
1495        "allow_rebase_explicit",
1496        "allow_squash_merge",
1497        "archived",
1498        "default_branch",
1499        "description",
1500        "has_issues",
1501        "has_pull_requests",
1502        "has_wiki",
1503        "ignore_whitespace_conflicts",
1504        "name",
1505        "private",
1506        "website",
1507    }
1508
1509    @classmethod
1510    def request(cls, allspice_client, owner: str, repo: str, number: str):
1511        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Milestone(allspice_client)
1476    def __init__(self, allspice_client):
1477        super().__init__(allspice_client)
allow_merge_commits: Any
allow_rebase: Any
allow_rebase_explicit: Any
allow_squash_merge: Any
archived: Any
closed_at: Any
closed_issues: int
created_at: str
default_branch: Any
description: str
due_on: Any
has_issues: Any
has_pull_requests: Any
has_wiki: Any
id: int
ignore_whitespace_conflicts: Any
name: Any
open_issues: int
private: Any
state: str
title: str
updated_at: str
website: Any
API_OBJECT = '/repos/{owner}/{repo}/milestones/{number}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1509    @classmethod
1510    def request(cls, allspice_client, owner: str, repo: str, number: str):
1511        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
class NotFoundException(builtins.Exception):
6class NotFoundException(Exception):
7    pass

Common base class for all non-exit exceptions.

class Organization(allspice.baseapiobject.ApiObject):
 33class Organization(ApiObject):
 34    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
 35
 36    active: Optional[bool]
 37    avatar_url: str
 38    created: Optional[str]
 39    description: str
 40    email: str
 41    followers_count: Optional[int]
 42    following_count: Optional[int]
 43    full_name: str
 44    html_url: Optional[str]
 45    id: int
 46    is_admin: Optional[bool]
 47    language: Optional[str]
 48    last_login: Optional[str]
 49    location: str
 50    login: Optional[str]
 51    login_name: Optional[str]
 52    name: Optional[str]
 53    prohibit_login: Optional[bool]
 54    repo_admin_change_team_access: Optional[bool]
 55    restricted: Optional[bool]
 56    source_id: Optional[int]
 57    starred_repos_count: Optional[int]
 58    username: str
 59    visibility: str
 60    website: str
 61
 62    API_OBJECT = """/orgs/{name}"""  # <org>
 63    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
 64    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
 65    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
 66    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
 67    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
 68    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
 69
 70    def __init__(self, allspice_client):
 71        super().__init__(allspice_client)
 72
 73    def __eq__(self, other):
 74        if not isinstance(other, Organization):
 75            return False
 76        return self.allspice_client == other.allspice_client and self.name == other.name
 77
 78    def __hash__(self):
 79        return hash(self.allspice_client) ^ hash(self.name)
 80
 81    @classmethod
 82    def request(cls, allspice_client, name: str) -> Self:
 83        return cls._request(allspice_client, {"name": name})
 84
 85    @classmethod
 86    def parse_response(cls, allspice_client, result) -> "Organization":
 87        api_object = super().parse_response(allspice_client, result)
 88        # add "name" field to make this behave similar to users for gitea < 1.18
 89        # also necessary for repository-owner when org is repo owner
 90        if not hasattr(api_object, "name"):
 91            Organization._add_read_property("name", result["username"], api_object)
 92        return api_object
 93
 94    _patchable_fields: ClassVar[set[str]] = {
 95        "description",
 96        "full_name",
 97        "location",
 98        "visibility",
 99        "website",
100    }
101
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
105
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)
144
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
150
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
157
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
165
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
172
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )
204
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
208
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
220
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
224
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True
231
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results

see https://hub.allspice.io/api/swagger#/organization/orgGetAll

Organization(allspice_client)
70    def __init__(self, allspice_client):
71        super().__init__(allspice_client)
active: Optional[bool]
avatar_url: str
created: Optional[str]
description: str
email: str
followers_count: Optional[int]
following_count: Optional[int]
full_name: str
html_url: Optional[str]
id: int
is_admin: Optional[bool]
language: Optional[str]
last_login: Optional[str]
location: str
login: Optional[str]
login_name: Optional[str]
name: Optional[str]
prohibit_login: Optional[bool]
repo_admin_change_team_access: Optional[bool]
restricted: Optional[bool]
source_id: Optional[int]
starred_repos_count: Optional[int]
username: str
visibility: str
website: str
API_OBJECT = '/orgs/{name}'
ORG_REPOS_REQUEST = '/orgs/%s/repos'
ORG_TEAMS_REQUEST = '/orgs/%s/teams'
ORG_TEAMS_CREATE = '/orgs/%s/teams'
ORG_GET_MEMBERS = '/orgs/%s/members'
ORG_IS_MEMBER = '/orgs/%s/members/%s'
ORG_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> Self:
81    @classmethod
82    def request(cls, allspice_client, name: str) -> Self:
83        return cls._request(allspice_client, {"name": name})
@classmethod
def parse_response(cls, allspice_client, result) -> Organization:
85    @classmethod
86    def parse_response(cls, allspice_client, result) -> "Organization":
87        api_object = super().parse_response(allspice_client, result)
88        # add "name" field to make this behave similar to users for gitea < 1.18
89        # also necessary for repository-owner when org is repo owner
90        if not hasattr(api_object, "name"):
91            Organization._add_read_property("name", result["username"], api_object)
92        return api_object
def commit(self):
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)

Create an organization Repository

Throws:
  • AlreadyExistsException: If the Repository exists already.
  • Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
def get_repository(self, name) -> Repository:
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List[Team]:
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
def get_team(self, name) -> Team:
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
def create_team( self, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}) -> Team:
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )

Alias for AllSpice#create_team

def get_members(self) -> List[User]:
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
def is_member(self, username) -> bool:
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
def remove_member(self, user: User):
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
def delete(self):
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True

Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results
class Release(allspice.baseapiobject.ApiObject):
2301class Release(ApiObject):
2302    """
2303    A release on a repo.
2304    """
2305
2306    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2307    author: User
2308    body: str
2309    created_at: str
2310    draft: bool
2311    html_url: str
2312    id: int
2313    name: str
2314    prerelease: bool
2315    published_at: str
2316    repo: Optional["Repository"]
2317    repository: Optional["Repository"]
2318    tag_name: str
2319    tarball_url: str
2320    target_commitish: str
2321    upload_url: str
2322    url: str
2323    zipball_url: str
2324
2325    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2326    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2327    # Note that we don't strictly need the get_assets route, as the release
2328    # object already contains the assets.
2329
2330    def __init__(self, allspice_client):
2331        super().__init__(allspice_client)
2332
2333    def __eq__(self, other):
2334        if not isinstance(other, Release):
2335            return False
2336        return self.repo == other.repo and self.id == other.id
2337
2338    def __hash__(self):
2339        return hash(self.repo) ^ hash(self.id)
2340
2341    _fields_to_parsers: ClassVar[dict] = {
2342        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2343    }
2344    _patchable_fields: ClassVar[set[str]] = {
2345        "body",
2346        "draft",
2347        "name",
2348        "prerelease",
2349        "tag_name",
2350        "target_commitish",
2351    }
2352
2353    @classmethod
2354    def parse_response(cls, allspice_client, result, repo) -> Release:
2355        release = super().parse_response(allspice_client, result)
2356        Release._add_read_property("repository", repo, release)
2357        # For legacy reasons
2358        Release._add_read_property("repo", repo, release)
2359        setattr(
2360            release,
2361            "_assets",
2362            [
2363                ReleaseAsset.parse_response(allspice_client, asset, release)
2364                for asset in result["assets"]
2365            ],
2366        )
2367        return release
2368
2369    @classmethod
2370    def request(
2371        cls,
2372        allspice_client,
2373        owner: str,
2374        repo: str,
2375        id: Optional[int] = None,
2376    ) -> Release:
2377        args = {"owner": owner, "repo": repo, "id": id}
2378        release_response = cls._get_gitea_api_object(allspice_client, args)
2379        repository = Repository.request(allspice_client, owner, repo)
2380        release = cls.parse_response(allspice_client, release_response, repository)
2381        return release
2382
2383    def commit(self):
2384        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2385        self._commit(args)
2386
2387    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2388        """
2389        Create an asset for this release.
2390
2391        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2392
2393        :param file: The file to upload. This should be a file-like object.
2394        :param name: The name of the file.
2395        :return: The created asset.
2396        """
2397
2398        args: dict[str, Any] = {"files": {"attachment": file}}
2399        if name is not None:
2400            args["params"] = {"name": name}
2401
2402        result = self.allspice_client.requests_post(
2403            self.RELEASE_CREATE_ASSET.format(
2404                owner=self.repo.owner.username,
2405                repo=self.repo.name,
2406                id=self.id,
2407            ),
2408            **args,
2409        )
2410        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2411
2412    def delete(self):
2413        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2414        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2415        self.deleted = True

A release on a repo.

Release(allspice_client)
2330    def __init__(self, allspice_client):
2331        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]], allspice.apiobject.ReleaseAsset]]
author: User
body: str
created_at: str
draft: bool
html_url: str
id: int
name: str
prerelease: bool
published_at: str
repo: Optional[Repository]
repository: Optional[Repository]
tag_name: str
tarball_url: str
target_commitish: str
upload_url: str
url: str
zipball_url: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{id}'
RELEASE_CREATE_ASSET = '/repos/{owner}/{repo}/releases/{id}/assets'
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
2353    @classmethod
2354    def parse_response(cls, allspice_client, result, repo) -> Release:
2355        release = super().parse_response(allspice_client, result)
2356        Release._add_read_property("repository", repo, release)
2357        # For legacy reasons
2358        Release._add_read_property("repo", repo, release)
2359        setattr(
2360            release,
2361            "_assets",
2362            [
2363                ReleaseAsset.parse_response(allspice_client, asset, release)
2364                for asset in result["assets"]
2365            ],
2366        )
2367        return release
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: Optional[int] = None) -> Release:
2369    @classmethod
2370    def request(
2371        cls,
2372        allspice_client,
2373        owner: str,
2374        repo: str,
2375        id: Optional[int] = None,
2376    ) -> Release:
2377        args = {"owner": owner, "repo": repo, "id": id}
2378        release_response = cls._get_gitea_api_object(allspice_client, args)
2379        repository = Repository.request(allspice_client, owner, repo)
2380        release = cls.parse_response(allspice_client, release_response, repository)
2381        return release
def commit(self):
2383    def commit(self):
2384        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2385        self._commit(args)
def create_asset( self, file: IO, name: Optional[str] = None) -> allspice.apiobject.ReleaseAsset:
2387    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2388        """
2389        Create an asset for this release.
2390
2391        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2392
2393        :param file: The file to upload. This should be a file-like object.
2394        :param name: The name of the file.
2395        :return: The created asset.
2396        """
2397
2398        args: dict[str, Any] = {"files": {"attachment": file}}
2399        if name is not None:
2400            args["params"] = {"name": name}
2401
2402        result = self.allspice_client.requests_post(
2403            self.RELEASE_CREATE_ASSET.format(
2404                owner=self.repo.owner.username,
2405                repo=self.repo.name,
2406                id=self.id,
2407            ),
2408            **args,
2409        )
2410        return ReleaseAsset.parse_response(self.allspice_client, result, self)

Create an asset for this release.

https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

Parameters
  • file: The file to upload. This should be a file-like object.
  • name: The name of the file.
Returns

The created asset.

def delete(self):
2412    def delete(self):
2413        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2414        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2415        self.deleted = True
class Repository(allspice.baseapiobject.ApiObject):
 477class Repository(ApiObject):
 478    allow_fast_forward_only_merge: bool
 479    allow_manual_merge: Any
 480    allow_merge_commits: bool
 481    allow_rebase: bool
 482    allow_rebase_explicit: bool
 483    allow_rebase_update: bool
 484    allow_squash_merge: bool
 485    archived: bool
 486    archived_at: str
 487    autodetect_manual_merge: Any
 488    avatar_url: str
 489    clone_url: str
 490    created_at: str
 491    default_allow_maintainer_edit: bool
 492    default_branch: str
 493    default_delete_branch_after_merge: bool
 494    default_merge_style: str
 495    description: str
 496    empty: bool
 497    enable_prune: Any
 498    external_tracker: Any
 499    external_wiki: Any
 500    fork: bool
 501    forks_count: int
 502    full_name: str
 503    has_actions: bool
 504    has_issues: bool
 505    has_packages: bool
 506    has_projects: bool
 507    has_pull_requests: bool
 508    has_releases: bool
 509    has_wiki: bool
 510    html_url: str
 511    id: int
 512    ignore_whitespace_conflicts: bool
 513    internal: bool
 514    internal_tracker: Dict[str, bool]
 515    language: str
 516    languages_url: str
 517    link: str
 518    mirror: bool
 519    mirror_interval: str
 520    mirror_updated: str
 521    name: str
 522    object_format_name: str
 523    open_issues_count: int
 524    open_pr_counter: int
 525    original_url: str
 526    owner: Union["User", "Organization"]
 527    parent: Any
 528    permissions: Dict[str, bool]
 529    private: bool
 530    projects_mode: str
 531    release_counter: int
 532    repo_transfer: Any
 533    size: int
 534    ssh_url: str
 535    stars_count: int
 536    template: bool
 537    updated_at: datetime
 538    url: str
 539    watchers_count: int
 540    website: str
 541
 542    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 543    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 544    REPO_SEARCH = """/repos/search/"""
 545    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 546    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 547    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 548    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 549    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 550    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 551    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 552    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 553    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 554    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 555    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 556    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 557    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 558    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 559    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 560    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 561    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 562    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 563    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 564    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 565    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 566
 567    class ArchiveFormat(Enum):
 568        """
 569        Archive formats for Repository.get_archive
 570        """
 571
 572        TAR = "tar.gz"
 573        ZIP = "zip"
 574
 575    class CommitStatusSort(Enum):
 576        """
 577        Sort order for Repository.get_commit_status
 578        """
 579
 580        OLDEST = "oldest"
 581        RECENT_UPDATE = "recentupdate"
 582        LEAST_UPDATE = "leastupdate"
 583        LEAST_INDEX = "leastindex"
 584        HIGHEST_INDEX = "highestindex"
 585
 586    def __init__(self, allspice_client):
 587        super().__init__(allspice_client)
 588
 589    def __eq__(self, other):
 590        if not isinstance(other, Repository):
 591            return False
 592        return self.owner == other.owner and self.name == other.name
 593
 594    def __hash__(self):
 595        return hash(self.owner) ^ hash(self.name)
 596
 597    _fields_to_parsers: ClassVar[dict] = {
 598        # don't know how to tell apart user and org as owner except from email being empty.
 599        "owner": lambda allspice_client, r: (
 600            Organization.parse_response(allspice_client, r)
 601            if r["email"] == ""
 602            else User.parse_response(allspice_client, r)
 603        ),
 604        "updated_at": lambda _, t: Util.convert_time(t),
 605    }
 606
 607    @classmethod
 608    def request(
 609        cls,
 610        allspice_client,
 611        owner: str,
 612        name: str,
 613    ) -> Repository:
 614        return cls._request(allspice_client, {"owner": owner, "name": name})
 615
 616    @classmethod
 617    def search(
 618        cls,
 619        allspice_client,
 620        query: Optional[str] = None,
 621        topic: bool = False,
 622        include_description: bool = False,
 623        user: Optional[User] = None,
 624        owner_to_prioritize: Union[User, Organization, None] = None,
 625    ) -> list[Repository]:
 626        """
 627        Search for repositories.
 628
 629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 630
 631        :param query: The query string to search for
 632        :param topic: If true, the query string will only be matched against the
 633            repository's topic.
 634        :param include_description: If true, the query string will be matched
 635            against the repository's description as well.
 636        :param user: If specified, only repositories that this user owns or
 637            contributes to will be searched.
 638        :param owner_to_prioritize: If specified, repositories owned by the
 639            given entity will be prioritized in the search.
 640        :returns: All repositories matching the query. If there are many
 641            repositories matching this query, this may take some time.
 642        """
 643
 644        params = {}
 645
 646        if query is not None:
 647            params["q"] = query
 648        if topic:
 649            params["topic"] = topic
 650        if include_description:
 651            params["include_description"] = include_description
 652        if user is not None:
 653            params["user"] = user.id
 654        if owner_to_prioritize is not None:
 655            params["owner_to_prioritize"] = owner_to_prioritize.id
 656
 657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 658
 659        return [Repository.parse_response(allspice_client, response) for response in responses]
 660
 661    _patchable_fields: ClassVar[set[str]] = {
 662        "allow_manual_merge",
 663        "allow_merge_commits",
 664        "allow_rebase",
 665        "allow_rebase_explicit",
 666        "allow_rebase_update",
 667        "allow_squash_merge",
 668        "archived",
 669        "autodetect_manual_merge",
 670        "default_branch",
 671        "default_delete_branch_after_merge",
 672        "default_merge_style",
 673        "description",
 674        "enable_prune",
 675        "external_tracker",
 676        "external_wiki",
 677        "has_issues",
 678        "has_projects",
 679        "has_pull_requests",
 680        "has_wiki",
 681        "ignore_whitespace_conflicts",
 682        "internal_tracker",
 683        "mirror_interval",
 684        "name",
 685        "private",
 686        "template",
 687        "website",
 688    }
 689
 690    def commit(self):
 691        args = {"owner": self.owner.username, "name": self.name}
 692        self._commit(args)
 693
 694    def get_branches(self) -> List["Branch"]:
 695        """Get all the Branches of this Repository."""
 696
 697        results = self.allspice_client.requests_get_paginated(
 698            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 699        )
 700        return [Branch.parse_response(self.allspice_client, result) for result in results]
 701
 702    def get_branch(self, name: str) -> "Branch":
 703        """Get a specific Branch of this Repository."""
 704        result = self.allspice_client.requests_get(
 705            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 706        )
 707        return Branch.parse_response(self.allspice_client, result)
 708
 709    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 710        """Add a branch to the repository"""
 711        # Note: will only work with gitea 1.13 or higher!
 712
 713        ref_name = Util.data_params_for_ref(create_from)
 714        if "ref" not in ref_name:
 715            raise ValueError("create_from must be a Branch, Commit or string")
 716        ref_name = ref_name["ref"]
 717
 718        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 719        result = self.allspice_client.requests_post(
 720            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 721        )
 722        return Branch.parse_response(self.allspice_client, result)
 723
 724    def get_issues(
 725        self,
 726        state: Literal["open", "closed", "all"] = "all",
 727        search_query: Optional[str] = None,
 728        labels: Optional[List[str]] = None,
 729        milestones: Optional[List[Union[Milestone, str]]] = None,
 730        assignee: Optional[Union[User, str]] = None,
 731        since: Optional[datetime] = None,
 732        before: Optional[datetime] = None,
 733    ) -> List["Issue"]:
 734        """
 735        Get all Issues of this Repository (open and closed)
 736
 737        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 738
 739        All params of this method are optional filters. If you don't specify a filter, it
 740        will not be applied.
 741
 742        :param state: The state of the Issues to get. If None, all Issues are returned.
 743        :param search_query: Filter issues by text. This is equivalent to searching for
 744                             `search_query` in the Issues on the web interface.
 745        :param labels: Filter issues by labels.
 746        :param milestones: Filter issues by milestones.
 747        :param assignee: Filter issues by the assigned user.
 748        :param since: Filter issues by the date they were created.
 749        :param before: Filter issues by the date they were created.
 750        :return: A list of Issues.
 751        """
 752
 753        data = {
 754            "state": state,
 755        }
 756        if search_query:
 757            data["q"] = search_query
 758        if labels:
 759            data["labels"] = ",".join(labels)
 760        if milestones:
 761            data["milestone"] = ",".join(
 762                [
 763                    milestone.name if isinstance(milestone, Milestone) else milestone
 764                    for milestone in milestones
 765                ]
 766            )
 767        if assignee:
 768            if isinstance(assignee, User):
 769                data["assignee"] = assignee.username
 770            else:
 771                data["assignee"] = assignee
 772        if since:
 773            data["since"] = Util.format_time(since)
 774        if before:
 775            data["before"] = Util.format_time(before)
 776
 777        results = self.allspice_client.requests_get_paginated(
 778            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 779            params=data,
 780        )
 781
 782        issues = []
 783        for result in results:
 784            issue = Issue.parse_response(self.allspice_client, result)
 785            # See Issue.request
 786            setattr(issue, "_repository", self)
 787            # This is mostly for compatibility with an older implementation
 788            Issue._add_read_property("repo", self, issue)
 789            issues.append(issue)
 790
 791        return issues
 792
 793    def get_design_reviews(
 794        self,
 795        state: Literal["open", "closed", "all"] = "all",
 796        milestone: Optional[Union[Milestone, str]] = None,
 797        labels: Optional[List[str]] = None,
 798    ) -> List["DesignReview"]:
 799        """
 800        Get all Design Reviews of this Repository.
 801
 802        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 803
 804        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 805                      are returned.
 806        :param milestone: The milestone of the Design Reviews to get.
 807        :param labels: A list of label IDs to filter DRs by.
 808        :return: A list of Design Reviews.
 809        """
 810
 811        params = {
 812            "state": state,
 813        }
 814        if milestone:
 815            if isinstance(milestone, Milestone):
 816                params["milestone"] = milestone.name
 817            else:
 818                params["milestone"] = milestone
 819        if labels:
 820            params["labels"] = ",".join(labels)
 821
 822        results = self.allspice_client.requests_get_paginated(
 823            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 824            params=params,
 825        )
 826        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 827
 828    def get_commits(
 829        self,
 830        sha: Optional[str] = None,
 831        path: Optional[str] = None,
 832        stat: bool = True,
 833    ) -> List["Commit"]:
 834        """
 835        Get all the Commits of this Repository.
 836
 837        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 838
 839        :param sha: The SHA of the commit to start listing commits from.
 840        :param path: filepath of a file/dir.
 841        :param stat: Include the number of additions and deletions in the response.
 842                     Disable for speedup.
 843        :return: A list of Commits.
 844        """
 845
 846        data = {}
 847        if sha:
 848            data["sha"] = sha
 849        if path:
 850            data["path"] = path
 851        if not stat:
 852            data["stat"] = False
 853
 854        try:
 855            results = self.allspice_client.requests_get_paginated(
 856                Repository.REPO_COMMITS % (self.owner.username, self.name),
 857                params=data,
 858            )
 859        except ConflictException as err:
 860            logging.warning(err)
 861            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 862            results = []
 863        return [Commit.parse_response(self.allspice_client, result) for result in results]
 864
 865    def get_issues_state(self, state) -> List["Issue"]:
 866        """
 867        DEPRECATED: Use get_issues() instead.
 868
 869        Get issues of state Issue.open or Issue.closed of a repository.
 870        """
 871
 872        assert state in [Issue.OPENED, Issue.CLOSED]
 873        issues = []
 874        data = {"state": state}
 875        results = self.allspice_client.requests_get_paginated(
 876            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 877            params=data,
 878        )
 879        for result in results:
 880            issue = Issue.parse_response(self.allspice_client, result)
 881            # adding data not contained in the issue response
 882            # See Issue.request()
 883            setattr(issue, "_repository", self)
 884            Issue._add_read_property("repo", self, issue)
 885            Issue._add_read_property("owner", self.owner, issue)
 886            issues.append(issue)
 887        return issues
 888
 889    def get_times(self):
 890        results = self.allspice_client.requests_get(
 891            Repository.REPO_TIMES % (self.owner.username, self.name)
 892        )
 893        return results
 894
 895    def get_user_time(self, username) -> float:
 896        if isinstance(username, User):
 897            username = username.username
 898        results = self.allspice_client.requests_get(
 899            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 900        )
 901        time = sum(r["time"] for r in results)
 902        return time
 903
 904    def get_full_name(self) -> str:
 905        return self.owner.username + "/" + self.name
 906
 907    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 908        data = {
 909            "assignees": assignees,
 910            "body": description,
 911            "closed": False,
 912            "title": title,
 913        }
 914        result = self.allspice_client.requests_post(
 915            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 916            data=data,
 917        )
 918
 919        issue = Issue.parse_response(self.allspice_client, result)
 920        setattr(issue, "_repository", self)
 921        Issue._add_read_property("repo", self, issue)
 922        return issue
 923
 924    def create_design_review(
 925        self,
 926        title: str,
 927        head: Union[Branch, str],
 928        base: Union[Branch, str],
 929        assignees: Optional[Set[Union[User, str]]] = None,
 930        body: Optional[str] = None,
 931        due_date: Optional[datetime] = None,
 932        milestone: Optional["Milestone"] = None,
 933    ) -> "DesignReview":
 934        """
 935        Create a new Design Review.
 936
 937        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 938
 939        :param title: Title of the Design Review
 940        :param head: Branch or name of the branch to merge into the base branch
 941        :param base: Branch or name of the branch to merge into
 942        :param assignees: Optional. A list of users to assign this review. List can be of
 943                          User objects or of usernames.
 944        :param body: An Optional Description for the Design Review.
 945        :param due_date: An Optional Due date for the Design Review.
 946        :param milestone: An Optional Milestone for the Design Review
 947        :return: The created Design Review
 948        """
 949
 950        data: dict[str, Any] = {
 951            "title": title,
 952        }
 953
 954        if isinstance(head, Branch):
 955            data["head"] = head.name
 956        else:
 957            data["head"] = head
 958        if isinstance(base, Branch):
 959            data["base"] = base.name
 960        else:
 961            data["base"] = base
 962        if assignees:
 963            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 964        if body:
 965            data["body"] = body
 966        if due_date:
 967            data["due_date"] = Util.format_time(due_date)
 968        if milestone:
 969            data["milestone"] = milestone.id
 970
 971        result = self.allspice_client.requests_post(
 972            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 973            data=data,
 974        )
 975
 976        return DesignReview.parse_response(self.allspice_client, result)
 977
 978    def create_milestone(
 979        self,
 980        title: str,
 981        description: str,
 982        due_date: Optional[str] = None,
 983        state: str = "open",
 984    ) -> "Milestone":
 985        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 986        data = {"title": title, "description": description, "state": state}
 987        if due_date:
 988            data["due_date"] = due_date
 989        result = self.allspice_client.requests_post(url, data=data)
 990        return Milestone.parse_response(self.allspice_client, result)
 991
 992    def create_gitea_hook(self, hook_url: str, events: List[str]):
 993        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 994        data = {
 995            "type": "gitea",
 996            "config": {"content_type": "json", "url": hook_url},
 997            "events": events,
 998            "active": True,
 999        }
1000        return self.allspice_client.requests_post(url, data=data)
1001
1002    def list_hooks(self):
1003        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1004        return self.allspice_client.requests_get(url)
1005
1006    def delete_hook(self, id: str):
1007        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1008        self.allspice_client.requests_delete(url)
1009
1010    def is_collaborator(self, username) -> bool:
1011        if isinstance(username, User):
1012            username = username.username
1013        try:
1014            # returns 204 if its ok, 404 if its not
1015            self.allspice_client.requests_get(
1016                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1017            )
1018            return True
1019        except Exception:
1020            return False
1021
1022    def get_users_with_access(self) -> Sequence[User]:
1023        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1024        response = self.allspice_client.requests_get(url)
1025        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1026        if isinstance(self.owner, User):
1027            return [*collabs, self.owner]
1028        else:
1029            # owner must be org
1030            teams = self.owner.get_teams()
1031            for team in teams:
1032                team_repos = team.get_repos()
1033                if self.name in [n.name for n in team_repos]:
1034                    collabs += team.get_members()
1035            return collabs
1036
1037    def remove_collaborator(self, user_name: str):
1038        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1039        self.allspice_client.requests_delete(url)
1040
1041    def transfer_ownership(
1042        self,
1043        new_owner: Union[User, Organization],
1044        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1045    ):
1046        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1047        data: dict[str, Any] = {"new_owner": new_owner.username}
1048        if isinstance(new_owner, Organization):
1049            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1050            data["team_ids"] = new_team_ids
1051        self.allspice_client.requests_post(url, data=data)
1052        # TODO: make sure this instance is either updated or discarded
1053
1054    def get_git_content(
1055        self,
1056        ref: Optional["Ref"] = None,
1057        commit: "Optional[Commit]" = None,
1058    ) -> List[Content]:
1059        """
1060        Get the metadata for all files in the root directory.
1061
1062        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1063
1064        :param ref: branch or commit to get content from
1065        :param commit: commit to get content from (deprecated)
1066        """
1067        url = f"/repos/{self.owner.username}/{self.name}/contents"
1068        data = Util.data_params_for_ref(ref or commit)
1069
1070        result = [
1071            Content.parse_response(self.allspice_client, f)
1072            for f in self.allspice_client.requests_get(url, data)
1073        ]
1074        return result
1075
1076    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1077        """
1078        Get the repository's tree on a given ref.
1079
1080        By default, this will only return the top-level entries in the tree. If you want
1081        to get the entire tree, set `recursive` to True.
1082
1083        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1084        :param recursive: Whether to get the entire tree or just the top-level entries.
1085        """
1086
1087        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1088        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1089        params = {"recursive": recursive}
1090        results = self.allspice_client.requests_get_paginated(url, params=params)
1091        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1092
1093    def get_file_content(
1094        self,
1095        content: Content,
1096        ref: Optional[Ref] = None,
1097        commit: Optional[Commit] = None,
1098    ) -> Union[str, List["Content"]]:
1099        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1100        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1101        data = Util.data_params_for_ref(ref or commit)
1102
1103        if content.type == Content.FILE:
1104            return self.allspice_client.requests_get(url, data)["content"]
1105        else:
1106            return [
1107                Content.parse_response(self.allspice_client, f)
1108                for f in self.allspice_client.requests_get(url, data)
1109            ]
1110
1111    def get_raw_file(
1112        self,
1113        file_path: str,
1114        ref: Optional[Ref] = None,
1115    ) -> bytes:
1116        """
1117        Get the raw, binary data of a single file.
1118
1119        Note 1: if the file you are requesting is a text file, you might want to
1120        use .decode() on the result to get a string. For example:
1121
1122            content = repo.get_raw_file("file.txt").decode("utf-8")
1123
1124        Note 2: this method will store the entire file in memory. If you want
1125        to download a large file, you might want to use `download_to_file`
1126        instead.
1127
1128        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1129
1130        :param file_path: The path to the file to get.
1131        :param ref: The branch or commit to get the file from.  If not provided,
1132            the default branch is used.
1133        """
1134
1135        url = self.REPO_GET_RAW_FILE.format(
1136            owner=self.owner.username,
1137            repo=self.name,
1138            path=file_path,
1139        )
1140        params = Util.data_params_for_ref(ref)
1141        return self.allspice_client.requests_get_raw(url, params=params)
1142
1143    def download_to_file(
1144        self,
1145        file_path: str,
1146        io: IO,
1147        ref: Optional[Ref] = None,
1148    ) -> None:
1149        """
1150        Download the binary data of a file to a file-like object.
1151
1152        Example:
1153
1154            with open("schematic.DSN", "wb") as f:
1155                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1156
1157        :param file_path: The path to the file in the repository from the root
1158            of the repository.
1159        :param io: The file-like object to write the data to.
1160        """
1161
1162        url = self.allspice_client._AllSpice__get_url(
1163            self.REPO_GET_RAW_FILE.format(
1164                owner=self.owner.username,
1165                repo=self.name,
1166                path=file_path,
1167            )
1168        )
1169        params = Util.data_params_for_ref(ref)
1170        response = self.allspice_client.requests.get(
1171            url,
1172            params=params,
1173            headers=self.allspice_client.headers,
1174            stream=True,
1175        )
1176
1177        for chunk in response.iter_content(chunk_size=4096):
1178            if chunk:
1179                io.write(chunk)
1180
1181    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1182        """
1183        Get the json blob for a cad file if it exists, otherwise enqueue
1184        a new job and return a 503 status.
1185
1186        WARNING: This is still experimental and not recommended for critical
1187        applications. The structure and content of the returned dictionary can
1188        change at any time.
1189
1190        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1191        """
1192
1193        if isinstance(content, Content):
1194            content = content.path
1195
1196        url = self.REPO_GET_ALLSPICE_JSON.format(
1197            owner=self.owner.username,
1198            repo=self.name,
1199            content=content,
1200        )
1201        data = Util.data_params_for_ref(ref)
1202        return self.allspice_client.requests_get(url, data)
1203
1204    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1205        """
1206        Get the svg blob for a cad file if it exists, otherwise enqueue
1207        a new job and return a 503 status.
1208
1209        WARNING: This is still experimental and not yet recommended for
1210        critical applications. The content of the returned svg can change
1211        at any time.
1212
1213        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1214        """
1215
1216        if isinstance(content, Content):
1217            content = content.path
1218
1219        url = self.REPO_GET_ALLSPICE_SVG.format(
1220            owner=self.owner.username,
1221            repo=self.name,
1222            content=content,
1223        )
1224        data = Util.data_params_for_ref(ref)
1225        return self.allspice_client.requests_get_raw(url, data)
1226
1227    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1228        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1229        if not data:
1230            data = {}
1231        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1232        data.update({"content": content})
1233        return self.allspice_client.requests_post(url, data)
1234
1235    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1236        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1237        if not data:
1238            data = {}
1239        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1240        data.update({"sha": file_sha, "content": content})
1241        return self.allspice_client.requests_put(url, data)
1242
1243    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1244        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1245        if not data:
1246            data = {}
1247        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1248        data.update({"sha": file_sha})
1249        return self.allspice_client.requests_delete(url, data)
1250
1251    def get_archive(
1252        self,
1253        ref: Ref = "main",
1254        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1255    ) -> bytes:
1256        """
1257        Download all the files in a specific ref of a repository as a zip or tarball
1258        archive.
1259
1260        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1261
1262        :param ref: branch or commit to get content from, defaults to the "main" branch
1263        :param archive_format: zip or tar, defaults to zip
1264        """
1265
1266        ref_string = Util.data_params_for_ref(ref)["ref"]
1267        url = self.REPO_GET_ARCHIVE.format(
1268            owner=self.owner.username,
1269            repo=self.name,
1270            ref=ref_string,
1271            format=archive_format.value,
1272        )
1273        return self.allspice_client.requests_get_raw(url)
1274
1275    def get_topics(self) -> list[str]:
1276        """
1277        Gets the list of topics on this repository.
1278
1279        See http://localhost:3000/api/swagger#/repository/repoListTopics
1280        """
1281
1282        url = self.REPO_GET_TOPICS.format(
1283            owner=self.owner.username,
1284            repo=self.name,
1285        )
1286        return self.allspice_client.requests_get(url)["topics"]
1287
1288    def add_topic(self, topic: str):
1289        """
1290        Adds a topic to the repository.
1291
1292        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1293
1294        :param topic: The topic to add. Topic names must consist only of
1295            lowercase letters, numnbers and dashes (-), and cannot start with
1296            dashes. Topic names also must be under 35 characters long.
1297        """
1298
1299        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1300        self.allspice_client.requests_put(url)
1301
1302    def create_release(
1303        self,
1304        tag_name: str,
1305        name: Optional[str] = None,
1306        body: Optional[str] = None,
1307        draft: bool = False,
1308    ):
1309        """
1310        Create a release for this repository. The release will be created for
1311        the tag with the given name. If there is no tag with this name, create
1312        the tag first.
1313
1314        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1315        """
1316
1317        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1318        data = {
1319            "tag_name": tag_name,
1320            "draft": draft,
1321        }
1322        if name is not None:
1323            data["name"] = name
1324        if body is not None:
1325            data["body"] = body
1326        response = self.allspice_client.requests_post(url, data)
1327        return Release.parse_response(self.allspice_client, response, self)
1328
1329    def get_releases(
1330        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1331    ) -> List[Release]:
1332        """
1333        Get the list of releases for this repository.
1334
1335        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1336        """
1337
1338        data = {}
1339
1340        if draft is not None:
1341            data["draft"] = draft
1342        if pre_release is not None:
1343            data["pre-release"] = pre_release
1344
1345        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1346        responses = self.allspice_client.requests_get_paginated(url, params=data)
1347
1348        return [
1349            Release.parse_response(self.allspice_client, response, self) for response in responses
1350        ]
1351
1352    def get_latest_release(self) -> Release:
1353        """
1354        Get the latest release for this repository.
1355
1356        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1357        """
1358
1359        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1360        response = self.allspice_client.requests_get(url)
1361        release = Release.parse_response(self.allspice_client, response, self)
1362        return release
1363
1364    def get_release_by_tag(self, tag: str) -> Release:
1365        """
1366        Get a release by its tag.
1367
1368        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1369        """
1370
1371        url = self.REPO_GET_RELEASE_BY_TAG.format(
1372            owner=self.owner.username, repo=self.name, tag=tag
1373        )
1374        response = self.allspice_client.requests_get(url)
1375        release = Release.parse_response(self.allspice_client, response, self)
1376        return release
1377
1378    def get_commit_statuses(
1379        self,
1380        commit: Union[str, Commit],
1381        sort: Optional[CommitStatusSort] = None,
1382        state: Optional[CommitStatusState] = None,
1383    ) -> List[CommitStatus]:
1384        """
1385        Get a list of statuses for a commit.
1386
1387        This is roughly equivalent to the Commit.get_statuses method, but this
1388        method allows you to sort and filter commits and is more convenient if
1389        you have a commit SHA and don't need to get the commit itself.
1390
1391        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1392        """
1393
1394        if isinstance(commit, Commit):
1395            commit = commit.sha
1396
1397        params = {}
1398        if sort is not None:
1399            params["sort"] = sort.value
1400        if state is not None:
1401            params["state"] = state.value
1402
1403        url = self.REPO_GET_COMMIT_STATUS.format(
1404            owner=self.owner.username, repo=self.name, sha=commit
1405        )
1406        response = self.allspice_client.requests_get_paginated(url, params=params)
1407        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1408
1409    def create_commit_status(
1410        self,
1411        commit: Union[str, Commit],
1412        context: Optional[str] = None,
1413        description: Optional[str] = None,
1414        state: Optional[CommitStatusState] = None,
1415        target_url: Optional[str] = None,
1416    ) -> CommitStatus:
1417        """
1418        Create a status on a commit.
1419
1420        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1421        """
1422
1423        if isinstance(commit, Commit):
1424            commit = commit.sha
1425
1426        data = {}
1427        if context is not None:
1428            data["context"] = context
1429        if description is not None:
1430            data["description"] = description
1431        if state is not None:
1432            data["state"] = state.value
1433        if target_url is not None:
1434            data["target_url"] = target_url
1435
1436        url = self.REPO_GET_COMMIT_STATUS.format(
1437            owner=self.owner.username, repo=self.name, sha=commit
1438        )
1439        response = self.allspice_client.requests_post(url, data=data)
1440        return CommitStatus.parse_response(self.allspice_client, response)
1441
1442    def delete(self):
1443        self.allspice_client.requests_delete(
1444            Repository.REPO_DELETE % (self.owner.username, self.name)
1445        )
1446        self.deleted = True
Repository(allspice_client)
    def __init__(self, allspice_client):
        """Initialize the Repository by passing ``allspice_client`` to the parent initializer."""
        super().__init__(allspice_client)
allow_fast_forward_only_merge: bool
allow_manual_merge: Any
allow_merge_commits: bool
allow_rebase: bool
allow_rebase_explicit: bool
allow_rebase_update: bool
allow_squash_merge: bool
archived: bool
archived_at: str
autodetect_manual_merge: Any
avatar_url: str
clone_url: str
created_at: str
default_allow_maintainer_edit: bool
default_branch: str
default_delete_branch_after_merge: bool
default_merge_style: str
description: str
empty: bool
enable_prune: Any
external_tracker: Any
external_wiki: Any
fork: bool
forks_count: int
full_name: str
has_actions: bool
has_issues: bool
has_packages: bool
has_projects: bool
has_pull_requests: bool
has_releases: bool
has_wiki: bool
html_url: str
id: int
ignore_whitespace_conflicts: bool
internal: bool
internal_tracker: Dict[str, bool]
language: str
languages_url: str
mirror: bool
mirror_interval: str
mirror_updated: str
name: str
object_format_name: str
open_issues_count: int
open_pr_counter: int
original_url: str
owner: Union[User, Organization]
parent: Any
permissions: Dict[str, bool]
private: bool
projects_mode: str
release_counter: int
repo_transfer: Any
size: int
ssh_url: str
stars_count: int
template: bool
updated_at: datetime.datetime
url: str
watchers_count: int
website: str
API_OBJECT = '/repos/{owner}/{name}'
REPO_IS_COLLABORATOR = '/repos/%s/%s/collaborators/%s'
REPO_BRANCHES = '/repos/%s/%s/branches'
REPO_BRANCH = '/repos/{owner}/{repo}/branches/{branch}'
REPO_ISSUES = '/repos/{owner}/{repo}/issues'
REPO_DESIGN_REVIEWS = '/repos/{owner}/{repo}/pulls'
REPO_DELETE = '/repos/%s/%s'
REPO_TIMES = '/repos/%s/%s/times'
REPO_USER_TIME = '/repos/%s/%s/times/%s'
REPO_COMMITS = '/repos/%s/%s/commits'
REPO_TRANSFER = '/repos/{owner}/{repo}/transfer'
REPO_MILESTONES = '/repos/{owner}/{repo}/milestones'
REPO_GET_ARCHIVE = '/repos/{owner}/{repo}/archive/{ref}.{format}'
REPO_GET_ALLSPICE_JSON = '/repos/{owner}/{repo}/allspice_generated/json/{content}'
REPO_GET_ALLSPICE_SVG = '/repos/{owner}/{repo}/allspice_generated/svg/{content}'
REPO_GET_TOPICS = '/repos/{owner}/{repo}/topics'
REPO_ADD_TOPIC = '/repos/{owner}/{repo}/topics/{topic}'
REPO_GET_RELEASES = '/repos/{owner}/{repo}/releases'
REPO_GET_LATEST_RELEASE = '/repos/{owner}/{repo}/releases/latest'
REPO_GET_RELEASE_BY_TAG = '/repos/{owner}/{repo}/releases/tags/{tag}'
REPO_GET_COMMIT_STATUS = '/repos/{owner}/{repo}/statuses/{sha}'
REPO_GET_RAW_FILE = '/repos/{owner}/{repo}/raw/{path}'
REPO_GET_TREE = '/repos/{owner}/{repo}/git/trees/{ref}'
@classmethod
def request( cls, allspice_client, owner: str, name: str) -> Repository:
607    @classmethod
608    def request(
609        cls,
610        allspice_client,
611        owner: str,
612        name: str,
613    ) -> Repository:
614        return cls._request(allspice_client, {"owner": owner, "name": name})
@classmethod
def search( cls, allspice_client, query: Optional[str] = None, topic: bool = False, include_description: bool = False, user: Optional[User] = None, owner_to_prioritize: Union[User, Organization, NoneType] = None) -> list[Repository]:
616    @classmethod
617    def search(
618        cls,
619        allspice_client,
620        query: Optional[str] = None,
621        topic: bool = False,
622        include_description: bool = False,
623        user: Optional[User] = None,
624        owner_to_prioritize: Union[User, Organization, None] = None,
625    ) -> list[Repository]:
626        """
627        Search for repositories.
628
629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
630
631        :param query: The query string to search for
632        :param topic: If true, the query string will only be matched against the
633            repository's topic.
634        :param include_description: If true, the query string will be matched
635            against the repository's description as well.
636        :param user: If specified, only repositories that this user owns or
637            contributes to will be searched.
638        :param owner_to_prioritize: If specified, repositories owned by the
639            given entity will be prioritized in the search.
640        :returns: All repositories matching the query. If there are many
641            repositories matching this query, this may take some time.
642        """
643
644        params = {}
645
646        if query is not None:
647            params["q"] = query
648        if topic:
649            params["topic"] = topic
650        if include_description:
651            params["include_description"] = include_description
652        if user is not None:
653            params["user"] = user.id
654        if owner_to_prioritize is not None:
655            params["owner_to_prioritize"] = owner_to_prioritize.id
656
657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
658
659        return [Repository.parse_response(allspice_client, response) for response in responses]

Search for repositories.

See https://hub.allspice.io/api/swagger#/repository/repoSearch

Parameters
  • query: The query string to search for
  • topic: If true, the query string will only be matched against the repository's topic.
  • include_description: If true, the query string will be matched against the repository's description as well.
  • user: If specified, only repositories that this user owns or contributes to will be searched.
  • owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.

Returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
def commit(self):
690    def commit(self):
691        args = {"owner": self.owner.username, "name": self.name}
692        self._commit(args)
def get_branches(self) -> List[Branch]:
694    def get_branches(self) -> List["Branch"]:
695        """Get all the Branches of this Repository."""
696
697        results = self.allspice_client.requests_get_paginated(
698            Repository.REPO_BRANCHES % (self.owner.username, self.name)
699        )
700        return [Branch.parse_response(self.allspice_client, result) for result in results]

Get all the Branches of this Repository.

def get_branch(self, name: str) -> Branch:
702    def get_branch(self, name: str) -> "Branch":
703        """Get a specific Branch of this Repository."""
704        result = self.allspice_client.requests_get(
705            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
706        )
707        return Branch.parse_response(self.allspice_client, result)

Get a specific Branch of this Repository.

def add_branch( self, create_from: Union[Branch, Commit, str], newname: str) -> Branch:
709    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
710        """Add a branch to the repository"""
711        # Note: will only work with gitea 1.13 or higher!
712
713        ref_name = Util.data_params_for_ref(create_from)
714        if "ref" not in ref_name:
715            raise ValueError("create_from must be a Branch, Commit or string")
716        ref_name = ref_name["ref"]
717
718        data = {"new_branch_name": newname, "old_ref_name": ref_name}
719        result = self.allspice_client.requests_post(
720            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
721        )
722        return Branch.parse_response(self.allspice_client, result)

Add a branch to the repository

def get_issues( self, state: Literal['open', 'closed', 'all'] = 'all', search_query: Optional[str] = None, labels: Optional[List[str]] = None, milestones: Optional[List[Union[Milestone, str]]] = None, assignee: Union[User, str, NoneType] = None, since: Optional[datetime.datetime] = None, before: Optional[datetime.datetime] = None) -> List[Issue]:
724    def get_issues(
725        self,
726        state: Literal["open", "closed", "all"] = "all",
727        search_query: Optional[str] = None,
728        labels: Optional[List[str]] = None,
729        milestones: Optional[List[Union[Milestone, str]]] = None,
730        assignee: Optional[Union[User, str]] = None,
731        since: Optional[datetime] = None,
732        before: Optional[datetime] = None,
733    ) -> List["Issue"]:
734        """
735        Get all Issues of this Repository (open and closed)
736
737        https://hub.allspice.io/api/swagger#/repository/repoListIssues
738
739        All params of this method are optional filters. If you don't specify a filter, it
740        will not be applied.
741
742        :param state: The state of the Issues to get. If None, all Issues are returned.
743        :param search_query: Filter issues by text. This is equivalent to searching for
744                             `search_query` in the Issues on the web interface.
745        :param labels: Filter issues by labels.
746        :param milestones: Filter issues by milestones.
747        :param assignee: Filter issues by the assigned user.
748        :param since: Filter issues by the date they were created.
749        :param before: Filter issues by the date they were created.
750        :return: A list of Issues.
751        """
752
753        data = {
754            "state": state,
755        }
756        if search_query:
757            data["q"] = search_query
758        if labels:
759            data["labels"] = ",".join(labels)
760        if milestones:
761            data["milestone"] = ",".join(
762                [
763                    milestone.name if isinstance(milestone, Milestone) else milestone
764                    for milestone in milestones
765                ]
766            )
767        if assignee:
768            if isinstance(assignee, User):
769                data["assignee"] = assignee.username
770            else:
771                data["assignee"] = assignee
772        if since:
773            data["since"] = Util.format_time(since)
774        if before:
775            data["before"] = Util.format_time(before)
776
777        results = self.allspice_client.requests_get_paginated(
778            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
779            params=data,
780        )
781
782        issues = []
783        for result in results:
784            issue = Issue.parse_response(self.allspice_client, result)
785            # See Issue.request
786            setattr(issue, "_repository", self)
787            # This is mostly for compatibility with an older implementation
788            Issue._add_read_property("repo", self, issue)
789            issues.append(issue)
790
791        return issues

Get all Issues of this Repository (open and closed)

https://hub.allspice.io/api/swagger#/repository/repoListIssues

All params of this method are optional filters. If you don't specify a filter, it will not be applied.

Parameters
  • state: The state of the Issues to get: "open", "closed" or "all". Defaults to "all", which returns every Issue.
  • search_query: Filter issues by text. This is equivalent to searching for search_query in the Issues on the web interface.
  • labels: Filter issues by labels.
  • milestones: Filter issues by milestones.
  • assignee: Filter issues by the assigned user.
  • since: Only return issues updated after this time.
  • before: Only return issues updated before this time.
Returns

A list of Issues.

def get_design_reviews( self, state: Literal['open', 'closed', 'all'] = 'all', milestone: Union[Milestone, str, NoneType] = None, labels: Optional[List[str]] = None) -> List[DesignReview]:
793    def get_design_reviews(
794        self,
795        state: Literal["open", "closed", "all"] = "all",
796        milestone: Optional[Union[Milestone, str]] = None,
797        labels: Optional[List[str]] = None,
798    ) -> List["DesignReview"]:
799        """
800        Get all Design Reviews of this Repository.
801
802        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
803
804        :param state: The state of the Design Reviews to get. If None, all Design Reviews
805                      are returned.
806        :param milestone: The milestone of the Design Reviews to get.
807        :param labels: A list of label IDs to filter DRs by.
808        :return: A list of Design Reviews.
809        """
810
811        params = {
812            "state": state,
813        }
814        if milestone:
815            if isinstance(milestone, Milestone):
816                params["milestone"] = milestone.name
817            else:
818                params["milestone"] = milestone
819        if labels:
820            params["labels"] = ",".join(labels)
821
822        results = self.allspice_client.requests_get_paginated(
823            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
824            params=params,
825        )
826        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

Get all Design Reviews of this Repository.

https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

Parameters
  • state: The state of the Design Reviews to get: "open", "closed" or "all". Defaults to "all", which returns every Design Review.
  • milestone: The milestone of the Design Reviews to get.
  • labels: A list of label IDs to filter DRs by.
Returns

A list of Design Reviews.

def get_commits( self, sha: Optional[str] = None, path: Optional[str] = None, stat: bool = True) -> List[Commit]:
828    def get_commits(
829        self,
830        sha: Optional[str] = None,
831        path: Optional[str] = None,
832        stat: bool = True,
833    ) -> List["Commit"]:
834        """
835        Get all the Commits of this Repository.
836
837        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
838
839        :param sha: The SHA of the commit to start listing commits from.
840        :param path: filepath of a file/dir.
841        :param stat: Include the number of additions and deletions in the response.
842                     Disable for speedup.
843        :return: A list of Commits.
844        """
845
846        data = {}
847        if sha:
848            data["sha"] = sha
849        if path:
850            data["path"] = path
851        if not stat:
852            data["stat"] = False
853
854        try:
855            results = self.allspice_client.requests_get_paginated(
856                Repository.REPO_COMMITS % (self.owner.username, self.name),
857                params=data,
858            )
859        except ConflictException as err:
860            logging.warning(err)
861            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
862            results = []
863        return [Commit.parse_response(self.allspice_client, result) for result in results]

Get all the Commits of this Repository.

https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

Parameters
  • sha: The SHA of the commit to start listing commits from.
  • path: filepath of a file/dir.
  • stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns

A list of Commits.

def get_issues_state(self, state) -> List[Issue]:
865    def get_issues_state(self, state) -> List["Issue"]:
866        """
867        DEPRECATED: Use get_issues() instead.
868
869        Get issues of state Issue.open or Issue.closed of a repository.
870        """
871
872        assert state in [Issue.OPENED, Issue.CLOSED]
873        issues = []
874        data = {"state": state}
875        results = self.allspice_client.requests_get_paginated(
876            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
877            params=data,
878        )
879        for result in results:
880            issue = Issue.parse_response(self.allspice_client, result)
881            # adding data not contained in the issue response
882            # See Issue.request()
883            setattr(issue, "_repository", self)
884            Issue._add_read_property("repo", self, issue)
885            Issue._add_read_property("owner", self.owner, issue)
886            issues.append(issue)
887        return issues

DEPRECATED: Use get_issues() instead.

Get issues of state Issue.OPENED or Issue.CLOSED of a repository.

def get_times(self):
889    def get_times(self):
890        results = self.allspice_client.requests_get(
891            Repository.REPO_TIMES % (self.owner.username, self.name)
892        )
893        return results
def get_user_time(self, username) -> float:
895    def get_user_time(self, username) -> float:
896        if isinstance(username, User):
897            username = username.username
898        results = self.allspice_client.requests_get(
899            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
900        )
901        time = sum(r["time"] for r in results)
902        return time
def get_full_name(self) -> str:
904    def get_full_name(self) -> str:
905        return self.owner.username + "/" + self.name
def create_issue( self, title, assignees=frozenset(), description='') -> allspice.baseapiobject.ApiObject:
907    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
908        data = {
909            "assignees": assignees,
910            "body": description,
911            "closed": False,
912            "title": title,
913        }
914        result = self.allspice_client.requests_post(
915            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
916            data=data,
917        )
918
919        issue = Issue.parse_response(self.allspice_client, result)
920        setattr(issue, "_repository", self)
921        Issue._add_read_property("repo", self, issue)
922        return issue
def create_design_review( self, title: str, head: Union[Branch, str], base: Union[Branch, str], assignees: Optional[Set[Union[User, str]]] = None, body: Optional[str] = None, due_date: Optional[datetime.datetime] = None, milestone: Optional[Milestone] = None) -> DesignReview:
924    def create_design_review(
925        self,
926        title: str,
927        head: Union[Branch, str],
928        base: Union[Branch, str],
929        assignees: Optional[Set[Union[User, str]]] = None,
930        body: Optional[str] = None,
931        due_date: Optional[datetime] = None,
932        milestone: Optional["Milestone"] = None,
933    ) -> "DesignReview":
934        """
935        Create a new Design Review.
936
937        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
938
939        :param title: Title of the Design Review
940        :param head: Branch or name of the branch to merge into the base branch
941        :param base: Branch or name of the branch to merge into
942        :param assignees: Optional. A list of users to assign this review. List can be of
943                          User objects or of usernames.
944        :param body: An Optional Description for the Design Review.
945        :param due_date: An Optional Due date for the Design Review.
946        :param milestone: An Optional Milestone for the Design Review
947        :return: The created Design Review
948        """
949
950        data: dict[str, Any] = {
951            "title": title,
952        }
953
954        if isinstance(head, Branch):
955            data["head"] = head.name
956        else:
957            data["head"] = head
958        if isinstance(base, Branch):
959            data["base"] = base.name
960        else:
961            data["base"] = base
962        if assignees:
963            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
964        if body:
965            data["body"] = body
966        if due_date:
967            data["due_date"] = Util.format_time(due_date)
968        if milestone:
969            data["milestone"] = milestone.id
970
971        result = self.allspice_client.requests_post(
972            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
973            data=data,
974        )
975
976        return DesignReview.parse_response(self.allspice_client, result)

Create a new Design Review.

See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

Parameters
  • title: Title of the Design Review
  • head: Branch or name of the branch to merge into the base branch
  • base: Branch or name of the branch to merge into
  • assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
  • body: An Optional Description for the Design Review.
  • due_date: An Optional Due date for the Design Review.
  • milestone: An Optional Milestone for the Design Review
Returns

The created Design Review

def create_milestone( self, title: str, description: str, due_date: Optional[str] = None, state: str = 'open') -> Milestone:
978    def create_milestone(
979        self,
980        title: str,
981        description: str,
982        due_date: Optional[str] = None,
983        state: str = "open",
984    ) -> "Milestone":
985        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
986        data = {"title": title, "description": description, "state": state}
987        if due_date:
988            data["due_date"] = due_date
989        result = self.allspice_client.requests_post(url, data=data)
990        return Milestone.parse_response(self.allspice_client, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
 992    def create_gitea_hook(self, hook_url: str, events: List[str]):
 993        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 994        data = {
 995            "type": "gitea",
 996            "config": {"content_type": "json", "url": hook_url},
 997            "events": events,
 998            "active": True,
 999        }
1000        return self.allspice_client.requests_post(url, data=data)
def list_hooks(self):
1002    def list_hooks(self):
1003        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1004        return self.allspice_client.requests_get(url)
def delete_hook(self, id: str):
1006    def delete_hook(self, id: str):
1007        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1008        self.allspice_client.requests_delete(url)
def is_collaborator(self, username) -> bool:
1010    def is_collaborator(self, username) -> bool:
1011        if isinstance(username, User):
1012            username = username.username
1013        try:
1014            # returns 204 if its ok, 404 if its not
1015            self.allspice_client.requests_get(
1016                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1017            )
1018            return True
1019        except Exception:
1020            return False
def get_users_with_access(self) -> Sequence[User]:
1022    def get_users_with_access(self) -> Sequence[User]:
1023        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1024        response = self.allspice_client.requests_get(url)
1025        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1026        if isinstance(self.owner, User):
1027            return [*collabs, self.owner]
1028        else:
1029            # owner must be org
1030            teams = self.owner.get_teams()
1031            for team in teams:
1032                team_repos = team.get_repos()
1033                if self.name in [n.name for n in team_repos]:
1034                    collabs += team.get_members()
1035            return collabs
def remove_collaborator(self, user_name: str):
1037    def remove_collaborator(self, user_name: str):
1038        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1039        self.allspice_client.requests_delete(url)
def transfer_ownership( self, new_owner: Union[User, Organization], new_teams: Union[Set[Team], FrozenSet[Team]] = frozenset()):
1041    def transfer_ownership(
1042        self,
1043        new_owner: Union[User, Organization],
1044        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1045    ):
1046        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1047        data: dict[str, Any] = {"new_owner": new_owner.username}
1048        if isinstance(new_owner, Organization):
1049            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1050            data["team_ids"] = new_team_ids
1051        self.allspice_client.requests_post(url, data=data)
1052        # TODO: make sure this instance is either updated or discarded
def get_git_content( self, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> List[Content]:
1054    def get_git_content(
1055        self,
1056        ref: Optional["Ref"] = None,
1057        commit: "Optional[Commit]" = None,
1058    ) -> List[Content]:
1059        """
1060        Get the metadata for all files in the root directory.
1061
1062        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1063
1064        :param ref: branch or commit to get content from
1065        :param commit: commit to get content from (deprecated)
1066        """
1067        url = f"/repos/{self.owner.username}/{self.name}/contents"
1068        data = Util.data_params_for_ref(ref or commit)
1069
1070        result = [
1071            Content.parse_response(self.allspice_client, f)
1072            for f in self.allspice_client.requests_get(url, data)
1073        ]
1074        return result

Get the metadata for all files in the root directory.

https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

Parameters
  • ref: branch or commit to get content from
  • commit: commit to get content from (deprecated)
def get_tree( self, ref: Union[Branch, Commit, str, NoneType] = None, recursive: bool = False) -> List[allspice.apiobject.GitEntry]:
1076    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1077        """
1078        Get the repository's tree on a given ref.
1079
1080        By default, this will only return the top-level entries in the tree. If you want
1081        to get the entire tree, set `recursive` to True.
1082
1083        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1084        :param recursive: Whether to get the entire tree or just the top-level entries.
1085        """
1086
1087        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1088        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1089        params = {"recursive": recursive}
1090        results = self.allspice_client.requests_get_paginated(url, params=params)
1091        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

Get the repository's tree on a given ref.

By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set recursive to True.

Parameters
  • ref: The ref to get the tree from. If not provided, the default branch is used.
  • recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content( self, content: Content, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> Union[str, List[Content]]:
1093    def get_file_content(
1094        self,
1095        content: Content,
1096        ref: Optional[Ref] = None,
1097        commit: Optional[Commit] = None,
1098    ) -> Union[str, List["Content"]]:
1099        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1100        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1101        data = Util.data_params_for_ref(ref or commit)
1102
1103        if content.type == Content.FILE:
1104            return self.allspice_client.requests_get(url, data)["content"]
1105        else:
1106            return [
1107                Content.parse_response(self.allspice_client, f)
1108                for f in self.allspice_client.requests_get(url, data)
1109            ]

https://hub.allspice.io/api/swagger#/repository/repoGetContents

def get_raw_file( self, file_path: str, ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1111    def get_raw_file(
1112        self,
1113        file_path: str,
1114        ref: Optional[Ref] = None,
1115    ) -> bytes:
1116        """
1117        Get the raw, binary data of a single file.
1118
1119        Note 1: if the file you are requesting is a text file, you might want to
1120        use .decode() on the result to get a string. For example:
1121
1122            content = repo.get_raw_file("file.txt").decode("utf-8")
1123
1124        Note 2: this method will store the entire file in memory. If you want
1125        to download a large file, you might want to use `download_to_file`
1126        instead.
1127
1128        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1129
1130        :param file_path: The path to the file to get.
1131        :param ref: The branch or commit to get the file from.  If not provided,
1132            the default branch is used.
1133        """
1134
1135        url = self.REPO_GET_RAW_FILE.format(
1136            owner=self.owner.username,
1137            repo=self.name,
1138            path=file_path,
1139        )
1140        params = Util.data_params_for_ref(ref)
1141        return self.allspice_client.requests_get_raw(url, params=params)

Get the raw, binary data of a single file.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

content = repo.get_raw_file("file.txt").decode("utf-8")

Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use download_to_file instead.

See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

Parameters
  • file_path: The path to the file to get.
  • ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file( self, file_path: str, io: <class 'IO'>, ref: Union[Branch, Commit, str, NoneType] = None) -> None:
1143    def download_to_file(
1144        self,
1145        file_path: str,
1146        io: IO,
1147        ref: Optional[Ref] = None,
1148    ) -> None:
1149        """
1150        Download the binary data of a file to a file-like object.
1151
1152        Example:
1153
1154            with open("schematic.DSN", "wb") as f:
1155                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1156
1157        :param file_path: The path to the file in the repository from the root
1158            of the repository.
1159        :param io: The file-like object to write the data to.
1160        """
1161
1162        url = self.allspice_client._AllSpice__get_url(
1163            self.REPO_GET_RAW_FILE.format(
1164                owner=self.owner.username,
1165                repo=self.name,
1166                path=file_path,
1167            )
1168        )
1169        params = Util.data_params_for_ref(ref)
1170        response = self.allspice_client.requests.get(
1171            url,
1172            params=params,
1173            headers=self.allspice_client.headers,
1174            stream=True,
1175        )
1176
1177        for chunk in response.iter_content(chunk_size=4096):
1178            if chunk:
1179                io.write(chunk)

Download the binary data of a file to a file-like object.

Example:

with open("schematic.DSN", "wb") as f:
    repo.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
  • file_path: The path to the file in the repository from the root of the repository.
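  • io: The file-like object to write the data to.
  • ref: The branch or commit to download the file from. If not provided, the default branch is used.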
def get_generated_json( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1181    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1182        """
1183        Get the json blob for a cad file if it exists, otherwise enqueue
1184        a new job and return a 503 status.
1185
1186        WARNING: This is still experimental and not recommended for critical
1187        applications. The structure and content of the returned dictionary can
1188        change at any time.
1189
1190        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1191        """
1192
1193        if isinstance(content, Content):
1194            content = content.path
1195
1196        url = self.REPO_GET_ALLSPICE_JSON.format(
1197            owner=self.owner.username,
1198            repo=self.name,
1199            content=content,
1200        )
1201        data = Util.data_params_for_ref(ref)
1202        return self.allspice_client.requests_get(url, data)

Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.

See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

def get_generated_svg( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1204    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1205        """
1206        Get the svg blob for a cad file if it exists, otherwise enqueue
1207        a new job and return a 503 status.
1208
1209        WARNING: This is still experimental and not yet recommended for
1210        critical applications. The content of the returned svg can change
1211        at any time.
1212
1213        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1214        """
1215
1216        if isinstance(content, Content):
1217            content = content.path
1218
1219        url = self.REPO_GET_ALLSPICE_SVG.format(
1220            owner=self.owner.username,
1221            repo=self.name,
1222            content=content,
1223        )
1224        data = Util.data_params_for_ref(ref)
1225        return self.allspice_client.requests_get_raw(url, data)

Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.

See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1227    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1228        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1229        if not data:
1230            data = {}
1231        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1232        data.update({"content": content})
1233        return self.allspice_client.requests_post(url, data)

https://hub.allspice.io/api/swagger#/repository/repoCreateFile

def change_file( self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1235    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1236        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1237        if not data:
1238            data = {}
1239        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1240        data.update({"sha": file_sha, "content": content})
1241        return self.allspice_client.requests_put(url, data)

https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1243    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1244        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1245        if not data:
1246            data = {}
1247        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1248        data.update({"sha": file_sha})
1249        return self.allspice_client.requests_delete(url, data)

https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

def get_archive( self, ref: Union[Branch, Commit, str] = 'main', archive_format: Repository.ArchiveFormat = <ArchiveFormat.ZIP: 'zip'>) -> bytes:
1251    def get_archive(
1252        self,
1253        ref: Ref = "main",
1254        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1255    ) -> bytes:
1256        """
1257        Download all the files in a specific ref of a repository as a zip or tarball
1258        archive.
1259
1260        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1261
1262        :param ref: branch or commit to get content from, defaults to the "main" branch
1263        :param archive_format: zip or tar, defaults to zip
1264        """
1265
1266        ref_string = Util.data_params_for_ref(ref)["ref"]
1267        url = self.REPO_GET_ARCHIVE.format(
1268            owner=self.owner.username,
1269            repo=self.name,
1270            ref=ref_string,
1271            format=archive_format.value,
1272        )
1273        return self.allspice_client.requests_get_raw(url)

Download all the files in a specific ref of a repository as a zip or tarball archive.

https://hub.allspice.io/api/swagger#/repository/repoGetArchive

Parameters
  • ref: branch or commit to get content from, defaults to the "main" branch
  • archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
1275    def get_topics(self) -> list[str]:
1276        """
1277        Gets the list of topics on this repository.
1278
1279        See http://localhost:3000/api/swagger#/repository/repoListTopics
1280        """
1281
1282        url = self.REPO_GET_TOPICS.format(
1283            owner=self.owner.username,
1284            repo=self.name,
1285        )
1286        return self.allspice_client.requests_get(url)["topics"]

Gets the list of topics on this repository.

See https://hub.allspice.io/api/swagger#/repository/repoListTopics

def add_topic(self, topic: str):
1288    def add_topic(self, topic: str):
1289        """
1290        Adds a topic to the repository.
1291
1292        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1293
1294        :param topic: The topic to add. Topic names must consist only of
1295            lowercase letters, numnbers and dashes (-), and cannot start with
1296            dashes. Topic names also must be under 35 characters long.
1297        """
1298
1299        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1300        self.allspice_client.requests_put(url)

Adds a topic to the repository.

See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

Parameters
  • topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release( self, tag_name: str, name: Optional[str] = None, body: Optional[str] = None, draft: bool = False):
1302    def create_release(
1303        self,
1304        tag_name: str,
1305        name: Optional[str] = None,
1306        body: Optional[str] = None,
1307        draft: bool = False,
1308    ):
1309        """
1310        Create a release for this repository. The release will be created for
1311        the tag with the given name. If there is no tag with this name, create
1312        the tag first.
1313
1314        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1315        """
1316
1317        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1318        data = {
1319            "tag_name": tag_name,
1320            "draft": draft,
1321        }
1322        if name is not None:
1323            data["name"] = name
1324        if body is not None:
1325            data["body"] = body
1326        response = self.allspice_client.requests_post(url, data)
1327        return Release.parse_response(self.allspice_client, response, self)

Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.

See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

def get_releases( self, draft: Optional[bool] = None, pre_release: Optional[bool] = None) -> List[Release]:
1329    def get_releases(
1330        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1331    ) -> List[Release]:
1332        """
1333        Get the list of releases for this repository.
1334
1335        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1336        """
1337
1338        data = {}
1339
1340        if draft is not None:
1341            data["draft"] = draft
1342        if pre_release is not None:
1343            data["pre-release"] = pre_release
1344
1345        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1346        responses = self.allspice_client.requests_get_paginated(url, params=data)
1347
1348        return [
1349            Release.parse_response(self.allspice_client, response, self) for response in responses
1350        ]

Get the list of releases for this repository.

See https://hub.allspice.io/api/swagger#/repository/repoListReleases

def get_latest_release(self) -> Release:
1352    def get_latest_release(self) -> Release:
1353        """
1354        Get the latest release for this repository.
1355
1356        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1357        """
1358
1359        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1360        response = self.allspice_client.requests_get(url)
1361        release = Release.parse_response(self.allspice_client, response, self)
1362        return release

Get the latest release for this repository.

See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

def get_release_by_tag(self, tag: str) -> Release:
1364    def get_release_by_tag(self, tag: str) -> Release:
1365        """
1366        Get a release by its tag.
1367
1368        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1369        """
1370
1371        url = self.REPO_GET_RELEASE_BY_TAG.format(
1372            owner=self.owner.username, repo=self.name, tag=tag
1373        )
1374        response = self.allspice_client.requests_get(url)
1375        release = Release.parse_response(self.allspice_client, response, self)
1376        return release

Get a release by its tag.

See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

def get_commit_statuses( self, commit: Union[str, Commit], sort: Optional[Repository.CommitStatusSort] = None, state: Optional[allspice.apiobject.CommitStatusState] = None) -> List[allspice.apiobject.CommitStatus]:
1378    def get_commit_statuses(
1379        self,
1380        commit: Union[str, Commit],
1381        sort: Optional[CommitStatusSort] = None,
1382        state: Optional[CommitStatusState] = None,
1383    ) -> List[CommitStatus]:
1384        """
1385        Get a list of statuses for a commit.
1386
1387        This is roughly equivalent to the Commit.get_statuses method, but this
1388        method allows you to sort and filter commits and is more convenient if
1389        you have a commit SHA and don't need to get the commit itself.
1390
1391        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1392        """
1393
1394        if isinstance(commit, Commit):
1395            commit = commit.sha
1396
1397        params = {}
1398        if sort is not None:
1399            params["sort"] = sort.value
1400        if state is not None:
1401            params["state"] = state.value
1402
1403        url = self.REPO_GET_COMMIT_STATUS.format(
1404            owner=self.owner.username, repo=self.name, sha=commit
1405        )
1406        response = self.allspice_client.requests_get_paginated(url, params=params)
1407        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]

Get a list of statuses for a commit.

This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.

See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

def create_commit_status( self, commit: Union[str, Commit], context: Optional[str] = None, description: Optional[str] = None, state: Optional[allspice.apiobject.CommitStatusState] = None, target_url: Optional[str] = None) -> allspice.apiobject.CommitStatus:
1409    def create_commit_status(
1410        self,
1411        commit: Union[str, Commit],
1412        context: Optional[str] = None,
1413        description: Optional[str] = None,
1414        state: Optional[CommitStatusState] = None,
1415        target_url: Optional[str] = None,
1416    ) -> CommitStatus:
1417        """
1418        Create a status on a commit.
1419
1420        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1421        """
1422
1423        if isinstance(commit, Commit):
1424            commit = commit.sha
1425
1426        data = {}
1427        if context is not None:
1428            data["context"] = context
1429        if description is not None:
1430            data["description"] = description
1431        if state is not None:
1432            data["state"] = state.value
1433        if target_url is not None:
1434            data["target_url"] = target_url
1435
1436        url = self.REPO_GET_COMMIT_STATUS.format(
1437            owner=self.owner.username, repo=self.name, sha=commit
1438        )
1439        response = self.allspice_client.requests_post(url, data=data)
1440        return CommitStatus.parse_response(self.allspice_client, response)

Create a status on a commit.

See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

def delete(self):
1442    def delete(self):
1443        self.allspice_client.requests_delete(
1444            Repository.REPO_DELETE % (self.owner.username, self.name)
1445        )
1446        self.deleted = True
class Repository.ArchiveFormat(enum.Enum):
567    class ArchiveFormat(Enum):
568        """
569        Archive formats for Repository.get_archive
570        """
571
572        TAR = "tar.gz"
573        ZIP = "zip"

Archive formats for Repository.get_archive

TAR = <ArchiveFormat.TAR: 'tar.gz'>
ZIP = <ArchiveFormat.ZIP: 'zip'>
class Repository.CommitStatusSort(enum.Enum):
575    class CommitStatusSort(Enum):
576        """
577        Sort order for Repository.get_commit_status
578        """
579
580        OLDEST = "oldest"
581        RECENT_UPDATE = "recentupdate"
582        LEAST_UPDATE = "leastupdate"
583        LEAST_INDEX = "leastindex"
584        HIGHEST_INDEX = "highestindex"

Sort order for Repository.get_commit_status

OLDEST = <CommitStatusSort.OLDEST: 'oldest'>
RECENT_UPDATE = <CommitStatusSort.RECENT_UPDATE: 'recentupdate'>
LEAST_UPDATE = <CommitStatusSort.LEAST_UPDATE: 'leastupdate'>
LEAST_INDEX = <CommitStatusSort.LEAST_INDEX: 'leastindex'>
HIGHEST_INDEX = <CommitStatusSort.HIGHEST_INDEX: 'highestindex'>
class Team(allspice.baseapiobject.ApiObject):
2218class Team(ApiObject):
2219    can_create_org_repo: bool
2220    description: str
2221    id: int
2222    includes_all_repositories: bool
2223    name: str
2224    organization: Optional["Organization"]
2225    permission: str
2226    units: List[str]
2227    units_map: Dict[str, str]
2228
2229    API_OBJECT = """/teams/{id}"""  # <id>
2230    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2231    TEAM_DELETE = """/teams/%s"""  # <id>
2232    GET_MEMBERS = """/teams/%s/members"""  # <id>
2233    GET_REPOS = """/teams/%s/repos"""  # <id>
2234
2235    def __init__(self, allspice_client):
2236        super().__init__(allspice_client)
2237
2238    def __eq__(self, other):
2239        if not isinstance(other, Team):
2240            return False
2241        return self.organization == other.organization and self.id == other.id
2242
2243    def __hash__(self):
2244        return hash(self.organization) ^ hash(self.id)
2245
2246    _fields_to_parsers: ClassVar[dict] = {
2247        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2248    }
2249
2250    _patchable_fields: ClassVar[set[str]] = {
2251        "can_create_org_repo",
2252        "description",
2253        "includes_all_repositories",
2254        "name",
2255        "permission",
2256        "units",
2257        "units_map",
2258    }
2259
2260    @classmethod
2261    def request(cls, allspice_client, id: int):
2262        return cls._request(allspice_client, {"id": id})
2263
2264    def commit(self):
2265        args = {"id": self.id}
2266        self._commit(args)
2267
2268    def add_user(self, user: User):
2269        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2270        url = f"/teams/{self.id}/members/{user.login}"
2271        self.allspice_client.requests_put(url)
2272
2273    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2274        if isinstance(repo, Repository):
2275            repo_name = repo.name
2276        else:
2277            repo_name = repo
2278        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2279
2280    def get_members(self):
2281        """Get all users assigned to the team."""
2282        results = self.allspice_client.requests_get_paginated(
2283            Team.GET_MEMBERS % self.id,
2284        )
2285        return [User.parse_response(self.allspice_client, result) for result in results]
2286
2287    def get_repos(self):
2288        """Get all repos of this Team."""
2289        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2290        return [Repository.parse_response(self.allspice_client, result) for result in results]
2291
2292    def delete(self):
2293        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2294        self.deleted = True
2295
2296    def remove_team_member(self, user_name: str):
2297        url = f"/teams/{self.id}/members/{user_name}"
2298        self.allspice_client.requests_delete(url)
Team(allspice_client)
2235    def __init__(self, allspice_client):
2236        super().__init__(allspice_client)
can_create_org_repo: bool
description: str
id: int
includes_all_repositories: bool
name: str
organization: Optional[Organization]
permission: str
units: List[str]
units_map: Dict[str, str]
API_OBJECT = '/teams/{id}'
ADD_REPO = '/teams/%s/repos/%s/%s'
TEAM_DELETE = '/teams/%s'
GET_MEMBERS = '/teams/%s/members'
GET_REPOS = '/teams/%s/repos'
@classmethod
def request(cls, allspice_client, id: int):
2260    @classmethod
2261    def request(cls, allspice_client, id: int):
2262        return cls._request(allspice_client, {"id": id})
def commit(self):
2264    def commit(self):
2265        args = {"id": self.id}
2266        self._commit(args)
def add_user(self, user: User):
2268    def add_user(self, user: User):
2269        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2270        url = f"/teams/{self.id}/members/{user.login}"
2271        self.allspice_client.requests_put(url)

https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember

def add_repo( self, org: Organization, repo: Union[Repository, str]):
2273    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2274        if isinstance(repo, Repository):
2275            repo_name = repo.name
2276        else:
2277            repo_name = repo
2278        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
def get_members(self):
2280    def get_members(self):
2281        """Get all users assigned to the team."""
2282        results = self.allspice_client.requests_get_paginated(
2283            Team.GET_MEMBERS % self.id,
2284        )
2285        return [User.parse_response(self.allspice_client, result) for result in results]

Get all users assigned to the team.

def get_repos(self):
2287    def get_repos(self):
2288        """Get all repos of this Team."""
2289        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2290        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all repos of this Team.

def delete(self):
2292    def delete(self):
2293        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2294        self.deleted = True
def remove_team_member(self, user_name: str):
2296    def remove_team_member(self, user_name: str):
2297        url = f"/teams/{self.id}/members/{user_name}"
2298        self.allspice_client.requests_delete(url)
class User(allspice.baseapiobject.ApiObject):
241class User(ApiObject):
242    active: bool
243    admin: Any
244    allow_create_organization: Any
245    allow_git_hook: Any
246    allow_import_local: Any
247    avatar_url: str
248    created: str
249    description: str
250    email: str
251    emails: List[Any]
252    followers_count: int
253    following_count: int
254    full_name: str
255    html_url: str
256    id: int
257    is_admin: bool
258    language: str
259    last_login: str
260    location: str
261    login: str
262    login_name: str
263    max_repo_creation: Any
264    must_change_password: Any
265    password: Any
266    prohibit_login: bool
267    restricted: bool
268    source_id: int
269    starred_repos_count: int
270    username: str
271    visibility: str
272    website: str
273
274    API_OBJECT = """/users/{name}"""  # <org>
275    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
276    USER_PATCH = """/admin/users/%s"""  # <username>
277    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
278    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
279    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
280
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
284
285    def __eq__(self, other):
286        if not isinstance(other, User):
287            return False
288        return self.allspice_client == other.allspice_client and self.id == other.id
289
290    def __hash__(self):
291        return hash(self.allspice_client) ^ hash(self.id)
292
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
297
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
302
303    _patchable_fields: ClassVar[set[str]] = {
304        "active",
305        "admin",
306        "allow_create_organization",
307        "allow_git_hook",
308        "allow_import_local",
309        "email",
310        "full_name",
311        "location",
312        "login_name",
313        "max_repo_creation",
314        "must_change_password",
315        "password",
316        "prohibit_login",
317        "website",
318    }
319
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}
335
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)
374
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]
380
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]
386
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
391
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]
396
397    def __request_emails(self):
398        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
399        # report if the adress changed by this
400        for mail in result:
401            self._emails.append(mail["email"])
402            if mail["primary"]:
403                self._email = mail["email"]
404
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True
409
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results
User(allspice_client)
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
active: bool
admin: Any
allow_create_organization: Any
allow_git_hook: Any
allow_import_local: Any
avatar_url: str
created: str
description: str
email: str
emails
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
followers_count: int
following_count: int
full_name: str
html_url: str
id: int
is_admin: bool
language: str
last_login: str
location: str
login: str
login_name: str
max_repo_creation: Any
must_change_password: Any
password: Any
prohibit_login: bool
restricted: bool
source_id: int
starred_repos_count: int
username: str
visibility: str
website: str
API_OBJECT = '/users/{name}'
USER_MAIL = '/user/emails?sudo=%s'
USER_PATCH = '/admin/users/%s'
ADMIN_DELETE_USER = '/admin/users/%s'
ADMIN_EDIT_USER = '/admin/users/{username}'
USER_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> User:
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
def commit(self, login_name: str, source_id: int = 0):
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}

Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.

def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)

Create a user Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories owned by this User.

def get_orgs(self) -> List[Organization]:
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]

Get all Organizations this user is a member of.

def get_teams(self) -> List[Team]:
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
def get_accessible_repos(self) -> List[Repository]:
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories accessible by the logged in User.

def delete(self):
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True

Deletes this User. Also deletes all Repositories he owns.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results