allspice

A very simple API client for AllSpice Hub

Note that the full Swagger API is not accessible. The implementation focuses on making access to, and work with, Organizations, Teams, Repositories and Users as pain-free as possible.

Forked from https://github.com/Langenfeld/py-gitea.

Usage

Docs

See the documentation site.

Examples

Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.

Quickstart

First get an allspice_client object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.

from allspice import *

# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)

# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)

Operations like requesting the AllSpice Hub version or the authenticated user can be performed directly on the allspice_client object:

print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)

Adding entities like Users, Organizations, ... is also done via the allspice_client object.

user = allspice_client.create_user("Test Testson", "test@test.test", "password")

All operations on entities in allspice are then accomplished via the corresponding wrapper objects for those entities. Each of those classes has a .request method that creates the wrapper object for an entity using your allspice_client instance.

other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)

Note that the fields of the User, Organization, ... classes are dynamically created at runtime, and thus not visible during development. Refer to the AllSpice API documentation for the field names.

Fields that cannot be altered via the AllSpice API are read-only. After altering a field, the .commit method of the corresponding object must be called to synchronize the changed fields with your allspice_client instance.

org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()

An entity in allspice can be deleted by calling delete.

org.delete()

All entity objects have methods to execute some of the requests possible through the AllSpice API:

org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
    repos = team.get_repos()
    for repo in repos:
        print(repo.name)

 1"""
 2.. include:: ../README.md
 3   :start-line: 1
 4   :end-before: Installation
 5"""
 6
 7from .allspice import (
 8    AllSpice,
 9)
10from .apiobject import (
11    Branch,
12    Comment,
13    Commit,
14    Content,
15    DesignReview,
16    Issue,
17    Milestone,
18    Organization,
19    Release,
20    Repository,
21    Team,
22    User,
23)
24from .exceptions import AlreadyExistsException, NotFoundException
25
26__version__ = "3.6.0"
27
28__all__ = [
29    "AllSpice",
30    "User",
31    "Organization",
32    "Team",
33    "Repository",
34    "Branch",
35    "NotFoundException",
36    "AlreadyExistsException",
37    "Issue",
38    "Milestone",
39    "Commit",
40    "Comment",
41    "Content",
42    "DesignReview",
43    "Release",
44]
class AllSpice:
    """Object to establish a session with AllSpice Hub.

    Holds the HTTP session, the default headers (including the access token
    when one is given) and the base URL, and offers thin helpers for the HTTP
    verbs plus a few convenience calls for users, organizations, teams and
    repositories.
    """

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
    ):
        """Initialize an instance of the AllSpice Hub client.

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): Passed to the session as its `verify` setting.
                If True (the default), the server's TLS certificate is
                verified. Set to False to allow insecure server connections;
                urllib3's insecure-request warnings are then disabled.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

        Raises:
            ValueError: If neither `token_text` nor `auth` is provided.
        """

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        self.headers = {
            "Content-type": "application/json",
        }
        self.url = allspice_hub_url

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication. At least one credential is required; when
        # both are supplied, both are configured on the session.
        if not token_text and not auth:
            raise ValueError("Please provide either auth or token_text")
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certificate verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    @staticmethod
    def _redact_headers(headers: Mapping) -> dict:
        """Return a copy of `headers` with the Authorization value masked for logging."""
        return {
            name: ("<redacted>" if str(name).lower() == "authorization" else value)
            for name, value in headers.items()
        }

    @staticmethod
    def _combine_params(params, sudo) -> dict:
        """Copy `params` into a new dict and add `sudo=<username>` when `sudo` is given."""
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return combined_params

    def __get_url(self, endpoint):
        """Build the full API URL (`<base>/api/v1<endpoint>`) for an endpoint path."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s", url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """Issue a GET request and translate error status codes into exceptions.

        Raises:
            NotFoundException: On status 404.
            ConflictException: On status 409.
            NotYetGeneratedException: On status 503.
            Exception: On 403 and on any other status outside 200/201.
        """
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code == 404:
                raise NotFoundException(message)
            if request.status_code == 403:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
                )
            if request.status_code == 409:
                raise ConflictException(message)
            if request.status_code == 503:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parses the result-JSON to a dict.

        Bodies that are empty or at most three characters long are treated as
        "no content" and yield an empty dict without being parsed.
        """
        if result.text and len(result.text) > 3:
            return json.loads(result.text)
        return {}

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body.

        :param endpoint: The path to the endpoint
        :param params: Query parameters; copied, never modified
        :param sudo: Optional object whose `username` is sent as the `sudo`
                     query parameter
        """
        return self.parse_result(self.__get(endpoint, self._combine_params(params, sudo)))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the raw response content as bytes.

        Takes the same parameters as `requests_get`.
        """
        return self.__get(endpoint, self._combine_params(params, sudo)).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """GET every page of `endpoint` and return all items in one list.

        Pages are requested starting at `first_page`, incrementing the
        `page_key` query parameter, until a page comes back empty. A page may
        be a JSON list, or a JSON object holding its items under `"data"` or
        `"tree"`.

        Raises:
            NotImplementedError: If a page is a JSON object with neither a
                `"data"` nor a `"tree"` key.
        """
        page = first_page
        combined_params = {}
        combined_params.update(params)
        aggregated_result = []
        while True:
            combined_params[page_key] = page
            result = self.requests_get(endpoint, combined_params, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    data = result["data"]
                elif "tree" in result:
                    data = result["tree"]
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
                # A null or empty item list marks the end, for either key.
                if not data:
                    return aggregated_result
                aggregated_result.extend(data)
            else:
                aggregated_result.extend(result)

            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` (JSON-encoded, `{}` when omitted) to `endpoint`.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        if not data:
            data = {}
        request = self.requests.put(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
            self.logger.error(message)
            raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE request to `endpoint`, with `data` as JSON body if given.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        # Without data, send no body at all instead of the JSON literal "null".
        body = json.dumps(data) if data is not None else None
        request = self.requests.delete(self.__get_url(endpoint), headers=self.headers, data=body)
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url})"
            self.logger.error(message)
            raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
                      can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If the call fails and the response text
                      reports that the entity or e-mail already exists
        :raises Exception: On any other status outside 200/201/202
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # Drop the JSON content type so it is not forced onto a file upload.
            args["headers"].pop("Content-type")
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            # Never write the access token into the log.
            self.logger.error(f"With info: {data} ({self._redact_headers(self.headers)})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` (JSON-encoded) to `endpoint` and return the parsed response.

        Raises:
            Exception: If the status code is neither 200 nor 201.
        """
        request = self.requests.patch(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
            self.logger.error(error_message)
            raise Exception(error_message)
        return self.parse_result(request)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed response of `/orgs/<orgname>/public_members`."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return all organizations from the admin endpoint as `Organization` objects."""
        # The listing is paginated; a single GET would only see the first page.
        results = self.requests_get_paginated("/admin/orgs")
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the `User` returned by the `/user` endpoint."""
        result = self.requests_get(AllSpice.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the `version` field reported by the `/version` endpoint."""
        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return all users from the admin endpoint as `User` objects."""
        # The listing is paginated; a single GET would only see the first page.
        results = self.requests_get_paginated(AllSpice.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first user whose `email` equals, or whose `emails` contain, `email`.

        Returns None when no user matches.
        """
        users = self.get_users()
        for user in users:
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the user whose `username` equals `username`, or None."""
        users = self.get_users()
        for user in users:
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Fetch `/repos/<owner>/<name>` and return it as a `Repository`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` fall back to `user_name` when not given.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # Keep the plaintext password out of the debug log.
        self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            # The response may lack a "message"; do not fail with a KeyError here.
            message = result.get("message")
            self.logger.error(message)
            raise Exception("User not created... (gitea: %s)" % message)
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, (User, Organization))
        result = self.requests_post(
            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            message = result.get("message")
            self.logger.error(message)
            raise Exception("Repository not created... (gitea: %s)" % message)
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization owned by `owner` via the admin endpoint.

        Throws:
            AlreadyExistsException: If the Organization exists already.
            Exception: If something else went wrong.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            AllSpice.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            message = result.get("message")
            self.logger.error("Organization not created... (gitea: %s)" % message)
            raise Exception("Organization not created... (gitea: %s)" % message)
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, short description of the new Team.
            permission (str): Optional, 'read', sent as the team's
                `permission`.
            can_create_org_repo (bool): Optional, False, sent as
                `can_create_org_repo`.
            includes_all_repositories (bool): Optional, False, sent as
                `includes_all_repositories`.
            units (tuple): Optional, the repository units sent as `units`.
            units_map (dict): Optional, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.

        Throws:
            AlreadyExistsException: If the Team exists already.
            Exception: If something else went wrong.
        """
        # None replaces the former mutable `{}` default; the payload is unchanged.
        if units_map is None:
            units_map = {}

        result = self.requests_post(
            AllSpice.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                "units_map": units_map,
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            message = result.get("message")
            self.logger.error("Team not created... (gitea: %s)" % message)
            raise Exception("Team not created... (gitea: %s)" % message)
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
32    def __init__(
33        self,
34        allspice_hub_url="https://hub.allspice.io",
35        token_text=None,
36        auth=None,
37        verify=True,
38        log_level="INFO",
39        ratelimiting=(100, 60),
40    ):
41        """Initializing an instance of the AllSpice Hub Client
42
43        Args:
44            allspice_hub_url (str): The URL for the AllSpice Hub instance.
45                Defaults to `https://hub.allspice.io`.
46
47            token_text (str, None): The access token, by default None.
48
49            auth (tuple, None): The user credentials
50                `(username, password)`, by default None.
51
52            verify (bool): If True, allow insecure server connections
53                when using SSL.
54
55            log_level (str): The log level, by default `INFO`.
56
57            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
58                If None, no rate limiting is applied. By default, 100 calls
59                per minute are allowed.
60        """
61
62        self.logger = logging.getLogger(__name__)
63        self.logger.setLevel(log_level)
64        self.headers = {
65            "Content-type": "application/json",
66        }
67        self.url = allspice_hub_url
68
69        if ratelimiting is None:
70            self.requests = requests.Session()
71        else:
72            (max_calls, period) = ratelimiting
73            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
74
75        # Manage authentification
76        if not token_text and not auth:
77            raise ValueError("Please provide auth or token_text, but not both")
78        if token_text:
79            self.headers["Authorization"] = "token " + token_text
80        if auth:
81            self.logger.warning(
82                "Using basic auth is not recommended. Prefer using a token instead."
83            )
84            self.requests.auth = auth
85
86        # Manage SSL certification verification
87        self.requests.verify = verify
88        if not verify:
89            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True (the default), the server's TLS certificate
    is verified. Set to False to allow insecure server connections.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
@staticmethod
def parse_result(result) -> Dict:
113    @staticmethod
114    def parse_result(result) -> Dict:
115        """Parses the result-JSON to a dict."""
116        if result.text and len(result.text) > 3:
117            return json.loads(result.text)
118        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
120    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
121        combined_params = {}
122        combined_params.update(params)
123        if sudo:
124            combined_params["sudo"] = sudo.username
125        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
127    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
128        combined_params = {}
129        combined_params.update(params)
130        if sudo:
131            combined_params["sudo"] = sudo.username
132        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
134    def requests_get_paginated(
135        self,
136        endpoint: str,
137        params=frozendict(),
138        sudo=None,
139        page_key: str = "page",
140        first_page: int = 1,
141    ):
142        page = first_page
143        combined_params = {}
144        combined_params.update(params)
145        aggregated_result = []
146        while True:
147            combined_params[page_key] = page
148            result = self.requests_get(endpoint, combined_params, sudo)
149
150            if not result:
151                return aggregated_result
152
153            if isinstance(result, dict):
154                if "data" in result:
155                    data = result["data"]
156                    if len(data) == 0:
157                        return aggregated_result
158                    aggregated_result.extend(data)
159                elif "tree" in result:
160                    data = result["tree"]
161                    if data is None or len(data) == 0:
162                        return aggregated_result
163                    aggregated_result.extend(data)
164                else:
165                    raise NotImplementedError(
166                        "requests_get_paginated does not know how to handle responses of this type."
167                    )
168            else:
169                aggregated_result.extend(result)
170
171            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
173    def requests_put(self, endpoint: str, data: Optional[dict] = None):
174        if not data:
175            data = {}
176        request = self.requests.put(
177            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
178        )
179        if request.status_code not in [200, 204]:
180            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
181            self.logger.error(message)
182            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
184    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
185        request = self.requests.delete(
186            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
187        )
188        if request.status_code not in [200, 204]:
189            message = f"Received status code: {request.status_code} ({request.url})"
190            self.logger.error(message)
191            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
193    def requests_post(
194        self,
195        endpoint: str,
196        data: Optional[dict] = None,
197        params: Optional[dict] = None,
198        files: Optional[dict] = None,
199    ):
200        """
201        Make a POST call to the endpoint.
202
203        :param endpoint: The path to the endpoint
204        :param data: A dictionary for JSON data
205        :param params: A dictionary of query params
206        :param files: A dictionary of files, see requests.post. Using both files and data
207                      can lead to unexpected results!
208        :return: The JSON response parsed as a dict
209        """
210
211        # This should ideally be a TypedDict of the type of arguments taken by
212        # `requests.post`.
213        args: dict[str, Any] = {
214            "headers": self.headers.copy(),
215        }
216        if data is not None:
217            args["data"] = json.dumps(data)
218        if params is not None:
219            args["params"] = params
220        if files is not None:
221            args["headers"].pop("Content-type")
222            args["files"] = files
223
224        request = self.requests.post(self.__get_url(endpoint), **args)
225
226        if request.status_code not in [200, 201, 202]:
227            if "already exists" in request.text or "e-mail already in use" in request.text:
228                self.logger.warning(request.text)
229                raise AlreadyExistsException()
230            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
231            self.logger.error(f"With info: {data} ({self.headers})")
232            self.logger.error(f"Answer: {request.text}")
233            raise Exception(
234                f"Received status code: {request.status_code} ({request.url}), {request.text}"
235            )
236        return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
238    def requests_patch(self, endpoint: str, data: dict):
239        request = self.requests.patch(
240            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
241        )
242        if request.status_code not in [200, 201]:
243            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
244            self.logger.error(error_message)
245            raise Exception(error_message)
246        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
248    def get_orgs_public_members_all(self, orgname):
249        path = "/orgs/" + orgname + "/public_members"
250        return self.requests_get(path)
def get_orgs(self):
252    def get_orgs(self):
253        path = "/admin/orgs"
254        results = self.requests_get(path)
255        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
257    def get_user(self):
258        result = self.requests_get(AllSpice.GET_USER)
259        return User.parse_response(self, result)
def get_version(self) -> str:
261    def get_version(self) -> str:
262        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
263        return result["version"]
def get_users(self) -> List[User]:
265    def get_users(self) -> List[User]:
266        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
267        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[User]:
269    def get_user_by_email(self, email: str) -> Optional[User]:
270        users = self.get_users()
271        for user in users:
272            if user.email == email or email in user.emails:
273                return user
274        return None
def get_user_by_name(self, username: str) -> Optional[User]:
276    def get_user_by_name(self, username: str) -> Optional[User]:
277        users = self.get_users()
278        for user in users:
279            if user.username == username:
280                return user
281        return None
def get_repository(self, owner: str, name: str) -> Repository:
283    def get_repository(self, owner: str, name: str) -> Repository:
284        path = self.GET_REPOSITORY.format(owner=owner, name=name)
285        result = self.requests_get(path)
286        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
288    def create_user(
289        self,
290        user_name: str,
291        email: str,
292        password: str,
293        full_name: Optional[str] = None,
294        login_name: Optional[str] = None,
295        change_pw=True,
296        send_notify=True,
297        source_id=0,
298    ):
299        """Create User.
300        Throws:
301            AlreadyExistsException, if the User exists already
302            Exception, if something else went wrong.
303        """
304        if not login_name:
305            login_name = user_name
306        if not full_name:
307            full_name = user_name
308        request_data = {
309            "source_id": source_id,
310            "login_name": login_name,
311            "full_name": full_name,
312            "username": user_name,
313            "email": email,
314            "password": password,
315            "send_notify": send_notify,
316            "must_change_password": change_pw,
317        }
318
319        self.logger.debug("Gitea post payload: %s", request_data)
320        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
321        if "id" in result:
322            self.logger.info(
323                "Successfully created User %s <%s> (id %s)",
324                result["login"],
325                result["email"],
326                result["id"],
327            )
328            self.logger.debug("Gitea response: %s", result)
329        else:
330            self.logger.error(result["message"])
331            raise Exception("User not created... (gitea: %s)" % result["message"])
332        user = User.parse_response(self, result)
333        return user

Create User. Throws: AlreadyExistsException, if the User already exists; Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[User, Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
335    def create_repo(
336        self,
337        repoOwner: Union[User, Organization],
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a Repository as the administrator
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353
354        Note:
355            Non-admin users can not use this method. Please use instead
356            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
357        """
358        # although this only says user in the api, this also works for
359        # organizations
360        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
361        result = self.requests_post(
362            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
363            data={
364                "name": repoName,
365                "description": description,
366                "private": private,
367                "auto_init": autoInit,
368                "gitignores": gitignores,
369                "license": license,
370                "issue_labels": issue_labels,
371                "readme": readme,
372                "default_branch": default_branch,
373            },
374        )
375        if "id" in result:
376            self.logger.info("Successfully created Repository %s " % result["name"])
377        else:
378            self.logger.error(result["message"])
379            raise Exception("Repository not created... (gitea: %s)" % result["message"])
380        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException, if the Repository already exists; Exception, if something else went wrong.

Note: Non-admin users cannot use this method. Please use allspice.User.create_repo or allspice.Organization.create_repo instead.

def create_org( self, owner: User, orgName: str, description: str, location='', website='', full_name=''):
382    def create_org(
383        self,
384        owner: User,
385        orgName: str,
386        description: str,
387        location="",
388        website="",
389        full_name="",
390    ):
391        assert isinstance(owner, User)
392        result = self.requests_post(
393            AllSpice.CREATE_ORG % owner.username,
394            data={
395                "username": orgName,
396                "description": description,
397                "location": location,
398                "website": website,
399                "full_name": full_name,
400            },
401        )
402        if "id" in result:
403            self.logger.info("Successfully created Organization %s" % result["username"])
404        else:
405            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
406            self.logger.error(result["message"])
407            raise Exception("Organization not created... (gitea: %s)" % result["message"])
408        return Organization.parse_response(self, result)
def create_team( self, org: Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
410    def create_team(
411        self,
412        org: Organization,
413        name: str,
414        description: str = "",
415        permission: str = "read",
416        can_create_org_repo: bool = False,
417        includes_all_repositories: bool = False,
418        units=(
419            "repo.code",
420            "repo.issues",
421            "repo.ext_issues",
422            "repo.wiki",
423            "repo.pulls",
424            "repo.releases",
425            "repo.ext_wiki",
426        ),
427        units_map={},
428    ):
429        """Creates a Team.
430
431        Args:
432            org (Organization): Organization the Team will be part of.
433            name (str): The Name of the Team to be created.
434            description (str): Optional, None, short description of the new Team.
435            permission (str): Optional, 'read', What permissions the members
436            units_map (dict): Optional, {}, a mapping of units to their
437                permissions. If None or empty, the `permission` permission will
438                be applied to all units. Note: When both `units` and `units_map`
439                are given, `units_map` will be preferred.
440        """
441
442        result = self.requests_post(
443            AllSpice.CREATE_TEAM % org.username,
444            data={
445                "name": name,
446                "description": description,
447                "permission": permission,
448                "can_create_org_repo": can_create_org_repo,
449                "includes_all_repositories": includes_all_repositories,
450                "units": units,
451                "units_map": units_map,
452            },
453        )
454
455        if "id" in result:
456            self.logger.info("Successfully created Team %s" % result["name"])
457        else:
458            self.logger.error("Team not created... (gitea: %s)" % result["message"])
459            self.logger.error(result["message"])
460            raise Exception("Team not created... (gitea: %s)" % result["message"])
461        api_object = Team.parse_response(self, result)
462        setattr(
463            api_object, "_organization", org
464        )  # fixes strange behaviour of gitea not returning a valid organization here.
465        return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The name of the Team to be created. description (str): Optional, '', short description of the new Team. permission (str): Optional, 'read', what permissions the members of the Team have. units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the `permission` value will be applied to all units. Note: When both `units` and `units_map` are given, `units_map` will be preferred.

class User(allspice.baseapiobject.ApiObject):
239class User(ApiObject):
240    active: bool
241    admin: Any
242    allow_create_organization: Any
243    allow_git_hook: Any
244    allow_import_local: Any
245    avatar_url: str
246    created: str
247    description: str
248    email: str
249    emails: List[Any]
250    followers_count: int
251    following_count: int
252    full_name: str
253    id: int
254    is_admin: bool
255    language: str
256    last_login: str
257    location: str
258    login: str
259    login_name: str
260    max_repo_creation: Any
261    must_change_password: Any
262    password: Any
263    prohibit_login: bool
264    restricted: bool
265    starred_repos_count: int
266    username: str
267    visibility: str
268    website: str
269
270    API_OBJECT = """/users/{name}"""  # <org>
271    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
272    USER_PATCH = """/admin/users/%s"""  # <username>
273    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
274    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
275    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
276
277    def __init__(self, allspice_client):
278        super().__init__(allspice_client)
279        self._emails = []
280
281    def __eq__(self, other):
282        if not isinstance(other, User):
283            return False
284        return self.allspice_client == other.allspice_client and self.id == other.id
285
286    def __hash__(self):
287        return hash(self.allspice_client) ^ hash(self.id)
288
289    @property
290    def emails(self):
291        self.__request_emails()
292        return self._emails
293
294    @classmethod
295    def request(cls, allspice_client, name: str) -> "User":
296        api_object = cls._request(allspice_client, {"name": name})
297        return api_object
298
299    _patchable_fields: ClassVar[set[str]] = {
300        "active",
301        "admin",
302        "allow_create_organization",
303        "allow_git_hook",
304        "allow_import_local",
305        "email",
306        "full_name",
307        "location",
308        "login_name",
309        "max_repo_creation",
310        "must_change_password",
311        "password",
312        "prohibit_login",
313        "website",
314    }
315
316    def commit(self, login_name: str, source_id: int = 0):
317        """
318        Unfortunately it is necessary to require the login name
319        as well as the login source (that is not supplied when getting a user) for
320        changing a user.
321        Usually source_id is 0 and the login_name is equal to the username.
322        """
323        values = self.get_dirty_fields()
324        values.update(
325            # api-doc says that the "source_id" is necessary; works without though
326            {"login_name": login_name, "source_id": source_id}
327        )
328        args = {"username": self.username}
329        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
330        self._dirty_fields = {}
331
332    def create_repo(
333        self,
334        repoName: str,
335        description: str = "",
336        private: bool = False,
337        autoInit=True,
338        gitignores: Optional[str] = None,
339        license: Optional[str] = None,
340        readme: str = "Default",
341        issue_labels: Optional[str] = None,
342        default_branch="master",
343    ):
344        """Create a user Repository
345
346        Throws:
347            AlreadyExistsException: If the Repository exists already.
348            Exception: If something else went wrong.
349        """
350        result = self.allspice_client.requests_post(
351            "/user/repos",
352            data={
353                "name": repoName,
354                "description": description,
355                "private": private,
356                "auto_init": autoInit,
357                "gitignores": gitignores,
358                "license": license,
359                "issue_labels": issue_labels,
360                "readme": readme,
361                "default_branch": default_branch,
362            },
363        )
364        if "id" in result:
365            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
366        else:
367            self.allspice_client.logger.error(result["message"])
368            raise Exception("Repository not created... (gitea: %s)" % result["message"])
369        return Repository.parse_response(self.allspice_client, result)
370
371    def get_repositories(self) -> List["Repository"]:
372        """Get all Repositories owned by this User."""
373        url = f"/users/{self.username}/repos"
374        results = self.allspice_client.requests_get_paginated(url)
375        return [Repository.parse_response(self.allspice_client, result) for result in results]
376
377    def get_orgs(self) -> List[Organization]:
378        """Get all Organizations this user is a member of."""
379        url = f"/users/{self.username}/orgs"
380        results = self.allspice_client.requests_get_paginated(url)
381        return [Organization.parse_response(self.allspice_client, result) for result in results]
382
383    def get_teams(self) -> List["Team"]:
384        url = "/user/teams"
385        results = self.allspice_client.requests_get_paginated(url, sudo=self)
386        return [Team.parse_response(self.allspice_client, result) for result in results]
387
388    def get_accessible_repos(self) -> List["Repository"]:
389        """Get all Repositories accessible by the logged in User."""
390        results = self.allspice_client.requests_get("/user/repos", sudo=self)
391        return [Repository.parse_response(self.allspice_client, result) for result in results]
392
393    def __request_emails(self):
394        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
395        # report if the adress changed by this
396        for mail in result:
397            self._emails.append(mail["email"])
398            if mail["primary"]:
399                self._email = mail["email"]
400
401    def delete(self):
402        """Deletes this User. Also deletes all Repositories he owns."""
403        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
404        self.deleted = True
405
406    def get_heatmap(self) -> List[Tuple[datetime, int]]:
407        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
408        results = [
409            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
410            for result in results
411        ]
412        return results
User(allspice_client)
277    def __init__(self, allspice_client):
278        super().__init__(allspice_client)
279        self._emails = []
active: bool
admin: Any
allow_create_organization: Any
allow_git_hook: Any
allow_import_local: Any
avatar_url: str
created: str
description: str
email: str
emails
289    @property
290    def emails(self):
291        self.__request_emails()
292        return self._emails
followers_count: int
following_count: int
full_name: str
id: int
is_admin: bool
language: str
last_login: str
location: str
login: str
login_name: str
max_repo_creation: Any
must_change_password: Any
password: Any
prohibit_login: bool
restricted: bool
starred_repos_count: int
username: str
visibility: str
website: str
API_OBJECT = '/users/{name}'
USER_MAIL = '/user/emails?sudo=%s'
USER_PATCH = '/admin/users/%s'
ADMIN_DELETE_USER = '/admin/users/%s'
ADMIN_EDIT_USER = '/admin/users/{username}'
USER_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> User:
294    @classmethod
295    def request(cls, allspice_client, name: str) -> "User":
296        api_object = cls._request(allspice_client, {"name": name})
297        return api_object
def commit(self, login_name: str, source_id: int = 0):
316    def commit(self, login_name: str, source_id: int = 0):
317        """
318        Unfortunately it is necessary to require the login name
319        as well as the login source (that is not supplied when getting a user) for
320        changing a user.
321        Usually source_id is 0 and the login_name is equal to the username.
322        """
323        values = self.get_dirty_fields()
324        values.update(
325            # api-doc says that the "source_id" is necessary; works without though
326            {"login_name": login_name, "source_id": source_id}
327        )
328        args = {"username": self.username}
329        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
330        self._dirty_fields = {}

Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.

def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
332    def create_repo(
333        self,
334        repoName: str,
335        description: str = "",
336        private: bool = False,
337        autoInit=True,
338        gitignores: Optional[str] = None,
339        license: Optional[str] = None,
340        readme: str = "Default",
341        issue_labels: Optional[str] = None,
342        default_branch="master",
343    ):
344        """Create a user Repository
345
346        Throws:
347            AlreadyExistsException: If the Repository exists already.
348            Exception: If something else went wrong.
349        """
350        result = self.allspice_client.requests_post(
351            "/user/repos",
352            data={
353                "name": repoName,
354                "description": description,
355                "private": private,
356                "auto_init": autoInit,
357                "gitignores": gitignores,
358                "license": license,
359                "issue_labels": issue_labels,
360                "readme": readme,
361                "default_branch": default_branch,
362            },
363        )
364        if "id" in result:
365            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
366        else:
367            self.allspice_client.logger.error(result["message"])
368            raise Exception("Repository not created... (gitea: %s)" % result["message"])
369        return Repository.parse_response(self.allspice_client, result)

Create a user Repository

Throws: AlreadyExistsException, if the Repository already exists; Exception, if something else went wrong.

def get_repositories(self) -> List[Repository]:
371    def get_repositories(self) -> List["Repository"]:
372        """Get all Repositories owned by this User."""
373        url = f"/users/{self.username}/repos"
374        results = self.allspice_client.requests_get_paginated(url)
375        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories owned by this User.

def get_orgs(self) -> List[Organization]:
377    def get_orgs(self) -> List[Organization]:
378        """Get all Organizations this user is a member of."""
379        url = f"/users/{self.username}/orgs"
380        results = self.allspice_client.requests_get_paginated(url)
381        return [Organization.parse_response(self.allspice_client, result) for result in results]

Get all Organizations this user is a member of.

def get_teams(self) -> List[Team]:
383    def get_teams(self) -> List["Team"]:
384        url = "/user/teams"
385        results = self.allspice_client.requests_get_paginated(url, sudo=self)
386        return [Team.parse_response(self.allspice_client, result) for result in results]
def get_accessible_repos(self) -> List[Repository]:
388    def get_accessible_repos(self) -> List["Repository"]:
389        """Get all Repositories accessible by the logged in User."""
390        results = self.allspice_client.requests_get("/user/repos", sudo=self)
391        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories accessible by the logged in User.

def delete(self):
401    def delete(self):
402        """Deletes this User. Also deletes all Repositories he owns."""
403        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
404        self.deleted = True

Deletes this User. Also deletes all Repositories they own.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
406    def get_heatmap(self) -> List[Tuple[datetime, int]]:
407        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
408        results = [
409            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
410            for result in results
411        ]
412        return results
class Organization(allspice.baseapiobject.ApiObject):
 33class Organization(ApiObject):
 34    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
 35
 36    active: Optional[bool]
 37    avatar_url: str
 38    created: Optional[str]
 39    description: str
 40    email: str
 41    followers_count: Optional[int]
 42    following_count: Optional[int]
 43    full_name: str
 44    id: int
 45    is_admin: Optional[bool]
 46    language: Optional[str]
 47    last_login: Optional[str]
 48    location: str
 49    login: Optional[str]
 50    login_name: Optional[str]
 51    name: Optional[str]
 52    prohibit_login: Optional[bool]
 53    repo_admin_change_team_access: Optional[bool]
 54    restricted: Optional[bool]
 55    starred_repos_count: Optional[int]
 56    username: str
 57    visibility: str
 58    website: str
 59
 60    API_OBJECT = """/orgs/{name}"""  # <org>
 61    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
 62    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
 63    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
 64    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
 65    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
 66    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
 67
 68    def __init__(self, allspice_client):
 69        super().__init__(allspice_client)
 70
 71    def __eq__(self, other):
 72        if not isinstance(other, Organization):
 73            return False
 74        return self.allspice_client == other.allspice_client and self.name == other.name
 75
 76    def __hash__(self):
 77        return hash(self.allspice_client) ^ hash(self.name)
 78
 79    @classmethod
 80    def request(cls, allspice_client, name: str) -> Self:
 81        return cls._request(allspice_client, {"name": name})
 82
 83    @classmethod
 84    def parse_response(cls, allspice_client, result) -> "Organization":
 85        api_object = super().parse_response(allspice_client, result)
 86        # add "name" field to make this behave similar to users for gitea < 1.18
 87        # also necessary for repository-owner when org is repo owner
 88        if not hasattr(api_object, "name"):
 89            Organization._add_read_property("name", result["username"], api_object)
 90        return api_object
 91
 92    _patchable_fields: ClassVar[set[str]] = {
 93        "description",
 94        "full_name",
 95        "location",
 96        "visibility",
 97        "website",
 98    }
 99
100    def commit(self):
101        args = {"name": self.name}
102        self._commit(args)
103
104    def create_repo(
105        self,
106        repoName: str,
107        description: str = "",
108        private: bool = False,
109        autoInit=True,
110        gitignores: Optional[str] = None,
111        license: Optional[str] = None,
112        readme: str = "Default",
113        issue_labels: Optional[str] = None,
114        default_branch="master",
115    ):
116        """Create an organization Repository
117
118        Throws:
119            AlreadyExistsException: If the Repository exists already.
120            Exception: If something else went wrong.
121        """
122        result = self.allspice_client.requests_post(
123            f"/orgs/{self.name}/repos",
124            data={
125                "name": repoName,
126                "description": description,
127                "private": private,
128                "auto_init": autoInit,
129                "gitignores": gitignores,
130                "license": license,
131                "issue_labels": issue_labels,
132                "readme": readme,
133                "default_branch": default_branch,
134            },
135        )
136        if "id" in result:
137            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
138        else:
139            self.allspice_client.logger.error(result["message"])
140            raise Exception("Repository not created... (gitea: %s)" % result["message"])
141        return Repository.parse_response(self.allspice_client, result)
142
143    def get_repositories(self) -> List["Repository"]:
144        results = self.allspice_client.requests_get_paginated(
145            Organization.ORG_REPOS_REQUEST % self.username
146        )
147        return [Repository.parse_response(self.allspice_client, result) for result in results]
148
149    def get_repository(self, name) -> "Repository":
150        repos = self.get_repositories()
151        for repo in repos:
152            if repo.name == name:
153                return repo
154        raise NotFoundException("Repository %s not existent in organization." % name)
155
156    def get_teams(self) -> List["Team"]:
157        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
158        teams = [Team.parse_response(self.allspice_client, result) for result in results]
159        # organisation seems to be missing using this request, so we add org manually
160        for t in teams:
161            setattr(t, "_organization", self)
162        return teams
163
164    def get_team(self, name) -> "Team":
165        teams = self.get_teams()
166        for team in teams:
167            if team.name == name:
168                return team
169        raise NotFoundException("Team not existent in organization.")
170
171    def create_team(
172        self,
173        name: str,
174        description: str = "",
175        permission: str = "read",
176        can_create_org_repo: bool = False,
177        includes_all_repositories: bool = False,
178        units=(
179            "repo.code",
180            "repo.issues",
181            "repo.ext_issues",
182            "repo.wiki",
183            "repo.pulls",
184            "repo.releases",
185            "repo.ext_wiki",
186        ),
187        units_map={},
188    ) -> "Team":
189        """Alias for AllSpice#create_team"""
190        # TODO: Move AllSpice#create_team to Organization#create_team and
191        #       deprecate AllSpice#create_team.
192        return self.allspice_client.create_team(
193            org=self,
194            name=name,
195            description=description,
196            permission=permission,
197            can_create_org_repo=can_create_org_repo,
198            includes_all_repositories=includes_all_repositories,
199            units=units,
200            units_map=units_map,
201        )
202
203    def get_members(self) -> List["User"]:
204        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
205        return [User.parse_response(self.allspice_client, result) for result in results]
206
207    def is_member(self, username) -> bool:
208        if isinstance(username, User):
209            username = username.username
210        try:
211            # returns 204 if its ok, 404 if its not
212            self.allspice_client.requests_get(
213                Organization.ORG_IS_MEMBER % (self.username, username)
214            )
215            return True
216        except Exception:
217            return False
218
219    def remove_member(self, user: "User"):
220        path = f"/orgs/{self.username}/members/{user.username}"
221        self.allspice_client.requests_delete(path)
222
223    def delete(self):
224        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
225        for repo in self.get_repositories():
226            repo.delete()
227        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
228        self.deleted = True
229
230    def get_heatmap(self) -> List[Tuple[datetime, int]]:
231        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
232        results = [
233            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
234            for result in results
235        ]
236        return results

see https://hub.allspice.io/api/swagger#/organization/orgGetAll

Organization(allspice_client)
68    def __init__(self, allspice_client):
69        super().__init__(allspice_client)
active: Optional[bool]
avatar_url: str
created: Optional[str]
description: str
email: str
followers_count: Optional[int]
following_count: Optional[int]
full_name: str
id: int
is_admin: Optional[bool]
language: Optional[str]
last_login: Optional[str]
location: str
login: Optional[str]
login_name: Optional[str]
name: Optional[str]
prohibit_login: Optional[bool]
repo_admin_change_team_access: Optional[bool]
restricted: Optional[bool]
starred_repos_count: Optional[int]
username: str
visibility: str
website: str
API_OBJECT = '/orgs/{name}'
ORG_REPOS_REQUEST = '/orgs/%s/repos'
ORG_TEAMS_REQUEST = '/orgs/%s/teams'
ORG_TEAMS_CREATE = '/orgs/%s/teams'
ORG_GET_MEMBERS = '/orgs/%s/members'
ORG_IS_MEMBER = '/orgs/%s/members/%s'
ORG_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> Self:
79    @classmethod
80    def request(cls, allspice_client, name: str) -> Self:
81        return cls._request(allspice_client, {"name": name})
@classmethod
def parse_response(cls, allspice_client, result) -> Organization:
83    @classmethod
84    def parse_response(cls, allspice_client, result) -> "Organization":
85        api_object = super().parse_response(allspice_client, result)
86        # add "name" field to make this behave similar to users for gitea < 1.18
87        # also necessary for repository-owner when org is repo owner
88        if not hasattr(api_object, "name"):
89            Organization._add_read_property("name", result["username"], api_object)
90        return api_object
def commit(self):
100    def commit(self):
101        args = {"name": self.name}
102        self._commit(args)
def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
104    def create_repo(
105        self,
106        repoName: str,
107        description: str = "",
108        private: bool = False,
109        autoInit=True,
110        gitignores: Optional[str] = None,
111        license: Optional[str] = None,
112        readme: str = "Default",
113        issue_labels: Optional[str] = None,
114        default_branch="master",
115    ):
116        """Create an organization Repository
117
118        Throws:
119            AlreadyExistsException: If the Repository exists already.
120            Exception: If something else went wrong.
121        """
122        result = self.allspice_client.requests_post(
123            f"/orgs/{self.name}/repos",
124            data={
125                "name": repoName,
126                "description": description,
127                "private": private,
128                "auto_init": autoInit,
129                "gitignores": gitignores,
130                "license": license,
131                "issue_labels": issue_labels,
132                "readme": readme,
133                "default_branch": default_branch,
134            },
135        )
136        if "id" in result:
137            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
138        else:
139            self.allspice_client.logger.error(result["message"])
140            raise Exception("Repository not created... (gitea: %s)" % result["message"])
141        return Repository.parse_response(self.allspice_client, result)

Create an organization Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
143    def get_repositories(self) -> List["Repository"]:
144        results = self.allspice_client.requests_get_paginated(
145            Organization.ORG_REPOS_REQUEST % self.username
146        )
147        return [Repository.parse_response(self.allspice_client, result) for result in results]
def get_repository(self, name) -> Repository:
149    def get_repository(self, name) -> "Repository":
150        repos = self.get_repositories()
151        for repo in repos:
152            if repo.name == name:
153                return repo
154        raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List[Team]:
156    def get_teams(self) -> List["Team"]:
157        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
158        teams = [Team.parse_response(self.allspice_client, result) for result in results]
159        # organisation seems to be missing using this request, so we add org manually
160        for t in teams:
161            setattr(t, "_organization", self)
162        return teams
def get_team(self, name) -> Team:
164    def get_team(self, name) -> "Team":
165        teams = self.get_teams()
166        for team in teams:
167            if team.name == name:
168                return team
169        raise NotFoundException("Team not existent in organization.")
def create_team( self, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}) -> Team:
171    def create_team(
172        self,
173        name: str,
174        description: str = "",
175        permission: str = "read",
176        can_create_org_repo: bool = False,
177        includes_all_repositories: bool = False,
178        units=(
179            "repo.code",
180            "repo.issues",
181            "repo.ext_issues",
182            "repo.wiki",
183            "repo.pulls",
184            "repo.releases",
185            "repo.ext_wiki",
186        ),
187        units_map={},
188    ) -> "Team":
189        """Alias for AllSpice#create_team"""
190        # TODO: Move AllSpice#create_team to Organization#create_team and
191        #       deprecate AllSpice#create_team.
192        return self.allspice_client.create_team(
193            org=self,
194            name=name,
195            description=description,
196            permission=permission,
197            can_create_org_repo=can_create_org_repo,
198            includes_all_repositories=includes_all_repositories,
199            units=units,
200            units_map=units_map,
201        )

Alias for AllSpice#create_team

def get_members(self) -> List[User]:
203    def get_members(self) -> List["User"]:
204        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
205        return [User.parse_response(self.allspice_client, result) for result in results]
def is_member(self, username) -> bool:
207    def is_member(self, username) -> bool:
208        if isinstance(username, User):
209            username = username.username
210        try:
211            # returns 204 if its ok, 404 if its not
212            self.allspice_client.requests_get(
213                Organization.ORG_IS_MEMBER % (self.username, username)
214            )
215            return True
216        except Exception:
217            return False
def remove_member(self, user: User):
219    def remove_member(self, user: "User"):
220        path = f"/orgs/{self.username}/members/{user.username}"
221        self.allspice_client.requests_delete(path)
def delete(self):
223    def delete(self):
224        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
225        for repo in self.get_repositories():
226            repo.delete()
227        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
228        self.deleted = True

Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
230    def get_heatmap(self) -> List[Tuple[datetime, int]]:
231        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
232        results = [
233            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
234            for result in results
235        ]
236        return results
class Team(allspice.baseapiobject.ApiObject):
2206class Team(ApiObject):
2207    can_create_org_repo: bool
2208    description: str
2209    id: int
2210    includes_all_repositories: bool
2211    name: str
2212    organization: Optional["Organization"]
2213    permission: str
2214    units: List[str]
2215    units_map: Dict[str, str]
2216
2217    API_OBJECT = """/teams/{id}"""  # <id>
2218    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2219    TEAM_DELETE = """/teams/%s"""  # <id>
2220    GET_MEMBERS = """/teams/%s/members"""  # <id>
2221    GET_REPOS = """/teams/%s/repos"""  # <id>
2222
2223    def __init__(self, allspice_client):
2224        super().__init__(allspice_client)
2225
2226    def __eq__(self, other):
2227        if not isinstance(other, Team):
2228            return False
2229        return self.organization == other.organization and self.id == other.id
2230
2231    def __hash__(self):
2232        return hash(self.organization) ^ hash(self.id)
2233
2234    _fields_to_parsers: ClassVar[dict] = {
2235        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2236    }
2237
2238    _patchable_fields: ClassVar[set[str]] = {
2239        "can_create_org_repo",
2240        "description",
2241        "includes_all_repositories",
2242        "name",
2243        "permission",
2244        "units",
2245        "units_map",
2246    }
2247
2248    @classmethod
2249    def request(cls, allspice_client, id: int):
2250        return cls._request(allspice_client, {"id": id})
2251
2252    def commit(self):
2253        args = {"id": self.id}
2254        self._commit(args)
2255
2256    def add_user(self, user: User):
2257        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2258        url = f"/teams/{self.id}/members/{user.login}"
2259        self.allspice_client.requests_put(url)
2260
2261    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2262        if isinstance(repo, Repository):
2263            repo_name = repo.name
2264        else:
2265            repo_name = repo
2266        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2267
2268    def get_members(self):
2269        """Get all users assigned to the team."""
2270        results = self.allspice_client.requests_get_paginated(
2271            Team.GET_MEMBERS % self.id,
2272        )
2273        return [User.parse_response(self.allspice_client, result) for result in results]
2274
2275    def get_repos(self):
2276        """Get all repos of this Team."""
2277        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2278        return [Repository.parse_response(self.allspice_client, result) for result in results]
2279
2280    def delete(self):
2281        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2282        self.deleted = True
2283
2284    def remove_team_member(self, user_name: str):
2285        url = f"/teams/{self.id}/members/{user_name}"
2286        self.allspice_client.requests_delete(url)
Team(allspice_client)
2223    def __init__(self, allspice_client):
2224        super().__init__(allspice_client)
can_create_org_repo: bool
description: str
id: int
includes_all_repositories: bool
name: str
organization: Optional[Organization]
permission: str
units: List[str]
units_map: Dict[str, str]
API_OBJECT = '/teams/{id}'
ADD_REPO = '/teams/%s/repos/%s/%s'
TEAM_DELETE = '/teams/%s'
GET_MEMBERS = '/teams/%s/members'
GET_REPOS = '/teams/%s/repos'
@classmethod
def request(cls, allspice_client, id: int):
2248    @classmethod
2249    def request(cls, allspice_client, id: int):
2250        return cls._request(allspice_client, {"id": id})
def commit(self):
2252    def commit(self):
2253        args = {"id": self.id}
2254        self._commit(args)
def add_user(self, user: User):
2256    def add_user(self, user: User):
2257        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2258        url = f"/teams/{self.id}/members/{user.login}"
2259        self.allspice_client.requests_put(url)

https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember

def add_repo( self, org: Organization, repo: Union[Repository, str]):
2261    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2262        if isinstance(repo, Repository):
2263            repo_name = repo.name
2264        else:
2265            repo_name = repo
2266        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
def get_members(self):
2268    def get_members(self):
2269        """Get all users assigned to the team."""
2270        results = self.allspice_client.requests_get_paginated(
2271            Team.GET_MEMBERS % self.id,
2272        )
2273        return [User.parse_response(self.allspice_client, result) for result in results]

Get all users assigned to the team.

def get_repos(self):
2275    def get_repos(self):
2276        """Get all repos of this Team."""
2277        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2278        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all repos of this Team.

def delete(self):
2280    def delete(self):
2281        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2282        self.deleted = True
def remove_team_member(self, user_name: str):
2284    def remove_team_member(self, user_name: str):
2285        url = f"/teams/{self.id}/members/{user_name}"
2286        self.allspice_client.requests_delete(url)
class Repository(allspice.baseapiobject.ApiObject):
 473class Repository(ApiObject):
 474    allow_manual_merge: Any
 475    allow_merge_commits: bool
 476    allow_rebase: bool
 477    allow_rebase_explicit: bool
 478    allow_rebase_update: bool
 479    allow_squash_merge: bool
 480    archived: bool
 481    archived_at: str
 482    autodetect_manual_merge: Any
 483    avatar_url: str
 484    clone_url: str
 485    created_at: str
 486    default_allow_maintainer_edit: bool
 487    default_branch: str
 488    default_delete_branch_after_merge: bool
 489    default_merge_style: str
 490    description: str
 491    empty: bool
 492    enable_prune: Any
 493    external_tracker: Any
 494    external_wiki: Any
 495    fork: bool
 496    forks_count: int
 497    full_name: str
 498    has_actions: bool
 499    has_issues: bool
 500    has_packages: bool
 501    has_projects: bool
 502    has_pull_requests: bool
 503    has_releases: bool
 504    has_wiki: bool
 505    html_url: str
 506    id: int
 507    ignore_whitespace_conflicts: bool
 508    internal: bool
 509    internal_tracker: Dict[str, bool]
 510    language: str
 511    languages_url: str
 512    link: str
 513    mirror: bool
 514    mirror_interval: str
 515    mirror_updated: str
 516    name: str
 517    open_issues_count: int
 518    open_pr_counter: int
 519    original_url: str
 520    owner: Union["User", "Organization"]
 521    parent: Any
 522    permissions: Dict[str, bool]
 523    private: bool
 524    release_counter: int
 525    repo_transfer: Any
 526    size: int
 527    ssh_url: str
 528    stars_count: int
 529    template: bool
 530    updated_at: datetime
 531    url: str
 532    watchers_count: int
 533    website: str
 534
 535    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 536    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 537    REPO_SEARCH = """/repos/search/"""
 538    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 539    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 540    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 541    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 542    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 543    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 544    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 545    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 546    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 547    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 548    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 549    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 550    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 551    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 552    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 553    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 554    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 555    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 556    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 557    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 558    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 559
 560    class ArchiveFormat(Enum):
 561        """
 562        Archive formats for Repository.get_archive
 563        """
 564
 565        TAR = "tar.gz"
 566        ZIP = "zip"
 567
 568    class CommitStatusSort(Enum):
 569        """
 570        Sort order for Repository.get_commit_status
 571        """
 572
 573        OLDEST = "oldest"
 574        RECENT_UPDATE = "recentupdate"
 575        LEAST_UPDATE = "leastupdate"
 576        LEAST_INDEX = "leastindex"
 577        HIGHEST_INDEX = "highestindex"
 578
 579    def __init__(self, allspice_client):
 580        super().__init__(allspice_client)
 581
 582    def __eq__(self, other):
 583        if not isinstance(other, Repository):
 584            return False
 585        return self.owner == other.owner and self.name == other.name
 586
 587    def __hash__(self):
 588        return hash(self.owner) ^ hash(self.name)
 589
 590    _fields_to_parsers: ClassVar[dict] = {
 591        # dont know how to tell apart user and org as owner except form email being empty.
 592        "owner": lambda allspice_client, r: (
 593            Organization.parse_response(allspice_client, r)
 594            if r["email"] == ""
 595            else User.parse_response(allspice_client, r)
 596        ),
 597        "updated_at": lambda _, t: Util.convert_time(t),
 598    }
 599
 600    @classmethod
 601    def request(
 602        cls,
 603        allspice_client,
 604        owner: str,
 605        name: str,
 606    ) -> Repository:
 607        return cls._request(allspice_client, {"owner": owner, "name": name})
 608
 609    @classmethod
 610    def search(
 611        cls,
 612        allspice_client,
 613        query: Optional[str] = None,
 614        topic: bool = False,
 615        include_description: bool = False,
 616        user: Optional[User] = None,
 617        owner_to_prioritize: Union[User, Organization, None] = None,
 618    ) -> list[Repository]:
 619        """
 620        Search for repositories.
 621
 622        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 623
 624        :param query: The query string to search for
 625        :param topic: If true, the query string will only be matched against the
 626            repository's topic.
 627        :param include_description: If true, the query string will be matched
 628            against the repository's description as well.
 629        :param user: If specified, only repositories that this user owns or
 630            contributes to will be searched.
 631        :param owner_to_prioritize: If specified, repositories owned by the
 632            given entity will be prioritized in the search.
 633        :returns: All repositories matching the query. If there are many
 634            repositories matching this query, this may take some time.
 635        """
 636
 637        params = {}
 638
 639        if query is not None:
 640            params["q"] = query
 641        if topic:
 642            params["topic"] = topic
 643        if include_description:
 644            params["include_description"] = include_description
 645        if user is not None:
 646            params["user"] = user.id
 647        if owner_to_prioritize is not None:
 648            params["owner_to_prioritize"] = owner_to_prioritize.id
 649
 650        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 651
 652        return [Repository.parse_response(allspice_client, response) for response in responses]
 653
 654    _patchable_fields: ClassVar[set[str]] = {
 655        "allow_manual_merge",
 656        "allow_merge_commits",
 657        "allow_rebase",
 658        "allow_rebase_explicit",
 659        "allow_rebase_update",
 660        "allow_squash_merge",
 661        "archived",
 662        "autodetect_manual_merge",
 663        "default_branch",
 664        "default_delete_branch_after_merge",
 665        "default_merge_style",
 666        "description",
 667        "enable_prune",
 668        "external_tracker",
 669        "external_wiki",
 670        "has_issues",
 671        "has_projects",
 672        "has_pull_requests",
 673        "has_wiki",
 674        "ignore_whitespace_conflicts",
 675        "internal_tracker",
 676        "mirror_interval",
 677        "name",
 678        "private",
 679        "template",
 680        "website",
 681    }
 682
 683    def commit(self):
 684        args = {"owner": self.owner.username, "name": self.name}
 685        self._commit(args)
 686
 687    def get_branches(self) -> List["Branch"]:
 688        """Get all the Branches of this Repository."""
 689
 690        results = self.allspice_client.requests_get_paginated(
 691            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 692        )
 693        return [Branch.parse_response(self.allspice_client, result) for result in results]
 694
 695    def get_branch(self, name: str) -> "Branch":
 696        """Get a specific Branch of this Repository."""
 697        result = self.allspice_client.requests_get(
 698            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 699        )
 700        return Branch.parse_response(self.allspice_client, result)
 701
 702    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 703        """Add a branch to the repository"""
 704        # Note: will only work with gitea 1.13 or higher!
 705
 706        ref_name = Util.data_params_for_ref(create_from)
 707        if "ref" not in ref_name:
 708            raise ValueError("create_from must be a Branch, Commit or string")
 709        ref_name = ref_name["ref"]
 710
 711        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 712        result = self.allspice_client.requests_post(
 713            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 714        )
 715        return Branch.parse_response(self.allspice_client, result)
 716
 717    def get_issues(
 718        self,
 719        state: Literal["open", "closed", "all"] = "all",
 720        search_query: Optional[str] = None,
 721        labels: Optional[List[str]] = None,
 722        milestones: Optional[List[Union[Milestone, str]]] = None,
 723        assignee: Optional[Union[User, str]] = None,
 724        since: Optional[datetime] = None,
 725        before: Optional[datetime] = None,
 726    ) -> List["Issue"]:
 727        """
 728        Get all Issues of this Repository (open and closed)
 729
 730        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 731
 732        All params of this method are optional filters. If you don't specify a filter, it
 733        will not be applied.
 734
 735        :param state: The state of the Issues to get. If None, all Issues are returned.
 736        :param search_query: Filter issues by text. This is equivalent to searching for
 737                             `search_query` in the Issues on the web interface.
 738        :param labels: Filter issues by labels.
 739        :param milestones: Filter issues by milestones.
 740        :param assignee: Filter issues by the assigned user.
 741        :param since: Filter issues by the date they were created.
 742        :param before: Filter issues by the date they were created.
 743        :return: A list of Issues.
 744        """
 745
 746        data = {
 747            "state": state,
 748        }
 749        if search_query:
 750            data["q"] = search_query
 751        if labels:
 752            data["labels"] = ",".join(labels)
 753        if milestones:
 754            data["milestone"] = ",".join(
 755                [
 756                    milestone.name if isinstance(milestone, Milestone) else milestone
 757                    for milestone in milestones
 758                ]
 759            )
 760        if assignee:
 761            if isinstance(assignee, User):
 762                data["assignee"] = assignee.username
 763            else:
 764                data["assignee"] = assignee
 765        if since:
 766            data["since"] = Util.format_time(since)
 767        if before:
 768            data["before"] = Util.format_time(before)
 769
 770        results = self.allspice_client.requests_get_paginated(
 771            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 772            params=data,
 773        )
 774
 775        issues = []
 776        for result in results:
 777            issue = Issue.parse_response(self.allspice_client, result)
 778            # See Issue.request
 779            setattr(issue, "_repository", self)
 780            # This is mostly for compatibility with an older implementation
 781            Issue._add_read_property("repo", self, issue)
 782            issues.append(issue)
 783
 784        return issues
 785
 786    def get_design_reviews(
 787        self,
 788        state: Literal["open", "closed", "all"] = "all",
 789        milestone: Optional[Union[Milestone, str]] = None,
 790        labels: Optional[List[str]] = None,
 791    ) -> List["DesignReview"]:
 792        """
 793        Get all Design Reviews of this Repository.
 794
 795        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 796
 797        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 798                      are returned.
 799        :param milestone: The milestone of the Design Reviews to get.
 800        :param labels: A list of label IDs to filter DRs by.
 801        :return: A list of Design Reviews.
 802        """
 803
 804        params = {
 805            "state": state,
 806        }
 807        if milestone:
 808            if isinstance(milestone, Milestone):
 809                params["milestone"] = milestone.name
 810            else:
 811                params["milestone"] = milestone
 812        if labels:
 813            params["labels"] = ",".join(labels)
 814
 815        results = self.allspice_client.requests_get_paginated(
 816            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 817            params=params,
 818        )
 819        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 820
 821    def get_commits(
 822        self,
 823        sha: Optional[str] = None,
 824        path: Optional[str] = None,
 825        stat: bool = True,
 826    ) -> List["Commit"]:
 827        """
 828        Get all the Commits of this Repository.
 829
 830        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 831
 832        :param sha: The SHA of the commit to start listing commits from.
 833        :param path: filepath of a file/dir.
 834        :param stat: Include the number of additions and deletions in the response.
 835                     Disable for speedup.
 836        :return: A list of Commits.
 837        """
 838
 839        data = {}
 840        if sha:
 841            data["sha"] = sha
 842        if path:
 843            data["path"] = path
 844        if not stat:
 845            data["stat"] = False
 846
 847        try:
 848            results = self.allspice_client.requests_get_paginated(
 849                Repository.REPO_COMMITS % (self.owner.username, self.name),
 850                params=data,
 851            )
 852        except ConflictException as err:
 853            logging.warning(err)
 854            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 855            results = []
 856        return [Commit.parse_response(self.allspice_client, result) for result in results]
 857
 858    def get_issues_state(self, state) -> List["Issue"]:
 859        """
 860        DEPRECATED: Use get_issues() instead.
 861
 862        Get issues of state Issue.open or Issue.closed of a repository.
 863        """
 864
 865        assert state in [Issue.OPENED, Issue.CLOSED]
 866        issues = []
 867        data = {"state": state}
 868        results = self.allspice_client.requests_get_paginated(
 869            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 870            params=data,
 871        )
 872        for result in results:
 873            issue = Issue.parse_response(self.allspice_client, result)
 874            # adding data not contained in the issue response
 875            # See Issue.request()
 876            setattr(issue, "_repository", self)
 877            Issue._add_read_property("repo", self, issue)
 878            Issue._add_read_property("owner", self.owner, issue)
 879            issues.append(issue)
 880        return issues
 881
 882    def get_times(self):
 883        results = self.allspice_client.requests_get(
 884            Repository.REPO_TIMES % (self.owner.username, self.name)
 885        )
 886        return results
 887
 888    def get_user_time(self, username) -> float:
 889        if isinstance(username, User):
 890            username = username.username
 891        results = self.allspice_client.requests_get(
 892            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 893        )
 894        time = sum(r["time"] for r in results)
 895        return time
 896
 897    def get_full_name(self) -> str:
 898        return self.owner.username + "/" + self.name
 899
 900    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 901        data = {
 902            "assignees": assignees,
 903            "body": description,
 904            "closed": False,
 905            "title": title,
 906        }
 907        result = self.allspice_client.requests_post(
 908            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 909            data=data,
 910        )
 911
 912        issue = Issue.parse_response(self.allspice_client, result)
 913        setattr(issue, "_repository", self)
 914        Issue._add_read_property("repo", self, issue)
 915        return issue
 916
 917    def create_design_review(
 918        self,
 919        title: str,
 920        head: Union[Branch, str],
 921        base: Union[Branch, str],
 922        assignees: Optional[Set[Union[User, str]]] = None,
 923        body: Optional[str] = None,
 924        due_date: Optional[datetime] = None,
 925        milestone: Optional["Milestone"] = None,
 926    ) -> "DesignReview":
 927        """
 928        Create a new Design Review.
 929
 930        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 931
 932        :param title: Title of the Design Review
 933        :param head: Branch or name of the branch to merge into the base branch
 934        :param base: Branch or name of the branch to merge into
 935        :param assignees: Optional. A list of users to assign this review. List can be of
 936                          User objects or of usernames.
 937        :param body: An Optional Description for the Design Review.
 938        :param due_date: An Optional Due date for the Design Review.
 939        :param milestone: An Optional Milestone for the Design Review
 940        :return: The created Design Review
 941        """
 942
 943        data: dict[str, Any] = {
 944            "title": title,
 945        }
 946
 947        if isinstance(head, Branch):
 948            data["head"] = head.name
 949        else:
 950            data["head"] = head
 951        if isinstance(base, Branch):
 952            data["base"] = base.name
 953        else:
 954            data["base"] = base
 955        if assignees:
 956            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 957        if body:
 958            data["body"] = body
 959        if due_date:
 960            data["due_date"] = Util.format_time(due_date)
 961        if milestone:
 962            data["milestone"] = milestone.id
 963
 964        result = self.allspice_client.requests_post(
 965            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 966            data=data,
 967        )
 968
 969        return DesignReview.parse_response(self.allspice_client, result)
 970
 971    def create_milestone(
 972        self,
 973        title: str,
 974        description: str,
 975        due_date: Optional[str] = None,
 976        state: str = "open",
 977    ) -> "Milestone":
 978        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 979        data = {"title": title, "description": description, "state": state}
 980        if due_date:
 981            data["due_date"] = due_date
 982        result = self.allspice_client.requests_post(url, data=data)
 983        return Milestone.parse_response(self.allspice_client, result)
 984
 985    def create_gitea_hook(self, hook_url: str, events: List[str]):
 986        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 987        data = {
 988            "type": "gitea",
 989            "config": {"content_type": "json", "url": hook_url},
 990            "events": events,
 991            "active": True,
 992        }
 993        return self.allspice_client.requests_post(url, data=data)
 994
 995    def list_hooks(self):
 996        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 997        return self.allspice_client.requests_get(url)
 998
 999    def delete_hook(self, id: str):
1000        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1001        self.allspice_client.requests_delete(url)
1002
1003    def is_collaborator(self, username) -> bool:
1004        if isinstance(username, User):
1005            username = username.username
1006        try:
1007            # returns 204 if its ok, 404 if its not
1008            self.allspice_client.requests_get(
1009                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1010            )
1011            return True
1012        except Exception:
1013            return False
1014
1015    def get_users_with_access(self) -> Sequence[User]:
1016        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1017        response = self.allspice_client.requests_get(url)
1018        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1019        if isinstance(self.owner, User):
1020            return [*collabs, self.owner]
1021        else:
1022            # owner must be org
1023            teams = self.owner.get_teams()
1024            for team in teams:
1025                team_repos = team.get_repos()
1026                if self.name in [n.name for n in team_repos]:
1027                    collabs += team.get_members()
1028            return collabs
1029
1030    def remove_collaborator(self, user_name: str):
1031        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1032        self.allspice_client.requests_delete(url)
1033
1034    def transfer_ownership(
1035        self,
1036        new_owner: Union[User, Organization],
1037        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1038    ):
1039        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1040        data: dict[str, Any] = {"new_owner": new_owner.username}
1041        if isinstance(new_owner, Organization):
1042            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1043            data["team_ids"] = new_team_ids
1044        self.allspice_client.requests_post(url, data=data)
1045        # TODO: make sure this instance is either updated or discarded
1046
1047    def get_git_content(
1048        self,
1049        ref: Optional["Ref"] = None,
1050        commit: "Optional[Commit]" = None,
1051    ) -> List[Content]:
1052        """
1053        Get the metadata for all files in the root directory.
1054
1055        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1056
1057        :param ref: branch or commit to get content from
1058        :param commit: commit to get content from (deprecated)
1059        """
1060        url = f"/repos/{self.owner.username}/{self.name}/contents"
1061        data = Util.data_params_for_ref(ref or commit)
1062
1063        result = [
1064            Content.parse_response(self.allspice_client, f)
1065            for f in self.allspice_client.requests_get(url, data)
1066        ]
1067        return result
1068
1069    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1070        """
1071        Get the repository's tree on a given ref.
1072
1073        By default, this will only return the top-level entries in the tree. If you want
1074        to get the entire tree, set `recursive` to True.
1075
1076        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1077        :param recursive: Whether to get the entire tree or just the top-level entries.
1078        """
1079
1080        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1081        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1082        params = {"recursive": recursive}
1083        results = self.allspice_client.requests_get_paginated(url, params=params)
1084        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1085
1086    def get_file_content(
1087        self,
1088        content: Content,
1089        ref: Optional[Ref] = None,
1090        commit: Optional[Commit] = None,
1091    ) -> Union[str, List["Content"]]:
1092        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1093        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1094        data = Util.data_params_for_ref(ref or commit)
1095
1096        if content.type == Content.FILE:
1097            return self.allspice_client.requests_get(url, data)["content"]
1098        else:
1099            return [
1100                Content.parse_response(self.allspice_client, f)
1101                for f in self.allspice_client.requests_get(url, data)
1102            ]
1103
1104    def get_raw_file(
1105        self,
1106        file_path: str,
1107        ref: Optional[Ref] = None,
1108    ) -> bytes:
1109        """
1110        Get the raw, binary data of a single file.
1111
1112        Note 1: if the file you are requesting is a text file, you might want to
1113        use .decode() on the result to get a string. For example:
1114
1115            content = repo.get_raw_file("file.txt").decode("utf-8")
1116
1117        Note 2: this method will store the entire file in memory. If you want
1118        to download a large file, you might want to use `download_to_file`
1119        instead.
1120
1121        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1122
1123        :param file_path: The path to the file to get.
1124        :param ref: The branch or commit to get the file from.  If not provided,
1125            the default branch is used.
1126        """
1127
1128        url = self.REPO_GET_RAW_FILE.format(
1129            owner=self.owner.username,
1130            repo=self.name,
1131            path=file_path,
1132        )
1133        params = Util.data_params_for_ref(ref)
1134        return self.allspice_client.requests_get_raw(url, params=params)
1135
1136    def download_to_file(
1137        self,
1138        file_path: str,
1139        io: IO,
1140        ref: Optional[Ref] = None,
1141    ) -> None:
1142        """
1143        Download the binary data of a file to a file-like object.
1144
1145        Example:
1146
1147            with open("schematic.DSN", "wb") as f:
1148                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1149
1150        :param file_path: The path to the file in the repository from the root
1151            of the repository.
1152        :param io: The file-like object to write the data to.
1153        """
1154
1155        url = self.allspice_client._AllSpice__get_url(
1156            self.REPO_GET_RAW_FILE.format(
1157                owner=self.owner.username,
1158                repo=self.name,
1159                path=file_path,
1160            )
1161        )
1162        params = Util.data_params_for_ref(ref)
1163        response = self.allspice_client.requests.get(
1164            url,
1165            params=params,
1166            headers=self.allspice_client.headers,
1167            stream=True,
1168        )
1169
1170        for chunk in response.iter_content(chunk_size=4096):
1171            if chunk:
1172                io.write(chunk)
1173
1174    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1175        """
1176        Get the json blob for a cad file if it exists, otherwise enqueue
1177        a new job and return a 503 status.
1178
1179        WARNING: This is still experimental and not recommended for critical
1180        applications. The structure and content of the returned dictionary can
1181        change at any time.
1182
1183        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1184        """
1185
1186        if isinstance(content, Content):
1187            content = content.path
1188
1189        url = self.REPO_GET_ALLSPICE_JSON.format(
1190            owner=self.owner.username,
1191            repo=self.name,
1192            content=content,
1193        )
1194        data = Util.data_params_for_ref(ref)
1195        return self.allspice_client.requests_get(url, data)
1196
1197    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1198        """
1199        Get the svg blob for a cad file if it exists, otherwise enqueue
1200        a new job and return a 503 status.
1201
1202        WARNING: This is still experimental and not yet recommended for
1203        critical applications. The content of the returned svg can change
1204        at any time.
1205
1206        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1207        """
1208
1209        if isinstance(content, Content):
1210            content = content.path
1211
1212        url = self.REPO_GET_ALLSPICE_SVG.format(
1213            owner=self.owner.username,
1214            repo=self.name,
1215            content=content,
1216        )
1217        data = Util.data_params_for_ref(ref)
1218        return self.allspice_client.requests_get_raw(url, data)
1219
1220    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1221        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1222        if not data:
1223            data = {}
1224        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1225        data.update({"content": content})
1226        return self.allspice_client.requests_post(url, data)
1227
1228    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1229        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1230        if not data:
1231            data = {}
1232        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1233        data.update({"sha": file_sha, "content": content})
1234        return self.allspice_client.requests_put(url, data)
1235
1236    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1237        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1238        if not data:
1239            data = {}
1240        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1241        data.update({"sha": file_sha})
1242        return self.allspice_client.requests_delete(url, data)
1243
1244    def get_archive(
1245        self,
1246        ref: Ref = "main",
1247        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1248    ) -> bytes:
1249        """
1250        Download all the files in a specific ref of a repository as a zip or tarball
1251        archive.
1252
1253        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1254
1255        :param ref: branch or commit to get content from, defaults to the "main" branch
1256        :param archive_format: zip or tar, defaults to zip
1257        """
1258
1259        ref_string = Util.data_params_for_ref(ref)["ref"]
1260        url = self.REPO_GET_ARCHIVE.format(
1261            owner=self.owner.username,
1262            repo=self.name,
1263            ref=ref_string,
1264            format=archive_format.value,
1265        )
1266        return self.allspice_client.requests_get_raw(url)
1267
1268    def get_topics(self) -> list[str]:
1269        """
1270        Gets the list of topics on this repository.
1271
1272        See http://localhost:3000/api/swagger#/repository/repoListTopics
1273        """
1274
1275        url = self.REPO_GET_TOPICS.format(
1276            owner=self.owner.username,
1277            repo=self.name,
1278        )
1279        return self.allspice_client.requests_get(url)["topics"]
1280
1281    def add_topic(self, topic: str):
1282        """
1283        Adds a topic to the repository.
1284
1285        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1286
1287        :param topic: The topic to add. Topic names must consist only of
1288            lowercase letters, numnbers and dashes (-), and cannot start with
1289            dashes. Topic names also must be under 35 characters long.
1290        """
1291
1292        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1293        self.allspice_client.requests_put(url)
1294
1295    def create_release(
1296        self,
1297        tag_name: str,
1298        name: Optional[str] = None,
1299        body: Optional[str] = None,
1300        draft: bool = False,
1301    ):
1302        """
1303        Create a release for this repository. The release will be created for
1304        the tag with the given name. If there is no tag with this name, create
1305        the tag first.
1306
1307        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1308        """
1309
1310        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1311        data = {
1312            "tag_name": tag_name,
1313            "draft": draft,
1314        }
1315        if name is not None:
1316            data["name"] = name
1317        if body is not None:
1318            data["body"] = body
1319        response = self.allspice_client.requests_post(url, data)
1320        return Release.parse_response(self.allspice_client, response, self)
1321
1322    def get_releases(
1323        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1324    ) -> List[Release]:
1325        """
1326        Get the list of releases for this repository.
1327
1328        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1329        """
1330
1331        data = {}
1332
1333        if draft is not None:
1334            data["draft"] = draft
1335        if pre_release is not None:
1336            data["pre-release"] = pre_release
1337
1338        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1339        responses = self.allspice_client.requests_get_paginated(url, params=data)
1340
1341        return [
1342            Release.parse_response(self.allspice_client, response, self) for response in responses
1343        ]
1344
1345    def get_latest_release(self) -> Release:
1346        """
1347        Get the latest release for this repository.
1348
1349        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1350        """
1351
1352        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1353        response = self.allspice_client.requests_get(url)
1354        release = Release.parse_response(self.allspice_client, response, self)
1355        return release
1356
1357    def get_release_by_tag(self, tag: str) -> Release:
1358        """
1359        Get a release by its tag.
1360
1361        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1362        """
1363
1364        url = self.REPO_GET_RELEASE_BY_TAG.format(
1365            owner=self.owner.username, repo=self.name, tag=tag
1366        )
1367        response = self.allspice_client.requests_get(url)
1368        release = Release.parse_response(self.allspice_client, response, self)
1369        return release
1370
1371    def get_commit_statuses(
1372        self,
1373        commit: Union[str, Commit],
1374        sort: Optional[CommitStatusSort] = None,
1375        state: Optional[CommitStatusState] = None,
1376    ) -> List[CommitStatus]:
1377        """
1378        Get a list of statuses for a commit.
1379
1380        This is roughly equivalent to the Commit.get_statuses method, but this
1381        method allows you to sort and filter commits and is more convenient if
1382        you have a commit SHA and don't need to get the commit itself.
1383
1384        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1385        """
1386
1387        if isinstance(commit, Commit):
1388            commit = commit.sha
1389
1390        params = {}
1391        if sort is not None:
1392            params["sort"] = sort.value
1393        if state is not None:
1394            params["state"] = state.value
1395
1396        url = self.REPO_GET_COMMIT_STATUS.format(
1397            owner=self.owner.username, repo=self.name, sha=commit
1398        )
1399        response = self.allspice_client.requests_get_paginated(url, params=params)
1400        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1401
1402    def create_commit_status(
1403        self,
1404        commit: Union[str, Commit],
1405        context: Optional[str] = None,
1406        description: Optional[str] = None,
1407        state: Optional[CommitStatusState] = None,
1408        target_url: Optional[str] = None,
1409    ) -> CommitStatus:
1410        """
1411        Create a status on a commit.
1412
1413        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1414        """
1415
1416        if isinstance(commit, Commit):
1417            commit = commit.sha
1418
1419        data = {}
1420        if context is not None:
1421            data["context"] = context
1422        if description is not None:
1423            data["description"] = description
1424        if state is not None:
1425            data["state"] = state.value
1426        if target_url is not None:
1427            data["target_url"] = target_url
1428
1429        url = self.REPO_GET_COMMIT_STATUS.format(
1430            owner=self.owner.username, repo=self.name, sha=commit
1431        )
1432        response = self.allspice_client.requests_post(url, data=data)
1433        return CommitStatus.parse_response(self.allspice_client, response)
1434
1435    def delete(self):
1436        self.allspice_client.requests_delete(
1437            Repository.REPO_DELETE % (self.owner.username, self.name)
1438        )
1439        self.deleted = True
Repository(allspice_client)
579    def __init__(self, allspice_client):
580        super().__init__(allspice_client)
allow_manual_merge: Any
allow_merge_commits: bool
allow_rebase: bool
allow_rebase_explicit: bool
allow_rebase_update: bool
allow_squash_merge: bool
archived: bool
archived_at: str
autodetect_manual_merge: Any
avatar_url: str
clone_url: str
created_at: str
default_allow_maintainer_edit: bool
default_branch: str
default_delete_branch_after_merge: bool
default_merge_style: str
description: str
empty: bool
enable_prune: Any
external_tracker: Any
external_wiki: Any
fork: bool
forks_count: int
full_name: str
has_actions: bool
has_issues: bool
has_packages: bool
has_projects: bool
has_pull_requests: bool
has_releases: bool
has_wiki: bool
html_url: str
id: int
ignore_whitespace_conflicts: bool
internal: bool
internal_tracker: Dict[str, bool]
language: str
languages_url: str
mirror: bool
mirror_interval: str
mirror_updated: str
name: str
open_issues_count: int
open_pr_counter: int
original_url: str
owner: Union[User, Organization]
parent: Any
permissions: Dict[str, bool]
private: bool
release_counter: int
repo_transfer: Any
size: int
ssh_url: str
stars_count: int
template: bool
updated_at: datetime.datetime
url: str
watchers_count: int
website: str
API_OBJECT = '/repos/{owner}/{name}'
REPO_IS_COLLABORATOR = '/repos/%s/%s/collaborators/%s'
REPO_BRANCHES = '/repos/%s/%s/branches'
REPO_BRANCH = '/repos/{owner}/{repo}/branches/{branch}'
REPO_ISSUES = '/repos/{owner}/{repo}/issues'
REPO_DESIGN_REVIEWS = '/repos/{owner}/{repo}/pulls'
REPO_DELETE = '/repos/%s/%s'
REPO_TIMES = '/repos/%s/%s/times'
REPO_USER_TIME = '/repos/%s/%s/times/%s'
REPO_COMMITS = '/repos/%s/%s/commits'
REPO_TRANSFER = '/repos/{owner}/{repo}/transfer'
REPO_MILESTONES = '/repos/{owner}/{repo}/milestones'
REPO_GET_ARCHIVE = '/repos/{owner}/{repo}/archive/{ref}.{format}'
REPO_GET_ALLSPICE_JSON = '/repos/{owner}/{repo}/allspice_generated/json/{content}'
REPO_GET_ALLSPICE_SVG = '/repos/{owner}/{repo}/allspice_generated/svg/{content}'
REPO_GET_TOPICS = '/repos/{owner}/{repo}/topics'
REPO_ADD_TOPIC = '/repos/{owner}/{repo}/topics/{topic}'
REPO_GET_RELEASES = '/repos/{owner}/{repo}/releases'
REPO_GET_LATEST_RELEASE = '/repos/{owner}/{repo}/releases/latest'
REPO_GET_RELEASE_BY_TAG = '/repos/{owner}/{repo}/releases/tags/{tag}'
REPO_GET_COMMIT_STATUS = '/repos/{owner}/{repo}/statuses/{sha}'
REPO_GET_RAW_FILE = '/repos/{owner}/{repo}/raw/{path}'
REPO_GET_TREE = '/repos/{owner}/{repo}/git/trees/{ref}'
@classmethod
def request( cls, allspice_client, owner: str, name: str) -> Repository:
600    @classmethod
601    def request(
602        cls,
603        allspice_client,
604        owner: str,
605        name: str,
606    ) -> Repository:
607        return cls._request(allspice_client, {"owner": owner, "name": name})
@classmethod
def search( cls, allspice_client, query: Optional[str] = None, topic: bool = False, include_description: bool = False, user: Optional[User] = None, owner_to_prioritize: Union[User, Organization, NoneType] = None) -> list[Repository]:
609    @classmethod
610    def search(
611        cls,
612        allspice_client,
613        query: Optional[str] = None,
614        topic: bool = False,
615        include_description: bool = False,
616        user: Optional[User] = None,
617        owner_to_prioritize: Union[User, Organization, None] = None,
618    ) -> list[Repository]:
619        """
620        Search for repositories.
621
622        See https://hub.allspice.io/api/swagger#/repository/repoSearch
623
624        :param query: The query string to search for
625        :param topic: If true, the query string will only be matched against the
626            repository's topic.
627        :param include_description: If true, the query string will be matched
628            against the repository's description as well.
629        :param user: If specified, only repositories that this user owns or
630            contributes to will be searched.
631        :param owner_to_prioritize: If specified, repositories owned by the
632            given entity will be prioritized in the search.
633        :returns: All repositories matching the query. If there are many
634            repositories matching this query, this may take some time.
635        """
636
637        params = {}
638
639        if query is not None:
640            params["q"] = query
641        if topic:
642            params["topic"] = topic
643        if include_description:
644            params["include_description"] = include_description
645        if user is not None:
646            params["user"] = user.id
647        if owner_to_prioritize is not None:
648            params["owner_to_prioritize"] = owner_to_prioritize.id
649
650        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
651
652        return [Repository.parse_response(allspice_client, response) for response in responses]

Search for repositories.

See https://hub.allspice.io/api/swagger#/repository/repoSearch

Parameters
  • query: The query string to search for
  • topic: If true, the query string will only be matched against the repository's topic.
  • include_description: If true, the query string will be matched against the repository's description as well.
  • user: If specified, only repositories that this user owns or contributes to will be searched.
  • owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns

All repositories matching the query. If there are many repositories matching this query, this may take some time.
def commit(self):
683    def commit(self):
684        args = {"owner": self.owner.username, "name": self.name}
685        self._commit(args)
def get_branches(self) -> List[Branch]:
687    def get_branches(self) -> List["Branch"]:
688        """Get all the Branches of this Repository."""
689
690        results = self.allspice_client.requests_get_paginated(
691            Repository.REPO_BRANCHES % (self.owner.username, self.name)
692        )
693        return [Branch.parse_response(self.allspice_client, result) for result in results]

Get all the Branches of this Repository.

def get_branch(self, name: str) -> Branch:
695    def get_branch(self, name: str) -> "Branch":
696        """Get a specific Branch of this Repository."""
697        result = self.allspice_client.requests_get(
698            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
699        )
700        return Branch.parse_response(self.allspice_client, result)

Get a specific Branch of this Repository.

def add_branch( self, create_from: Union[Branch, Commit, str], newname: str) -> Branch:
702    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
703        """Add a branch to the repository"""
704        # Note: will only work with gitea 1.13 or higher!
705
706        ref_name = Util.data_params_for_ref(create_from)
707        if "ref" not in ref_name:
708            raise ValueError("create_from must be a Branch, Commit or string")
709        ref_name = ref_name["ref"]
710
711        data = {"new_branch_name": newname, "old_ref_name": ref_name}
712        result = self.allspice_client.requests_post(
713            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
714        )
715        return Branch.parse_response(self.allspice_client, result)

Add a branch to the repository

def get_issues( self, state: Literal['open', 'closed', 'all'] = 'all', search_query: Optional[str] = None, labels: Optional[List[str]] = None, milestones: Optional[List[Union[Milestone, str]]] = None, assignee: Union[User, str, NoneType] = None, since: Optional[datetime.datetime] = None, before: Optional[datetime.datetime] = None) -> List[Issue]:
717    def get_issues(
718        self,
719        state: Literal["open", "closed", "all"] = "all",
720        search_query: Optional[str] = None,
721        labels: Optional[List[str]] = None,
722        milestones: Optional[List[Union[Milestone, str]]] = None,
723        assignee: Optional[Union[User, str]] = None,
724        since: Optional[datetime] = None,
725        before: Optional[datetime] = None,
726    ) -> List["Issue"]:
727        """
728        Get all Issues of this Repository (open and closed)
729
730        https://hub.allspice.io/api/swagger#/repository/repoListIssues
731
732        All params of this method are optional filters. If you don't specify a filter, it
733        will not be applied.
734
735        :param state: The state of the Issues to get. If None, all Issues are returned.
736        :param search_query: Filter issues by text. This is equivalent to searching for
737                             `search_query` in the Issues on the web interface.
738        :param labels: Filter issues by labels.
739        :param milestones: Filter issues by milestones.
740        :param assignee: Filter issues by the assigned user.
741        :param since: Filter issues by the date they were created.
742        :param before: Filter issues by the date they were created.
743        :return: A list of Issues.
744        """
745
746        data = {
747            "state": state,
748        }
749        if search_query:
750            data["q"] = search_query
751        if labels:
752            data["labels"] = ",".join(labels)
753        if milestones:
754            data["milestone"] = ",".join(
755                [
756                    milestone.name if isinstance(milestone, Milestone) else milestone
757                    for milestone in milestones
758                ]
759            )
760        if assignee:
761            if isinstance(assignee, User):
762                data["assignee"] = assignee.username
763            else:
764                data["assignee"] = assignee
765        if since:
766            data["since"] = Util.format_time(since)
767        if before:
768            data["before"] = Util.format_time(before)
769
770        results = self.allspice_client.requests_get_paginated(
771            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
772            params=data,
773        )
774
775        issues = []
776        for result in results:
777            issue = Issue.parse_response(self.allspice_client, result)
778            # See Issue.request
779            setattr(issue, "_repository", self)
780            # This is mostly for compatibility with an older implementation
781            Issue._add_read_property("repo", self, issue)
782            issues.append(issue)
783
784        return issues

Get all Issues of this Repository (open and closed)

https://hub.allspice.io/api/swagger#/repository/repoListIssues

All params of this method are optional filters. If you don't specify a filter, it will not be applied.

Parameters
  • state: The state of the Issues to get. If None, all Issues are returned.
  • search_query: Filter issues by text. This is equivalent to searching for search_query in the Issues on the web interface.
  • labels: Filter issues by labels.
  • milestones: Filter issues by milestones.
  • assignee: Filter issues by the assigned user.
  • since: Filter issues by the date they were created.
  • before: Filter issues by the date they were created.
Returns

A list of Issues.

def get_design_reviews( self, state: Literal['open', 'closed', 'all'] = 'all', milestone: Union[Milestone, str, NoneType] = None, labels: Optional[List[str]] = None) -> List[DesignReview]:
786    def get_design_reviews(
787        self,
788        state: Literal["open", "closed", "all"] = "all",
789        milestone: Optional[Union[Milestone, str]] = None,
790        labels: Optional[List[str]] = None,
791    ) -> List["DesignReview"]:
792        """
793        Get all Design Reviews of this Repository.
794
795        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
796
797        :param state: The state of the Design Reviews to get. If None, all Design Reviews
798                      are returned.
799        :param milestone: The milestone of the Design Reviews to get.
800        :param labels: A list of label IDs to filter DRs by.
801        :return: A list of Design Reviews.
802        """
803
804        params = {
805            "state": state,
806        }
807        if milestone:
808            if isinstance(milestone, Milestone):
809                params["milestone"] = milestone.name
810            else:
811                params["milestone"] = milestone
812        if labels:
813            params["labels"] = ",".join(labels)
814
815        results = self.allspice_client.requests_get_paginated(
816            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
817            params=params,
818        )
819        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

Get all Design Reviews of this Repository.

https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

Parameters
  • state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
  • milestone: The milestone of the Design Reviews to get.
  • labels: A list of label IDs to filter DRs by.
Returns

A list of Design Reviews.

def get_commits( self, sha: Optional[str] = None, path: Optional[str] = None, stat: bool = True) -> List[Commit]:
821    def get_commits(
822        self,
823        sha: Optional[str] = None,
824        path: Optional[str] = None,
825        stat: bool = True,
826    ) -> List["Commit"]:
827        """
828        Get all the Commits of this Repository.
829
830        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
831
832        :param sha: The SHA of the commit to start listing commits from.
833        :param path: filepath of a file/dir.
834        :param stat: Include the number of additions and deletions in the response.
835                     Disable for speedup.
836        :return: A list of Commits.
837        """
838
839        data = {}
840        if sha:
841            data["sha"] = sha
842        if path:
843            data["path"] = path
844        if not stat:
845            data["stat"] = False
846
847        try:
848            results = self.allspice_client.requests_get_paginated(
849                Repository.REPO_COMMITS % (self.owner.username, self.name),
850                params=data,
851            )
852        except ConflictException as err:
853            logging.warning(err)
854            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
855            results = []
856        return [Commit.parse_response(self.allspice_client, result) for result in results]

Get all the Commits of this Repository.

https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

Parameters
  • sha: The SHA of the commit to start listing commits from.
  • path: filepath of a file/dir.
  • stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns

A list of Commits.

def get_issues_state(self, state) -> List[Issue]:
858    def get_issues_state(self, state) -> List["Issue"]:
859        """
860        DEPRECATED: Use get_issues() instead.
861
862        Get issues of state Issue.open or Issue.closed of a repository.
863        """
864
865        assert state in [Issue.OPENED, Issue.CLOSED]
866        issues = []
867        data = {"state": state}
868        results = self.allspice_client.requests_get_paginated(
869            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
870            params=data,
871        )
872        for result in results:
873            issue = Issue.parse_response(self.allspice_client, result)
874            # adding data not contained in the issue response
875            # See Issue.request()
876            setattr(issue, "_repository", self)
877            Issue._add_read_property("repo", self, issue)
878            Issue._add_read_property("owner", self.owner, issue)
879            issues.append(issue)
880        return issues

DEPRECATED: Use get_issues() instead.

Get issues of state Issue.open or Issue.closed of a repository.

def get_times(self):
882    def get_times(self):
883        results = self.allspice_client.requests_get(
884            Repository.REPO_TIMES % (self.owner.username, self.name)
885        )
886        return results
def get_user_time(self, username) -> float:
888    def get_user_time(self, username) -> float:
889        if isinstance(username, User):
890            username = username.username
891        results = self.allspice_client.requests_get(
892            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
893        )
894        time = sum(r["time"] for r in results)
895        return time
def get_full_name(self) -> str:
897    def get_full_name(self) -> str:
898        return self.owner.username + "/" + self.name
def create_issue( self, title, assignees=frozenset(), description='') -> allspice.baseapiobject.ApiObject:
900    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
901        data = {
902            "assignees": assignees,
903            "body": description,
904            "closed": False,
905            "title": title,
906        }
907        result = self.allspice_client.requests_post(
908            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
909            data=data,
910        )
911
912        issue = Issue.parse_response(self.allspice_client, result)
913        setattr(issue, "_repository", self)
914        Issue._add_read_property("repo", self, issue)
915        return issue
def create_design_review( self, title: str, head: Union[Branch, str], base: Union[Branch, str], assignees: Optional[Set[Union[User, str]]] = None, body: Optional[str] = None, due_date: Optional[datetime.datetime] = None, milestone: Optional[Milestone] = None) -> DesignReview:
917    def create_design_review(
918        self,
919        title: str,
920        head: Union[Branch, str],
921        base: Union[Branch, str],
922        assignees: Optional[Set[Union[User, str]]] = None,
923        body: Optional[str] = None,
924        due_date: Optional[datetime] = None,
925        milestone: Optional["Milestone"] = None,
926    ) -> "DesignReview":
927        """
928        Create a new Design Review.
929
930        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
931
932        :param title: Title of the Design Review
933        :param head: Branch or name of the branch to merge into the base branch
934        :param base: Branch or name of the branch to merge into
935        :param assignees: Optional. A list of users to assign this review. List can be of
936                          User objects or of usernames.
937        :param body: An Optional Description for the Design Review.
938        :param due_date: An Optional Due date for the Design Review.
939        :param milestone: An Optional Milestone for the Design Review
940        :return: The created Design Review
941        """
942
943        data: dict[str, Any] = {
944            "title": title,
945        }
946
947        if isinstance(head, Branch):
948            data["head"] = head.name
949        else:
950            data["head"] = head
951        if isinstance(base, Branch):
952            data["base"] = base.name
953        else:
954            data["base"] = base
955        if assignees:
956            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
957        if body:
958            data["body"] = body
959        if due_date:
960            data["due_date"] = Util.format_time(due_date)
961        if milestone:
962            data["milestone"] = milestone.id
963
964        result = self.allspice_client.requests_post(
965            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
966            data=data,
967        )
968
969        return DesignReview.parse_response(self.allspice_client, result)

Create a new Design Review.

See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

Parameters
  • title: Title of the Design Review
  • head: Branch or name of the branch to merge into the base branch
  • base: Branch or name of the branch to merge into
  • assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
  • body: An Optional Description for the Design Review.
  • due_date: An Optional Due date for the Design Review.
  • milestone: An Optional Milestone for the Design Review
Returns

The created Design Review

def create_milestone( self, title: str, description: str, due_date: Optional[str] = None, state: str = 'open') -> Milestone:
971    def create_milestone(
972        self,
973        title: str,
974        description: str,
975        due_date: Optional[str] = None,
976        state: str = "open",
977    ) -> "Milestone":
978        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
979        data = {"title": title, "description": description, "state": state}
980        if due_date:
981            data["due_date"] = due_date
982        result = self.allspice_client.requests_post(url, data=data)
983        return Milestone.parse_response(self.allspice_client, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
985    def create_gitea_hook(self, hook_url: str, events: List[str]):
986        url = f"/repos/{self.owner.username}/{self.name}/hooks"
987        data = {
988            "type": "gitea",
989            "config": {"content_type": "json", "url": hook_url},
990            "events": events,
991            "active": True,
992        }
993        return self.allspice_client.requests_post(url, data=data)
def list_hooks(self):
995    def list_hooks(self):
996        url = f"/repos/{self.owner.username}/{self.name}/hooks"
997        return self.allspice_client.requests_get(url)
def delete_hook(self, id: str):
 999    def delete_hook(self, id: str):
1000        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1001        self.allspice_client.requests_delete(url)
def is_collaborator(self, username) -> bool:
1003    def is_collaborator(self, username) -> bool:
1004        if isinstance(username, User):
1005            username = username.username
1006        try:
1007            # returns 204 if its ok, 404 if its not
1008            self.allspice_client.requests_get(
1009                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1010            )
1011            return True
1012        except Exception:
1013            return False
def get_users_with_access(self) -> Sequence[User]:
1015    def get_users_with_access(self) -> Sequence[User]:
1016        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1017        response = self.allspice_client.requests_get(url)
1018        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1019        if isinstance(self.owner, User):
1020            return [*collabs, self.owner]
1021        else:
1022            # owner must be org
1023            teams = self.owner.get_teams()
1024            for team in teams:
1025                team_repos = team.get_repos()
1026                if self.name in [n.name for n in team_repos]:
1027                    collabs += team.get_members()
1028            return collabs
def remove_collaborator(self, user_name: str):
1030    def remove_collaborator(self, user_name: str):
1031        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1032        self.allspice_client.requests_delete(url)
def transfer_ownership( self, new_owner: Union[User, Organization], new_teams: Union[Set[Team], FrozenSet[Team]] = frozenset()):
1034    def transfer_ownership(
1035        self,
1036        new_owner: Union[User, Organization],
1037        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1038    ):
1039        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1040        data: dict[str, Any] = {"new_owner": new_owner.username}
1041        if isinstance(new_owner, Organization):
1042            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1043            data["team_ids"] = new_team_ids
1044        self.allspice_client.requests_post(url, data=data)
1045        # TODO: make sure this instance is either updated or discarded
def get_git_content( self, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> List[Content]:
1047    def get_git_content(
1048        self,
1049        ref: Optional["Ref"] = None,
1050        commit: "Optional[Commit]" = None,
1051    ) -> List[Content]:
1052        """
1053        Get the metadata for all files in the root directory.
1054
1055        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1056
1057        :param ref: branch or commit to get content from
1058        :param commit: commit to get content from (deprecated)
1059        """
1060        url = f"/repos/{self.owner.username}/{self.name}/contents"
1061        data = Util.data_params_for_ref(ref or commit)
1062
1063        result = [
1064            Content.parse_response(self.allspice_client, f)
1065            for f in self.allspice_client.requests_get(url, data)
1066        ]
1067        return result

Get the metadata for all files in the root directory.

https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

Parameters
  • ref: branch or commit to get content from
  • commit: commit to get content from (deprecated)
def get_tree( self, ref: Union[Branch, Commit, str, NoneType] = None, recursive: bool = False) -> List[allspice.apiobject.GitEntry]:
1069    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1070        """
1071        Get the repository's tree on a given ref.
1072
1073        By default, this will only return the top-level entries in the tree. If you want
1074        to get the entire tree, set `recursive` to True.
1075
1076        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1077        :param recursive: Whether to get the entire tree or just the top-level entries.
1078        """
1079
1080        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1081        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1082        params = {"recursive": recursive}
1083        results = self.allspice_client.requests_get_paginated(url, params=params)
1084        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

Get the repository's tree on a given ref.

By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set recursive to True.

Parameters
  • ref: The ref to get the tree from. If not provided, the default branch is used.
  • recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content( self, content: Content, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> Union[str, List[Content]]:
1086    def get_file_content(
1087        self,
1088        content: Content,
1089        ref: Optional[Ref] = None,
1090        commit: Optional[Commit] = None,
1091    ) -> Union[str, List["Content"]]:
1092        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1093        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1094        data = Util.data_params_for_ref(ref or commit)
1095
1096        if content.type == Content.FILE:
1097            return self.allspice_client.requests_get(url, data)["content"]
1098        else:
1099            return [
1100                Content.parse_response(self.allspice_client, f)
1101                for f in self.allspice_client.requests_get(url, data)
1102            ]

https://hub.allspice.io/api/swagger#/repository/repoGetContents

def get_raw_file( self, file_path: str, ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1104    def get_raw_file(
1105        self,
1106        file_path: str,
1107        ref: Optional[Ref] = None,
1108    ) -> bytes:
1109        """
1110        Get the raw, binary data of a single file.
1111
1112        Note 1: if the file you are requesting is a text file, you might want to
1113        use .decode() on the result to get a string. For example:
1114
1115            content = repo.get_raw_file("file.txt").decode("utf-8")
1116
1117        Note 2: this method will store the entire file in memory. If you want
1118        to download a large file, you might want to use `download_to_file`
1119        instead.
1120
1121        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1122
1123        :param file_path: The path to the file to get.
1124        :param ref: The branch or commit to get the file from.  If not provided,
1125            the default branch is used.
1126        """
1127
1128        url = self.REPO_GET_RAW_FILE.format(
1129            owner=self.owner.username,
1130            repo=self.name,
1131            path=file_path,
1132        )
1133        params = Util.data_params_for_ref(ref)
1134        return self.allspice_client.requests_get_raw(url, params=params)

Get the raw, binary data of a single file.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

content = repo.get_raw_file("file.txt").decode("utf-8")

Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use download_to_file instead.

See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

Parameters
  • file_path: The path to the file to get.
  • ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file( self, file_path: str, io: <class 'IO'>, ref: Union[Branch, Commit, str, NoneType] = None) -> None:
1136    def download_to_file(
1137        self,
1138        file_path: str,
1139        io: IO,
1140        ref: Optional[Ref] = None,
1141    ) -> None:
1142        """
1143        Download the binary data of a file to a file-like object.
1144
1145        Example:
1146
1147            with open("schematic.DSN", "wb") as f:
1148                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1149
1150        :param file_path: The path to the file in the repository from the root
1151            of the repository.
1152        :param io: The file-like object to write the data to.
1153        """
1154
1155        url = self.allspice_client._AllSpice__get_url(
1156            self.REPO_GET_RAW_FILE.format(
1157                owner=self.owner.username,
1158                repo=self.name,
1159                path=file_path,
1160            )
1161        )
1162        params = Util.data_params_for_ref(ref)
1163        response = self.allspice_client.requests.get(
1164            url,
1165            params=params,
1166            headers=self.allspice_client.headers,
1167            stream=True,
1168        )
1169
1170        for chunk in response.iter_content(chunk_size=4096):
1171            if chunk:
1172                io.write(chunk)

Download the binary data of a file to a file-like object.

Example:

with open("schematic.DSN", "wb") as f:
    repo.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
  • file_path: The path to the file in the repository from the root of the repository.
  • io: The file-like object to write the data to.
def get_generated_json( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1174    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1175        """
1176        Get the json blob for a cad file if it exists, otherwise enqueue
1177        a new job and return a 503 status.
1178
1179        WARNING: This is still experimental and not recommended for critical
1180        applications. The structure and content of the returned dictionary can
1181        change at any time.
1182
1183        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1184        """
1185
1186        if isinstance(content, Content):
1187            content = content.path
1188
1189        url = self.REPO_GET_ALLSPICE_JSON.format(
1190            owner=self.owner.username,
1191            repo=self.name,
1192            content=content,
1193        )
1194        data = Util.data_params_for_ref(ref)
1195        return self.allspice_client.requests_get(url, data)

Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.

See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

def get_generated_svg( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1197    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1198        """
1199        Get the svg blob for a cad file if it exists, otherwise enqueue
1200        a new job and return a 503 status.
1201
1202        WARNING: This is still experimental and not yet recommended for
1203        critical applications. The content of the returned svg can change
1204        at any time.
1205
1206        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1207        """
1208
1209        if isinstance(content, Content):
1210            content = content.path
1211
1212        url = self.REPO_GET_ALLSPICE_SVG.format(
1213            owner=self.owner.username,
1214            repo=self.name,
1215            content=content,
1216        )
1217        data = Util.data_params_for_ref(ref)
1218        return self.allspice_client.requests_get_raw(url, data)

Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.

See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1220    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1221        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1222        if not data:
1223            data = {}
1224        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1225        data.update({"content": content})
1226        return self.allspice_client.requests_post(url, data)

https://hub.allspice.io/api/swagger#/repository/repoCreateFile

def change_file( self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1228    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1229        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1230        if not data:
1231            data = {}
1232        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1233        data.update({"sha": file_sha, "content": content})
1234        return self.allspice_client.requests_put(url, data)

https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1236    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1237        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1238        if not data:
1239            data = {}
1240        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1241        data.update({"sha": file_sha})
1242        return self.allspice_client.requests_delete(url, data)

https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

def get_archive( self, ref: Union[Branch, Commit, str] = 'main', archive_format: Repository.ArchiveFormat = <ArchiveFormat.ZIP: 'zip'>) -> bytes:
1244    def get_archive(
1245        self,
1246        ref: Ref = "main",
1247        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1248    ) -> bytes:
1249        """
1250        Download all the files in a specific ref of a repository as a zip or tarball
1251        archive.
1252
1253        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1254
1255        :param ref: branch or commit to get content from, defaults to the "main" branch
1256        :param archive_format: zip or tar, defaults to zip
1257        """
1258
1259        ref_string = Util.data_params_for_ref(ref)["ref"]
1260        url = self.REPO_GET_ARCHIVE.format(
1261            owner=self.owner.username,
1262            repo=self.name,
1263            ref=ref_string,
1264            format=archive_format.value,
1265        )
1266        return self.allspice_client.requests_get_raw(url)

Download all the files in a specific ref of a repository as a zip or tarball archive.

https://hub.allspice.io/api/swagger#/repository/repoGetArchive

Parameters
  • ref: branch or commit to get content from, defaults to the "main" branch
  • archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
1268    def get_topics(self) -> list[str]:
1269        """
1270        Gets the list of topics on this repository.
1271
1272        See http://localhost:3000/api/swagger#/repository/repoListTopics
1273        """
1274
1275        url = self.REPO_GET_TOPICS.format(
1276            owner=self.owner.username,
1277            repo=self.name,
1278        )
1279        return self.allspice_client.requests_get(url)["topics"]

Gets the list of topics on this repository.

See https://hub.allspice.io/api/swagger#/repository/repoListTopics

def add_topic(self, topic: str):
1281    def add_topic(self, topic: str):
1282        """
1283        Adds a topic to the repository.
1284
1285        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1286
1287        :param topic: The topic to add. Topic names must consist only of
1288            lowercase letters, numnbers and dashes (-), and cannot start with
1289            dashes. Topic names also must be under 35 characters long.
1290        """
1291
1292        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1293        self.allspice_client.requests_put(url)

Adds a topic to the repository.

See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

Parameters
  • topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release( self, tag_name: str, name: Optional[str] = None, body: Optional[str] = None, draft: bool = False):
1295    def create_release(
1296        self,
1297        tag_name: str,
1298        name: Optional[str] = None,
1299        body: Optional[str] = None,
1300        draft: bool = False,
1301    ):
1302        """
1303        Create a release for this repository. The release will be created for
1304        the tag with the given name. If there is no tag with this name, create
1305        the tag first.
1306
1307        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1308        """
1309
1310        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1311        data = {
1312            "tag_name": tag_name,
1313            "draft": draft,
1314        }
1315        if name is not None:
1316            data["name"] = name
1317        if body is not None:
1318            data["body"] = body
1319        response = self.allspice_client.requests_post(url, data)
1320        return Release.parse_response(self.allspice_client, response, self)

Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.

See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

def get_releases( self, draft: Optional[bool] = None, pre_release: Optional[bool] = None) -> List[Release]:
1322    def get_releases(
1323        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1324    ) -> List[Release]:
1325        """
1326        Get the list of releases for this repository.
1327
1328        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1329        """
1330
1331        data = {}
1332
1333        if draft is not None:
1334            data["draft"] = draft
1335        if pre_release is not None:
1336            data["pre-release"] = pre_release
1337
1338        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1339        responses = self.allspice_client.requests_get_paginated(url, params=data)
1340
1341        return [
1342            Release.parse_response(self.allspice_client, response, self) for response in responses
1343        ]

Get the list of releases for this repository.

See https://hub.allspice.io/api/swagger#/repository/repoListReleases

def get_latest_release(self) -> Release:
1345    def get_latest_release(self) -> Release:
1346        """
1347        Get the latest release for this repository.
1348
1349        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1350        """
1351
1352        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1353        response = self.allspice_client.requests_get(url)
1354        release = Release.parse_response(self.allspice_client, response, self)
1355        return release

Get the latest release for this repository.

See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

def get_release_by_tag(self, tag: str) -> Release:
1357    def get_release_by_tag(self, tag: str) -> Release:
1358        """
1359        Get a release by its tag.
1360
1361        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1362        """
1363
1364        url = self.REPO_GET_RELEASE_BY_TAG.format(
1365            owner=self.owner.username, repo=self.name, tag=tag
1366        )
1367        response = self.allspice_client.requests_get(url)
1368        release = Release.parse_response(self.allspice_client, response, self)
1369        return release

Get a release by its tag.

See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

def get_commit_statuses( self, commit: Union[str, Commit], sort: Optional[Repository.CommitStatusSort] = None, state: Optional[allspice.apiobject.CommitStatusState] = None) -> List[allspice.apiobject.CommitStatus]:
1371    def get_commit_statuses(
1372        self,
1373        commit: Union[str, Commit],
1374        sort: Optional[CommitStatusSort] = None,
1375        state: Optional[CommitStatusState] = None,
1376    ) -> List[CommitStatus]:
1377        """
1378        Get a list of statuses for a commit.
1379
1380        This is roughly equivalent to the Commit.get_statuses method, but this
1381        method allows you to sort and filter commits and is more convenient if
1382        you have a commit SHA and don't need to get the commit itself.
1383
1384        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1385        """
1386
1387        if isinstance(commit, Commit):
1388            commit = commit.sha
1389
1390        params = {}
1391        if sort is not None:
1392            params["sort"] = sort.value
1393        if state is not None:
1394            params["state"] = state.value
1395
1396        url = self.REPO_GET_COMMIT_STATUS.format(
1397            owner=self.owner.username, repo=self.name, sha=commit
1398        )
1399        response = self.allspice_client.requests_get_paginated(url, params=params)
1400        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]

Get a list of statuses for a commit.

This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter statuses and is more convenient if you have a commit SHA and don't need to get the commit itself.

See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

def create_commit_status( self, commit: Union[str, Commit], context: Optional[str] = None, description: Optional[str] = None, state: Optional[allspice.apiobject.CommitStatusState] = None, target_url: Optional[str] = None) -> allspice.apiobject.CommitStatus:
1402    def create_commit_status(
1403        self,
1404        commit: Union[str, Commit],
1405        context: Optional[str] = None,
1406        description: Optional[str] = None,
1407        state: Optional[CommitStatusState] = None,
1408        target_url: Optional[str] = None,
1409    ) -> CommitStatus:
1410        """
1411        Create a status on a commit.
1412
1413        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1414        """
1415
1416        if isinstance(commit, Commit):
1417            commit = commit.sha
1418
1419        data = {}
1420        if context is not None:
1421            data["context"] = context
1422        if description is not None:
1423            data["description"] = description
1424        if state is not None:
1425            data["state"] = state.value
1426        if target_url is not None:
1427            data["target_url"] = target_url
1428
1429        url = self.REPO_GET_COMMIT_STATUS.format(
1430            owner=self.owner.username, repo=self.name, sha=commit
1431        )
1432        response = self.allspice_client.requests_post(url, data=data)
1433        return CommitStatus.parse_response(self.allspice_client, response)

Create a status on a commit.

See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

def delete(self):
1435    def delete(self):
1436        self.allspice_client.requests_delete(
1437            Repository.REPO_DELETE % (self.owner.username, self.name)
1438        )
1439        self.deleted = True
class Repository.ArchiveFormat(enum.Enum):
560    class ArchiveFormat(Enum):
561        """
562        Archive formats for Repository.get_archive
563        """
564
565        TAR = "tar.gz"
566        ZIP = "zip"

Archive formats for Repository.get_archive

TAR = <ArchiveFormat.TAR: 'tar.gz'>
ZIP = <ArchiveFormat.ZIP: 'zip'>
Inherited Members
enum.Enum
name
value
class Repository.CommitStatusSort(enum.Enum):
568    class CommitStatusSort(Enum):
569        """
570        Sort order for Repository.get_commit_statuses
571        """
572
573        OLDEST = "oldest"
574        RECENT_UPDATE = "recentupdate"
575        LEAST_UPDATE = "leastupdate"
576        LEAST_INDEX = "leastindex"
577        HIGHEST_INDEX = "highestindex"

Sort order for Repository.get_commit_statuses

OLDEST = <CommitStatusSort.OLDEST: 'oldest'>
RECENT_UPDATE = <CommitStatusSort.RECENT_UPDATE: 'recentupdate'>
LEAST_UPDATE = <CommitStatusSort.LEAST_UPDATE: 'leastupdate'>
LEAST_INDEX = <CommitStatusSort.LEAST_INDEX: 'leastindex'>
HIGHEST_INDEX = <CommitStatusSort.HIGHEST_INDEX: 'highestindex'>
Inherited Members
enum.Enum
name
value
class Branch(allspice.baseapiobject.ReadonlyApiObject):
415class Branch(ReadonlyApiObject):
416    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
417    effective_branch_protection_name: str
418    enable_status_check: bool
419    name: str
420    protected: bool
421    required_approvals: int
422    status_check_contexts: List[Any]
423    user_can_merge: bool
424    user_can_push: bool
425
426    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
427
428    def __init__(self, allspice_client):
429        super().__init__(allspice_client)
430
431    def __eq__(self, other):
432        if not isinstance(other, Branch):
433            return False
434        return self.commit == other.commit and self.name == other.name
435
436    def __hash__(self):
437        return hash(self.commit["id"]) ^ hash(self.name)
438
439    _fields_to_parsers: ClassVar[dict] = {
440        # This is not a commit object
441        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
442    }
443
444    @classmethod
445    def request(cls, allspice_client, owner: str, repo: str, branch: str):
446        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Branch(allspice_client)
428    def __init__(self, allspice_client):
429        super().__init__(allspice_client)
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]], NoneType]]
effective_branch_protection_name: str
enable_status_check: bool
name: str
protected: bool
required_approvals: int
status_check_contexts: List[Any]
user_can_merge: bool
user_can_push: bool
API_OBJECT = '/repos/{owner}/{repo}/branches/{branch}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, branch: str):
444    @classmethod
445    def request(cls, allspice_client, owner: str, repo: str, branch: str):
446        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
class NotFoundException(builtins.Exception):
6class NotFoundException(Exception):
7    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class AlreadyExistsException(builtins.Exception):
2class AlreadyExistsException(Exception):
3    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class Issue(allspice.baseapiobject.ApiObject):
1873class Issue(ApiObject):
1874    assets: List[Any]
1875    assignee: Any
1876    assignees: Any
1877    body: str
1878    closed_at: Any
1879    comments: int
1880    created_at: str
1881    due_date: Any
1882    html_url: str
1883    id: int
1884    is_locked: bool
1885    labels: List[Any]
1886    milestone: Optional["Milestone"]
1887    number: int
1888    original_author: str
1889    original_author_id: int
1890    pin_order: int
1891    pull_request: Any
1892    ref: str
1893    repository: Dict[str, Union[int, str]]
1894    state: str
1895    title: str
1896    updated_at: str
1897    url: str
1898    user: User
1899
1900    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1901    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1902    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1903    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1904
1905    OPENED = "open"
1906    CLOSED = "closed"
1907
1908    def __init__(self, allspice_client):
1909        super().__init__(allspice_client)
1910
1911    def __eq__(self, other):
1912        if not isinstance(other, Issue):
1913            return False
1914        return self.repository == other.repository and self.id == other.id
1915
1916    def __hash__(self):
1917        return hash(self.repository) ^ hash(self.id)
1918
1919    _fields_to_parsers: ClassVar[dict] = {
1920        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1921        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1922        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1923        "assignees": lambda allspice_client, us: [
1924            User.parse_response(allspice_client, u) for u in us
1925        ],
1926        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1927    }
1928
1929    _parsers_to_fields: ClassVar[dict] = {
1930        "milestone": lambda m: m.id,
1931    }
1932
1933    _patchable_fields: ClassVar[set[str]] = {
1934        "assignee",
1935        "assignees",
1936        "body",
1937        "due_date",
1938        "milestone",
1939        "state",
1940        "title",
1941    }
1942
1943    def commit(self):
1944        args = {
1945            "owner": self.repository.owner.username,
1946            "repo": self.repository.name,
1947            "index": self.number,
1948        }
1949        self._commit(args)
1950
1951    @classmethod
1952    def request(cls, allspice_client, owner: str, repo: str, number: str):
1953        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1954        # The repository in the response is a RepositoryMeta object, so request
1955        # the full repository object and add it to the issue object.
1956        repository = Repository.request(allspice_client, owner, repo)
1957        setattr(api_object, "_repository", repository)
1958        # For legacy reasons
1959        cls._add_read_property("repo", repository, api_object)
1960        return api_object
1961
1962    @classmethod
1963    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1964        args = {"owner": repo.owner.username, "repo": repo.name}
1965        data = {"title": title, "body": body}
1966        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1967        issue = Issue.parse_response(allspice_client, result)
1968        setattr(issue, "_repository", repo)
1969        cls._add_read_property("repo", repo, issue)
1970        return issue
1971
1972    @property
1973    def owner(self) -> Organization | User:
1974        return self.repository.owner
1975
1976    def get_time_sum(self, user: User) -> int:
1977        results = self.allspice_client.requests_get(
1978            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1979        )
1980        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
1981
1982    def get_times(self) -> Optional[Dict]:
1983        return self.allspice_client.requests_get(
1984            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1985        )
1986
1987    def delete_time(self, time_id: str):
1988        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1989        self.allspice_client.requests_delete(path)
1990
1991    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1992        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
1993        self.allspice_client.requests_post(
1994            path, data={"created": created, "time": int(time), "user_name": user_name}
1995        )
1996
1997    def get_comments(self) -> List[Comment]:
1998        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
1999
2000        results = self.allspice_client.requests_get(
2001            self.GET_COMMENTS.format(
2002                owner=self.owner.username, repo=self.repository.name, index=self.number
2003            )
2004        )
2005
2006        return [Comment.parse_response(self.allspice_client, result) for result in results]
2007
2008    def create_comment(self, body: str) -> Comment:
2009        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2010
2011        path = self.GET_COMMENTS.format(
2012            owner=self.owner.username, repo=self.repository.name, index=self.number
2013        )
2014
2015        response = self.allspice_client.requests_post(path, data={"body": body})
2016        return Comment.parse_response(self.allspice_client, response)
Issue(allspice_client)
1908    def __init__(self, allspice_client):
1909        super().__init__(allspice_client)
assets: List[Any]
assignee: Any
assignees: Any
body: str
closed_at: Any
comments: int
created_at: str
due_date: Any
html_url: str
id: int
is_locked: bool
labels: List[Any]
milestone: Optional[Milestone]
number: int
original_author: str
original_author_id: int
pin_order: int
pull_request: Any
ref: str
repository: Dict[str, Union[int, str]]
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/{index}'
GET_TIME = '/repos/%s/%s/issues/%s/times'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
CREATE_ISSUE = '/repos/{owner}/{repo}/issues'
OPENED = 'open'
CLOSED = 'closed'
def commit(self):
1943    def commit(self):
1944        args = {
1945            "owner": self.repository.owner.username,
1946            "repo": self.repository.name,
1947            "index": self.number,
1948        }
1949        self._commit(args)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1951    @classmethod
1952    def request(cls, allspice_client, owner: str, repo: str, number: str):
1953        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1954        # The repository in the response is a RepositoryMeta object, so request
1955        # the full repository object and add it to the issue object.
1956        repository = Repository.request(allspice_client, owner, repo)
1957        setattr(api_object, "_repository", repository)
1958        # For legacy reasons
1959        cls._add_read_property("repo", repository, api_object)
1960        return api_object
@classmethod
def create_issue( cls, allspice_client, repo: Repository, title: str, body: str = ''):
1962    @classmethod
1963    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1964        args = {"owner": repo.owner.username, "repo": repo.name}
1965        data = {"title": title, "body": body}
1966        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1967        issue = Issue.parse_response(allspice_client, result)
1968        setattr(issue, "_repository", repo)
1969        cls._add_read_property("repo", repo, issue)
1970        return issue
owner: Organization | User
1972    @property
1973    def owner(self) -> Organization | User:
1974        return self.repository.owner
def get_time_sum(self, user: User) -> int:
1976    def get_time_sum(self, user: User) -> int:
1977        results = self.allspice_client.requests_get(
1978            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1979        )
1980        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
def get_times(self) -> Optional[Dict]:
1982    def get_times(self) -> Optional[Dict]:
1983        return self.allspice_client.requests_get(
1984            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1985        )
def delete_time(self, time_id: str):
1987    def delete_time(self, time_id: str):
1988        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1989        self.allspice_client.requests_delete(path)
def add_time( self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1991    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1992        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
1993        self.allspice_client.requests_post(
1994            path, data={"created": created, "time": int(time), "user_name": user_name}
1995        )
def get_comments(self) -> List[Comment]:
1997    def get_comments(self) -> List[Comment]:
1998        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
1999
2000        results = self.allspice_client.requests_get(
2001            self.GET_COMMENTS.format(
2002                owner=self.owner.username, repo=self.repository.name, index=self.number
2003            )
2004        )
2005
2006        return [Comment.parse_response(self.allspice_client, result) for result in results]

https://hub.allspice.io/api/swagger#/issue/issueGetComments

def create_comment(self, body: str) -> Comment:
2008    def create_comment(self, body: str) -> Comment:
2009        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2010
2011        path = self.GET_COMMENTS.format(
2012            owner=self.owner.username, repo=self.repository.name, index=self.number
2013        )
2014
2015        response = self.allspice_client.requests_post(path, data={"body": body})
2016        return Comment.parse_response(self.allspice_client, response)

https://hub.allspice.io/api/swagger#/issue/issueCreateComment

class Milestone(allspice.baseapiobject.ApiObject):
1442class Milestone(ApiObject):
1443    allow_merge_commits: Any
1444    allow_rebase: Any
1445    allow_rebase_explicit: Any
1446    allow_squash_merge: Any
1447    archived: Any
1448    closed_at: Any
1449    closed_issues: int
1450    created_at: str
1451    default_branch: Any
1452    description: str
1453    due_on: Any
1454    has_issues: Any
1455    has_pull_requests: Any
1456    has_wiki: Any
1457    id: int
1458    ignore_whitespace_conflicts: Any
1459    name: Any
1460    open_issues: int
1461    private: Any
1462    state: str
1463    title: str
1464    updated_at: str
1465    website: Any
1466
1467    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1468
1469    def __init__(self, allspice_client):
1470        super().__init__(allspice_client)
1471
1472    def __eq__(self, other):
1473        if not isinstance(other, Milestone):
1474            return False
1475        return self.allspice_client == other.allspice_client and self.id == other.id
1476
1477    def __hash__(self):
1478        return hash(self.allspice_client) ^ hash(self.id)
1479
1480    _fields_to_parsers: ClassVar[dict] = {
1481        "closed_at": lambda _, t: Util.convert_time(t),
1482        "due_on": lambda _, t: Util.convert_time(t),
1483    }
1484
1485    _patchable_fields: ClassVar[set[str]] = {
1486        "allow_merge_commits",
1487        "allow_rebase",
1488        "allow_rebase_explicit",
1489        "allow_squash_merge",
1490        "archived",
1491        "default_branch",
1492        "description",
1493        "has_issues",
1494        "has_pull_requests",
1495        "has_wiki",
1496        "ignore_whitespace_conflicts",
1497        "name",
1498        "private",
1499        "website",
1500    }
1501
1502    @classmethod
1503    def request(cls, allspice_client, owner: str, repo: str, number: str):
1504        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Milestone(allspice_client)
1469    def __init__(self, allspice_client):
1470        super().__init__(allspice_client)
allow_merge_commits: Any
allow_rebase: Any
allow_rebase_explicit: Any
allow_squash_merge: Any
archived: Any
closed_at: Any
closed_issues: int
created_at: str
default_branch: Any
description: str
due_on: Any
has_issues: Any
has_pull_requests: Any
has_wiki: Any
id: int
ignore_whitespace_conflicts: Any
name: Any
open_issues: int
private: Any
state: str
title: str
updated_at: str
website: Any
API_OBJECT = '/repos/{owner}/{repo}/milestones/{number}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1502    @classmethod
1503    def request(cls, allspice_client, owner: str, repo: str, number: str):
1504        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
class Commit(allspice.baseapiobject.ReadonlyApiObject):
1698class Commit(ReadonlyApiObject):
1699    author: User
1700    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1701    committer: Dict[str, Union[int, str, bool]]
1702    created: str
1703    files: List[Dict[str, str]]
1704    html_url: str
1705    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1706    parents: List[Union[Dict[str, str], Any]]
1707    sha: str
1708    stats: Dict[str, int]
1709    url: str
1710
1711    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1712    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1713    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1714
1715    # Regex to extract owner and repo names from the url property
1716    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1717
1718    def __init__(self, allspice_client):
1719        super().__init__(allspice_client)
1720
1721    _fields_to_parsers: ClassVar[dict] = {
1722        # NOTE: api may return None for committers who are not allspice users
1723        "author": lambda allspice_client, u: (
1724            User.parse_response(allspice_client, u) if u else None
1725        )
1726    }
1727
1728    def __eq__(self, other):
1729        if not isinstance(other, Commit):
1730            return False
1731        return self.sha == other.sha
1732
1733    def __hash__(self):
1734        return hash(self.sha)
1735
1736    @classmethod
1737    def parse_response(cls, allspice_client, result) -> "Commit":
1738        commit_cache = result["commit"]
1739        api_object = cls(allspice_client)
1740        cls._initialize(allspice_client, api_object, result)
1741        # inner_commit for legacy reasons
1742        Commit._add_read_property("inner_commit", commit_cache, api_object)
1743        return api_object
1744
1745    def get_status(self) -> CommitCombinedStatus:
1746        """
1747        Get a combined status consisting of all statuses on this commit.
1748
1749        Note that the returned object is a CommitCombinedStatus object, which
1750        also contains a list of all statuses on the commit.
1751
1752        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1753        """
1754
1755        result = self.allspice_client.requests_get(
1756            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1757        )
1758        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1759
1760    def get_statuses(self) -> List[CommitStatus]:
1761        """
1762        Get a list of all statuses on this commit.
1763
1764        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1765        """
1766
1767        results = self.allspice_client.requests_get(
1768            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1769        )
1770        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1771
1772    @cached_property
1773    def _fields_for_path(self) -> dict[str, str]:
1774        matches = self.URL_REGEXP.search(self.url)
1775        if not matches:
1776            raise ValueError(f"Invalid commit URL: {self.url}")
1777
1778        return {
1779            "owner": matches.group(1),
1780            "repo": matches.group(2),
1781            "sha": self.sha,
1782        }
Commit(allspice_client)
1718    def __init__(self, allspice_client):
1719        super().__init__(allspice_client)
author: User
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
committer: Dict[str, Union[int, str, bool]]
created: str
files: List[Dict[str, str]]
html_url: str
inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
parents: List[Union[Dict[str, str], Any]]
sha: str
stats: Dict[str, int]
url: str
API_OBJECT = '/repos/{owner}/{repo}/commits/{sha}'
COMMIT_GET_STATUS = '/repos/{owner}/{repo}/commits/{sha}/status'
COMMIT_GET_STATUSES = '/repos/{owner}/{repo}/commits/{sha}/statuses'
URL_REGEXP = re.compile('/repos/([^/]+)/([^/]+)/git/commits')
@classmethod
def parse_response(cls, allspice_client, result) -> Commit:
1736    @classmethod
1737    def parse_response(cls, allspice_client, result) -> "Commit":
1738        commit_cache = result["commit"]
1739        api_object = cls(allspice_client)
1740        cls._initialize(allspice_client, api_object, result)
1741        # inner_commit for legacy reasons
1742        Commit._add_read_property("inner_commit", commit_cache, api_object)
1743        return api_object
def get_status(self) -> allspice.apiobject.CommitCombinedStatus:
1745    def get_status(self) -> CommitCombinedStatus:
1746        """
1747        Get a combined status consisting of all statuses on this commit.
1748
1749        Note that the returned object is a CommitCombinedStatus object, which
1750        also contains a list of all statuses on the commit.
1751
1752        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1753        """
1754
1755        result = self.allspice_client.requests_get(
1756            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1757        )
1758        return CommitCombinedStatus.parse_response(self.allspice_client, result)

Get a combined status consisting of all statuses on this commit.

Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.

https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus

def get_statuses(self) -> List[allspice.apiobject.CommitStatus]:
1760    def get_statuses(self) -> List[CommitStatus]:
1761        """
1762        Get a list of all statuses on this commit.
1763
1764        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1765        """
1766
1767        results = self.allspice_client.requests_get(
1768            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1769        )
1770        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]

Get a list of all statuses on this commit.

https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses

class Comment(allspice.baseapiobject.ApiObject):
1558class Comment(ApiObject):
1559    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1560    body: str
1561    created_at: datetime
1562    html_url: str
1563    id: int
1564    issue_url: str
1565    original_author: str
1566    original_author_id: int
1567    pull_request_url: str
1568    updated_at: datetime
1569    user: User
1570
1571    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1572    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1573    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1574
1575    def __init__(self, allspice_client):
1576        super().__init__(allspice_client)
1577
1578    def __eq__(self, other):
1579        if not isinstance(other, Comment):
1580            return False
1581        return self.repository == other.repository and self.id == other.id
1582
1583    def __hash__(self):
1584        return hash(self.repository) ^ hash(self.id)
1585
1586    @classmethod
1587    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1588        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1589
1590    _fields_to_parsers: ClassVar[dict] = {
1591        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1592        "created_at": lambda _, t: Util.convert_time(t),
1593        "updated_at": lambda _, t: Util.convert_time(t),
1594    }
1595
1596    _patchable_fields: ClassVar[set[str]] = {"body"}
1597
1598    @property
1599    def parent_url(self) -> str:
1600        """URL of the parent of this comment (the issue or the pull request)"""
1601
1602        if self.issue_url is not None and self.issue_url != "":
1603            return self.issue_url
1604        else:
1605            return self.pull_request_url
1606
1607    @cached_property
1608    def repository(self) -> Repository:
1609        """The repository this comment was posted on."""
1610
1611        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1612        return Repository.request(self.allspice_client, owner_name, repo_name)
1613
1614    def __fields_for_path(self):
1615        return {
1616            "owner": self.repository.owner.username,
1617            "repo": self.repository.name,
1618            "id": self.id,
1619        }
1620
1621    def commit(self):
1622        self._commit(self.__fields_for_path())
1623
1624    def delete(self):
1625        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1626        self.deleted = True
1627
1628    def get_attachments(self) -> List[Attachment]:
1629        """
1630        Get all attachments on this comment. This returns Attachment objects, which
1631        contain a link to download the attachment.
1632
1633        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1634        """
1635
1636        results = self.allspice_client.requests_get(
1637            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1638        )
1639        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1640
1641    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1642        """
1643        Create an attachment on this comment.
1644
1645        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1646
1647        :param file: The file to attach. This should be a file-like object.
1648        :param name: The name of the file. If not provided, the name of the file will be
1649                     used.
1650        :return: The created attachment.
1651        """
1652
1653        args: dict[str, Any] = {
1654            "files": {"attachment": file},
1655        }
1656        if name is not None:
1657            args["params"] = {"name": name}
1658
1659        result = self.allspice_client.requests_post(
1660            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1661            **args,
1662        )
1663        return Attachment.parse_response(self.allspice_client, result)
1664
1665    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1666        """
1667        Edit an attachment.
1668
1669        The list of params that can be edited is available at
1670        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1671
1672        :param attachment: The attachment to be edited
1673        :param data: The data parameter should be a dictionary of the fields to edit.
1674        :return: The edited attachment
1675        """
1676
1677        args = {
1678            **self.__fields_for_path(),
1679            "attachment_id": attachment.id,
1680        }
1681        result = self.allspice_client.requests_patch(
1682            self.ATTACHMENT_PATH.format(**args),
1683            data=data,
1684        )
1685        return Attachment.parse_response(self.allspice_client, result)
1686
1687    def delete_attachment(self, attachment: Attachment):
1688        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1689
1690        args = {
1691            **self.__fields_for_path(),
1692            "attachment_id": attachment.id,
1693        }
1694        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1695        attachment.deleted = True
Comment(allspice_client)
1575    def __init__(self, allspice_client):
1576        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]]]]
body: str
created_at: datetime.datetime
html_url: str
id: int
issue_url: str
original_author: str
original_author_id: int
pull_request_url: str
updated_at: datetime.datetime
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/comments/{id}'
GET_ATTACHMENTS_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets'
ATTACHMENT_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}'
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: str) -> Comment:
1586    @classmethod
1587    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1588        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
parent_url: str
1598    @property
1599    def parent_url(self) -> str:
1600        """URL of the parent of this comment (the issue or the pull request)"""
1601
1602        if self.issue_url is not None and self.issue_url != "":
1603            return self.issue_url
1604        else:
1605            return self.pull_request_url

URL of the parent of this comment (the issue or the pull request)

repository: Repository
1607    @cached_property
1608    def repository(self) -> Repository:
1609        """The repository this comment was posted on."""
1610
1611        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1612        return Repository.request(self.allspice_client, owner_name, repo_name)

The repository this comment was posted on.

def commit(self):
1621    def commit(self):
1622        self._commit(self.__fields_for_path())
def delete(self):
1624    def delete(self):
1625        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1626        self.deleted = True
def get_attachments(self) -> List[allspice.apiobject.Attachment]:
1628    def get_attachments(self) -> List[Attachment]:
1629        """
1630        Get all attachments on this comment. This returns Attachment objects, which
1631        contain a link to download the attachment.
1632
1633        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1634        """
1635
1636        results = self.allspice_client.requests_get(
1637            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1638        )
1639        return [Attachment.parse_response(self.allspice_client, result) for result in results]

Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.

https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments

def create_attachment( self, file: IO, name: Optional[str] = None) -> allspice.apiobject.Attachment:
1641    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1642        """
1643        Create an attachment on this comment.
1644
1645        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1646
1647        :param file: The file to attach. This should be a file-like object.
1648        :param name: The name of the file. If not provided, the name of the file will be
1649                     used.
1650        :return: The created attachment.
1651        """
1652
1653        args: dict[str, Any] = {
1654            "files": {"attachment": file},
1655        }
1656        if name is not None:
1657            args["params"] = {"name": name}
1658
1659        result = self.allspice_client.requests_post(
1660            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1661            **args,
1662        )
1663        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this comment.

https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

def edit_attachment( self, attachment: allspice.apiobject.Attachment, data: dict) -> allspice.apiobject.Attachment:
1665    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1666        """
1667        Edit an attachment.
1668
1669        The list of params that can be edited is available at
1670        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1671
1672        :param attachment: The attachment to be edited
1673        :param data: The data parameter should be a dictionary of the fields to edit.
1674        :return: The edited attachment
1675        """
1676
1677        args = {
1678            **self.__fields_for_path(),
1679            "attachment_id": attachment.id,
1680        }
1681        result = self.allspice_client.requests_patch(
1682            self.ATTACHMENT_PATH.format(**args),
1683            data=data,
1684        )
1685        return Attachment.parse_response(self.allspice_client, result)

Edit an attachment.

The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

Parameters
  • attachment: The attachment to be edited
  • data: The data parameter should be a dictionary of the fields to edit.
Returns

The edited attachment

def delete_attachment(self, attachment: allspice.apiobject.Attachment):
1687    def delete_attachment(self, attachment: Attachment):
1688        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1689
1690        args = {
1691            **self.__fields_for_path(),
1692            "attachment_id": attachment.id,
1693        }
1694        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1695        attachment.deleted = True

https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

class Content(allspice.baseapiobject.ReadonlyApiObject):
2515class Content(ReadonlyApiObject):
2516    content: Any
2517    download_url: str
2518    encoding: Any
2519    git_url: str
2520    html_url: str
2521    last_commit_sha: str
2522    name: str
2523    path: str
2524    sha: str
2525    size: int
2526    submodule_git_url: Any
2527    target: Any
2528    type: str
2529    url: str
2530
2531    FILE = "file"
2532
2533    def __init__(self, allspice_client):
2534        super().__init__(allspice_client)
2535
2536    def __eq__(self, other):
2537        if not isinstance(other, Content):
2538            return False
2539
2540        return self.sha == other.sha and self.name == other.name
2541
2542    def __hash__(self):
2543        return hash(self.sha) ^ hash(self.name)
Content(allspice_client)
2533    def __init__(self, allspice_client):
2534        super().__init__(allspice_client)
content: Any
download_url: str
encoding: Any
git_url: str
html_url: str
last_commit_sha: str
name: str
path: str
sha: str
size: int
submodule_git_url: Any
target: Any
type: str
url: str
FILE = 'file'
class DesignReview(allspice.baseapiobject.ApiObject):
2019class DesignReview(ApiObject):
2020    """
2021    A Design Review. See
2022    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2023
2024    Note: The base and head fields are not `Branch` objects - they are plain strings
2025    referring to the branch names. This is because DRs can exist for branches that have
2026    been deleted, which don't have an associated `Branch` object from the API. You can use
2027    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2028    it exists.
2029    """
2030
2031    allow_maintainer_edit: bool
2032    allow_maintainer_edits: Any
2033    assignee: User
2034    assignees: List["User"]
2035    base: str
2036    body: str
2037    closed_at: Any
2038    comments: int
2039    created_at: str
2040    diff_url: str
2041    due_date: Optional[str]
2042    head: str
2043    html_url: str
2044    id: int
2045    is_locked: bool
2046    labels: List[Any]
2047    merge_base: str
2048    merge_commit_sha: Any
2049    mergeable: bool
2050    merged: bool
2051    merged_at: Any
2052    merged_by: Any
2053    milestone: Any
2054    number: int
2055    patch_url: str
2056    pin_order: int
2057    repository: Optional["Repository"]
2058    requested_reviewers: Any
2059    state: str
2060    title: str
2061    updated_at: str
2062    url: str
2063    user: User
2064
2065    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2066    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2067    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2068
2069    OPEN = "open"
2070    CLOSED = "closed"
2071
2072    class MergeType(Enum):
2073        MERGE = "merge"
2074        REBASE = "rebase"
2075        REBASE_MERGE = "rebase-merge"
2076        SQUASH = "squash"
2077        MANUALLY_MERGED = "manually-merged"
2078
2079    def __init__(self, allspice_client):
2080        super().__init__(allspice_client)
2081
2082    def __eq__(self, other):
2083        if not isinstance(other, DesignReview):
2084            return False
2085        return self.repository == other.repository and self.id == other.id
2086
2087    def __hash__(self):
2088        return hash(self.repository) ^ hash(self.id)
2089
2090    @classmethod
2091    def parse_response(cls, allspice_client, result) -> "DesignReview":
2092        api_object = super().parse_response(allspice_client, result)
2093        cls._add_read_property(
2094            "repository",
2095            Repository.parse_response(allspice_client, result["base"]["repo"]),
2096            api_object,
2097        )
2098
2099        return api_object
2100
2101    @classmethod
2102    def request(cls, allspice_client, owner: str, repo: str, number: str):
2103        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2104        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2105
2106    _fields_to_parsers: ClassVar[dict] = {
2107        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2108        "assignees": lambda allspice_client, us: [
2109            User.parse_response(allspice_client, u) for u in us
2110        ],
2111        "base": lambda _, b: b["ref"],
2112        "head": lambda _, h: h["ref"],
2113        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2114        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2115        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2116    }
2117
2118    _patchable_fields: ClassVar[set[str]] = {
2119        "allow_maintainer_edits",
2120        "assignee",
2121        "assignees",
2122        "base",
2123        "body",
2124        "due_date",
2125        "milestone",
2126        "state",
2127        "title",
2128    }
2129
2130    _parsers_to_fields: ClassVar[dict] = {
2131        "assignee": lambda u: u.username,
2132        "assignees": lambda us: [u.username for u in us],
2133        "base": lambda b: b.name if isinstance(b, Branch) else b,
2134        "milestone": lambda m: m.id,
2135    }
2136
2137    def commit(self):
2138        data = self.get_dirty_fields()
2139        if "due_date" in data and data["due_date"] is None:
2140            data["unset_due_date"] = True
2141
2142        args = {
2143            "owner": self.repository.owner.username,
2144            "repo": self.repository.name,
2145            "index": self.number,
2146        }
2147        self._commit(args, data)
2148
2149    def merge(self, merge_type: MergeType):
2150        """
2151        Merge the pull request. See
2152        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2153
2154        :param merge_type: The type of merge to perform. See the MergeType enum.
2155        """
2156
2157        self.allspice_client.requests_put(
2158            self.MERGE_DESIGN_REVIEW.format(
2159                owner=self.repository.owner.username,
2160                repo=self.repository.name,
2161                index=self.number,
2162            ),
2163            data={"Do": merge_type.value},
2164        )
2165
2166    def get_comments(self) -> List[Comment]:
2167        """
2168        Get the comments on this pull request, but not specifically on a review.
2169
2170        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2171
2172        :return: A list of comments on this pull request.
2173        """
2174
2175        results = self.allspice_client.requests_get(
2176            self.GET_COMMENTS.format(
2177                owner=self.repository.owner.username,
2178                repo=self.repository.name,
2179                index=self.number,
2180            )
2181        )
2182        return [Comment.parse_response(self.allspice_client, result) for result in results]
2183
2184    def create_comment(self, body: str):
2185        """
2186        Create a comment on this pull request. This uses the same endpoint as the
2187        comments on issues, and will not be associated with any reviews.
2188
2189        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2190
2191        :param body: The body of the comment.
2192        :return: The comment that was created.
2193        """
2194
2195        result = self.allspice_client.requests_post(
2196            self.GET_COMMENTS.format(
2197                owner=self.repository.owner.username,
2198                repo=self.repository.name,
2199                index=self.number,
2200            ),
2201            data={"body": body},
2202        )
2203        return Comment.parse_response(self.allspice_client, result)

A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.

Note: The base and head fields are not Branch objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated Branch object from the API. You can use the Repository.get_branch method to get a Branch object for a branch if you know it exists.

DesignReview(allspice_client)
2079    def __init__(self, allspice_client):
2080        super().__init__(allspice_client)
allow_maintainer_edit: bool
allow_maintainer_edits: Any
assignee: User
assignees: List[User]
base: str
body: str
closed_at: Any
comments: int
created_at: str
diff_url: str
due_date: Optional[str]
head: str
html_url: str
id: int
is_locked: bool
labels: List[Any]
merge_base: str
merge_commit_sha: Any
mergeable: bool
merged: bool
merged_at: Any
merged_by: Any
milestone: Any
number: int
patch_url: str
pin_order: int
repository: Optional[Repository]
requested_reviewers: Any
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/pulls/{index}'
MERGE_DESIGN_REVIEW = '/repos/{owner}/{repo}/pulls/{index}/merge'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
OPEN = 'open'
CLOSED = 'closed'
@classmethod
def parse_response(cls, allspice_client, result) -> DesignReview:
2090    @classmethod
2091    def parse_response(cls, allspice_client, result) -> "DesignReview":
2092        api_object = super().parse_response(allspice_client, result)
2093        cls._add_read_property(
2094            "repository",
2095            Repository.parse_response(allspice_client, result["base"]["repo"]),
2096            api_object,
2097        )
2098
2099        return api_object
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
2101    @classmethod
2102    def request(cls, allspice_client, owner: str, repo: str, number: str):
2103        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2104        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})

See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest

def commit(self):
2137    def commit(self):
2138        data = self.get_dirty_fields()
2139        if "due_date" in data and data["due_date"] is None:
2140            data["unset_due_date"] = True
2141
2142        args = {
2143            "owner": self.repository.owner.username,
2144            "repo": self.repository.name,
2145            "index": self.number,
2146        }
2147        self._commit(args, data)
def merge(self, merge_type: DesignReview.MergeType):
2149    def merge(self, merge_type: MergeType):
2150        """
2151        Merge the pull request. See
2152        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2153
2154        :param merge_type: The type of merge to perform. See the MergeType enum.
2155        """
2156
2157        self.allspice_client.requests_put(
2158            self.MERGE_DESIGN_REVIEW.format(
2159                owner=self.repository.owner.username,
2160                repo=self.repository.name,
2161                index=self.number,
2162            ),
2163            data={"Do": merge_type.value},
2164        )

Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

Parameters
  • merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
2166    def get_comments(self) -> List[Comment]:
2167        """
2168        Get the comments on this pull request, but not specifically on a review.
2169
2170        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2171
2172        :return: A list of comments on this pull request.
2173        """
2174
2175        results = self.allspice_client.requests_get(
2176            self.GET_COMMENTS.format(
2177                owner=self.repository.owner.username,
2178                repo=self.repository.name,
2179                index=self.number,
2180            )
2181        )
2182        return [Comment.parse_response(self.allspice_client, result) for result in results]

Get the comments on this pull request, but not specifically on a review.

https://hub.allspice.io/api/swagger#/issue/issueGetComments

Returns

A list of comments on this pull request.

def create_comment(self, body: str):
2184    def create_comment(self, body: str):
2185        """
2186        Create a comment on this pull request. This uses the same endpoint as the
2187        comments on issues, and will not be associated with any reviews.
2188
2189        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2190
2191        :param body: The body of the comment.
2192        :return: The comment that was created.
2193        """
2194
2195        result = self.allspice_client.requests_post(
2196            self.GET_COMMENTS.format(
2197                owner=self.repository.owner.username,
2198                repo=self.repository.name,
2199                index=self.number,
2200            ),
2201            data={"body": body},
2202        )
2203        return Comment.parse_response(self.allspice_client, result)

Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.

https://hub.allspice.io/api/swagger#/issue/issueCreateComment

Parameters
  • body: The body of the comment.
Returns

The comment that was created.

class DesignReview.MergeType(enum.Enum):
2072    class MergeType(Enum):
2073        MERGE = "merge"
2074        REBASE = "rebase"
2075        REBASE_MERGE = "rebase-merge"
2076        SQUASH = "squash"
2077        MANUALLY_MERGED = "manually-merged"
MERGE = <MergeType.MERGE: 'merge'>
REBASE = <MergeType.REBASE: 'rebase'>
REBASE_MERGE = <MergeType.REBASE_MERGE: 'rebase-merge'>
SQUASH = <MergeType.SQUASH: 'squash'>
MANUALLY_MERGED = <MergeType.MANUALLY_MERGED: 'manually-merged'>
Inherited Members
enum.Enum
name
value
class Release(allspice.baseapiobject.ApiObject):
2289class Release(ApiObject):
2290    """
2291    A release on a repo.
2292    """
2293
2294    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2295    author: User
2296    body: str
2297    created_at: str
2298    draft: bool
2299    html_url: str
2300    id: int
2301    name: str
2302    prerelease: bool
2303    published_at: str
2304    repo: Optional["Repository"]
2305    repository: Optional["Repository"]
2306    tag_name: str
2307    tarball_url: str
2308    target_commitish: str
2309    upload_url: str
2310    url: str
2311    zipball_url: str
2312
2313    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2314    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2315    # Note that we don't strictly need the get_assets route, as the release
2316    # object already contains the assets.
2317
2318    def __init__(self, allspice_client):
2319        super().__init__(allspice_client)
2320
2321    def __eq__(self, other):
2322        if not isinstance(other, Release):
2323            return False
2324        return self.repo == other.repo and self.id == other.id
2325
2326    def __hash__(self):
2327        return hash(self.repo) ^ hash(self.id)
2328
2329    _fields_to_parsers: ClassVar[dict] = {
2330        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2331    }
2332    _patchable_fields: ClassVar[set[str]] = {
2333        "body",
2334        "draft",
2335        "name",
2336        "prerelease",
2337        "tag_name",
2338        "target_commitish",
2339    }
2340
2341    @classmethod
2342    def parse_response(cls, allspice_client, result, repo) -> Release:
2343        release = super().parse_response(allspice_client, result)
2344        Release._add_read_property("repository", repo, release)
2345        # For legacy reasons
2346        Release._add_read_property("repo", repo, release)
2347        setattr(
2348            release,
2349            "_assets",
2350            [
2351                ReleaseAsset.parse_response(allspice_client, asset, release)
2352                for asset in result["assets"]
2353            ],
2354        )
2355        return release
2356
2357    @classmethod
2358    def request(
2359        cls,
2360        allspice_client,
2361        owner: str,
2362        repo: str,
2363        id: Optional[int] = None,
2364    ) -> Release:
2365        args = {"owner": owner, "repo": repo, "id": id}
2366        release_response = cls._get_gitea_api_object(allspice_client, args)
2367        repository = Repository.request(allspice_client, owner, repo)
2368        release = cls.parse_response(allspice_client, release_response, repository)
2369        return release
2370
2371    def commit(self):
2372        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2373        self._commit(args)
2374
2375    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2376        """
2377        Create an asset for this release.
2378
2379        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2380
2381        :param file: The file to upload. This should be a file-like object.
2382        :param name: The name of the file.
2383        :return: The created asset.
2384        """
2385
2386        args: dict[str, Any] = {"files": {"attachment": file}}
2387        if name is not None:
2388            args["params"] = {"name": name}
2389
2390        result = self.allspice_client.requests_post(
2391            self.RELEASE_CREATE_ASSET.format(
2392                owner=self.repo.owner.username,
2393                repo=self.repo.name,
2394                id=self.id,
2395            ),
2396            **args,
2397        )
2398        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2399
2400    def delete(self):
2401        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2402        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2403        self.deleted = True

A release on a repo.

Release(allspice_client)
2318    def __init__(self, allspice_client):
2319        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]], allspice.apiobject.ReleaseAsset]]
author: User
body: str
created_at: str
draft: bool
html_url: str
id: int
name: str
prerelease: bool
published_at: str
repo: Optional[Repository]
repository: Optional[Repository]
tag_name: str
tarball_url: str
target_commitish: str
upload_url: str
url: str
zipball_url: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{id}'
RELEASE_CREATE_ASSET = '/repos/{owner}/{repo}/releases/{id}/assets'
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
2341    @classmethod
2342    def parse_response(cls, allspice_client, result, repo) -> Release:
2343        release = super().parse_response(allspice_client, result)
2344        Release._add_read_property("repository", repo, release)
2345        # For legacy reasons
2346        Release._add_read_property("repo", repo, release)
2347        setattr(
2348            release,
2349            "_assets",
2350            [
2351                ReleaseAsset.parse_response(allspice_client, asset, release)
2352                for asset in result["assets"]
2353            ],
2354        )
2355        return release
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: Optional[int] = None) -> Release:
2357    @classmethod
2358    def request(
2359        cls,
2360        allspice_client,
2361        owner: str,
2362        repo: str,
2363        id: Optional[int] = None,
2364    ) -> Release:
2365        args = {"owner": owner, "repo": repo, "id": id}
2366        release_response = cls._get_gitea_api_object(allspice_client, args)
2367        repository = Repository.request(allspice_client, owner, repo)
2368        release = cls.parse_response(allspice_client, release_response, repository)
2369        return release
def commit(self):
2371    def commit(self):
2372        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2373        self._commit(args)
def create_asset( self, file: IO, name: Optional[str] = None) -> allspice.apiobject.ReleaseAsset:
2375    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2376        """
2377        Create an asset for this release.
2378
2379        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2380
2381        :param file: The file to upload. This should be a file-like object.
2382        :param name: The name of the file.
2383        :return: The created asset.
2384        """
2385
2386        args: dict[str, Any] = {"files": {"attachment": file}}
2387        if name is not None:
2388            args["params"] = {"name": name}
2389
2390        result = self.allspice_client.requests_post(
2391            self.RELEASE_CREATE_ASSET.format(
2392                owner=self.repo.owner.username,
2393                repo=self.repo.name,
2394                id=self.id,
2395            ),
2396            **args,
2397        )
2398        return ReleaseAsset.parse_response(self.allspice_client, result, self)

Create an asset for this release.

https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

Parameters
  • file: The file to upload. This should be a file-like object.
  • name: The name of the file.
Returns

The created asset.

def delete(self):
2400    def delete(self):
2401        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2402        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2403        self.deleted = True