allspice.apiobject

   1from __future__ import annotations
   2
   3import logging
   4import re
   5from datetime import datetime, timezone
   6from enum import Enum
   7from functools import cached_property
   8from typing import (
   9    IO,
  10    Any,
  11    ClassVar,
  12    Dict,
  13    FrozenSet,
  14    List,
  15    Literal,
  16    Optional,
  17    Sequence,
  18    Set,
  19    Tuple,
  20    Union,
  21)
  22
  23try:
  24    from typing_extensions import Self
  25except ImportError:
  26    from typing import Self
  27
  28from .baseapiobject import ApiObject, ReadonlyApiObject
  29from .exceptions import ConflictException, NotFoundException
  30
  31
  32class Organization(ApiObject):
  33    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
  34
  35    active: Optional[bool]
  36    avatar_url: str
  37    created: Optional[str]
  38    description: str
  39    email: str
  40    followers_count: Optional[int]
  41    following_count: Optional[int]
  42    full_name: str
  43    html_url: Optional[str]
  44    id: int
  45    is_admin: Optional[bool]
  46    language: Optional[str]
  47    last_login: Optional[str]
  48    location: str
  49    login: Optional[str]
  50    login_name: Optional[str]
  51    name: Optional[str]
  52    prohibit_login: Optional[bool]
  53    repo_admin_change_team_access: Optional[bool]
  54    restricted: Optional[bool]
  55    source_id: Optional[int]
  56    starred_repos_count: Optional[int]
  57    username: str
  58    visibility: str
  59    website: str
  60
  61    API_OBJECT = """/orgs/{name}"""  # <org>
  62    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
  63    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
  64    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
  65    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
  66    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
  67    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
  68
  69    def __init__(self, allspice_client):
  70        super().__init__(allspice_client)
  71
  72    def __eq__(self, other):
  73        if not isinstance(other, Organization):
  74            return False
  75        return self.allspice_client == other.allspice_client and self.name == other.name
  76
  77    def __hash__(self):
  78        return hash(self.allspice_client) ^ hash(self.name)
  79
  80    @classmethod
  81    def request(cls, allspice_client, name: str) -> Self:
  82        return cls._request(allspice_client, {"name": name})
  83
  84    @classmethod
  85    def parse_response(cls, allspice_client, result) -> "Organization":
  86        api_object = super().parse_response(allspice_client, result)
  87        # add "name" field to make this behave similar to users for gitea < 1.18
  88        # also necessary for repository-owner when org is repo owner
  89        if not hasattr(api_object, "name"):
  90            Organization._add_read_property("name", result["username"], api_object)
  91        return api_object
  92
  93    _patchable_fields: ClassVar[set[str]] = {
  94        "description",
  95        "full_name",
  96        "location",
  97        "visibility",
  98        "website",
  99    }
 100
 101    def commit(self):
 102        args = {"name": self.name}
 103        self._commit(args)
 104
 105    def create_repo(
 106        self,
 107        repoName: str,
 108        description: str = "",
 109        private: bool = False,
 110        autoInit=True,
 111        gitignores: Optional[str] = None,
 112        license: Optional[str] = None,
 113        readme: str = "Default",
 114        issue_labels: Optional[str] = None,
 115        default_branch="master",
 116    ):
 117        """Create an organization Repository
 118
 119        Throws:
 120            AlreadyExistsException: If the Repository exists already.
 121            Exception: If something else went wrong.
 122        """
 123        result = self.allspice_client.requests_post(
 124            f"/orgs/{self.name}/repos",
 125            data={
 126                "name": repoName,
 127                "description": description,
 128                "private": private,
 129                "auto_init": autoInit,
 130                "gitignores": gitignores,
 131                "license": license,
 132                "issue_labels": issue_labels,
 133                "readme": readme,
 134                "default_branch": default_branch,
 135            },
 136        )
 137        if "id" in result:
 138            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
 139        else:
 140            self.allspice_client.logger.error(result["message"])
 141            raise Exception("Repository not created... (gitea: %s)" % result["message"])
 142        return Repository.parse_response(self.allspice_client, result)
 143
 144    def get_repositories(self) -> List["Repository"]:
 145        results = self.allspice_client.requests_get_paginated(
 146            Organization.ORG_REPOS_REQUEST % self.username
 147        )
 148        return [Repository.parse_response(self.allspice_client, result) for result in results]
 149
 150    def get_repository(self, name) -> "Repository":
 151        repos = self.get_repositories()
 152        for repo in repos:
 153            if repo.name == name:
 154                return repo
 155        raise NotFoundException("Repository %s not existent in organization." % name)
 156
 157    def get_teams(self) -> List["Team"]:
 158        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
 159        teams = [Team.parse_response(self.allspice_client, result) for result in results]
 160        # organisation seems to be missing using this request, so we add org manually
 161        for t in teams:
 162            setattr(t, "_organization", self)
 163        return teams
 164
 165    def get_team(self, name) -> "Team":
 166        teams = self.get_teams()
 167        for team in teams:
 168            if team.name == name:
 169                return team
 170        raise NotFoundException("Team not existent in organization.")
 171
 172    def create_team(
 173        self,
 174        name: str,
 175        description: str = "",
 176        permission: str = "read",
 177        can_create_org_repo: bool = False,
 178        includes_all_repositories: bool = False,
 179        units=(
 180            "repo.code",
 181            "repo.issues",
 182            "repo.ext_issues",
 183            "repo.wiki",
 184            "repo.pulls",
 185            "repo.releases",
 186            "repo.ext_wiki",
 187        ),
 188        units_map={},
 189    ) -> "Team":
 190        """Alias for AllSpice#create_team"""
 191        # TODO: Move AllSpice#create_team to Organization#create_team and
 192        #       deprecate AllSpice#create_team.
 193        return self.allspice_client.create_team(
 194            org=self,
 195            name=name,
 196            description=description,
 197            permission=permission,
 198            can_create_org_repo=can_create_org_repo,
 199            includes_all_repositories=includes_all_repositories,
 200            units=units,
 201            units_map=units_map,
 202        )
 203
 204    def get_members(self) -> List["User"]:
 205        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
 206        return [User.parse_response(self.allspice_client, result) for result in results]
 207
 208    def is_member(self, username) -> bool:
 209        if isinstance(username, User):
 210            username = username.username
 211        try:
 212            # returns 204 if its ok, 404 if its not
 213            self.allspice_client.requests_get(
 214                Organization.ORG_IS_MEMBER % (self.username, username)
 215            )
 216            return True
 217        except Exception:
 218            return False
 219
 220    def remove_member(self, user: "User"):
 221        path = f"/orgs/{self.username}/members/{user.username}"
 222        self.allspice_client.requests_delete(path)
 223
 224    def delete(self):
 225        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
 226        for repo in self.get_repositories():
 227            repo.delete()
 228        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
 229        self.deleted = True
 230
 231    def get_heatmap(self) -> List[Tuple[datetime, int]]:
 232        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
 233        results = [
 234            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
 235            for result in results
 236        ]
 237        return results
 238
 239
 240class User(ApiObject):
 241    active: bool
 242    admin: Any
 243    allow_create_organization: Any
 244    allow_git_hook: Any
 245    allow_import_local: Any
 246    avatar_url: str
 247    created: str
 248    description: str
 249    email: str
 250    emails: List[Any]
 251    followers_count: int
 252    following_count: int
 253    full_name: str
 254    html_url: str
 255    id: int
 256    is_admin: bool
 257    language: str
 258    last_login: str
 259    location: str
 260    login: str
 261    login_name: str
 262    max_repo_creation: Any
 263    must_change_password: Any
 264    password: Any
 265    prohibit_login: bool
 266    restricted: bool
 267    source_id: int
 268    starred_repos_count: int
 269    username: str
 270    visibility: str
 271    website: str
 272
 273    API_OBJECT = """/users/{name}"""  # <org>
 274    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
 275    USER_PATCH = """/admin/users/%s"""  # <username>
 276    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
 277    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
 278    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
 279
 280    def __init__(self, allspice_client):
 281        super().__init__(allspice_client)
 282        self._emails = []
 283
 284    def __eq__(self, other):
 285        if not isinstance(other, User):
 286            return False
 287        return self.allspice_client == other.allspice_client and self.id == other.id
 288
 289    def __hash__(self):
 290        return hash(self.allspice_client) ^ hash(self.id)
 291
 292    @property
 293    def emails(self):
 294        self.__request_emails()
 295        return self._emails
 296
 297    @classmethod
 298    def request(cls, allspice_client, name: str) -> "User":
 299        api_object = cls._request(allspice_client, {"name": name})
 300        return api_object
 301
 302    _patchable_fields: ClassVar[set[str]] = {
 303        "active",
 304        "admin",
 305        "allow_create_organization",
 306        "allow_git_hook",
 307        "allow_import_local",
 308        "email",
 309        "full_name",
 310        "location",
 311        "login_name",
 312        "max_repo_creation",
 313        "must_change_password",
 314        "password",
 315        "prohibit_login",
 316        "website",
 317    }
 318
 319    def commit(self, login_name: str, source_id: int = 0):
 320        """
 321        Unfortunately it is necessary to require the login name
 322        as well as the login source (that is not supplied when getting a user) for
 323        changing a user.
 324        Usually source_id is 0 and the login_name is equal to the username.
 325        """
 326        values = self.get_dirty_fields()
 327        values.update(
 328            # api-doc says that the "source_id" is necessary; works without though
 329            {"login_name": login_name, "source_id": source_id}
 330        )
 331        args = {"username": self.username}
 332        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
 333        self._dirty_fields = {}
 334
 335    def create_repo(
 336        self,
 337        repoName: str,
 338        description: str = "",
 339        private: bool = False,
 340        autoInit=True,
 341        gitignores: Optional[str] = None,
 342        license: Optional[str] = None,
 343        readme: str = "Default",
 344        issue_labels: Optional[str] = None,
 345        default_branch="master",
 346    ):
 347        """Create a user Repository
 348
 349        Throws:
 350            AlreadyExistsException: If the Repository exists already.
 351            Exception: If something else went wrong.
 352        """
 353        result = self.allspice_client.requests_post(
 354            "/user/repos",
 355            data={
 356                "name": repoName,
 357                "description": description,
 358                "private": private,
 359                "auto_init": autoInit,
 360                "gitignores": gitignores,
 361                "license": license,
 362                "issue_labels": issue_labels,
 363                "readme": readme,
 364                "default_branch": default_branch,
 365            },
 366        )
 367        if "id" in result:
 368            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
 369        else:
 370            self.allspice_client.logger.error(result["message"])
 371            raise Exception("Repository not created... (gitea: %s)" % result["message"])
 372        return Repository.parse_response(self.allspice_client, result)
 373
 374    def get_repositories(self) -> List["Repository"]:
 375        """Get all Repositories owned by this User."""
 376        url = f"/users/{self.username}/repos"
 377        results = self.allspice_client.requests_get_paginated(url)
 378        return [Repository.parse_response(self.allspice_client, result) for result in results]
 379
 380    def get_orgs(self) -> List[Organization]:
 381        """Get all Organizations this user is a member of."""
 382        url = f"/users/{self.username}/orgs"
 383        results = self.allspice_client.requests_get_paginated(url)
 384        return [Organization.parse_response(self.allspice_client, result) for result in results]
 385
 386    def get_teams(self) -> List["Team"]:
 387        url = "/user/teams"
 388        results = self.allspice_client.requests_get_paginated(url, sudo=self)
 389        return [Team.parse_response(self.allspice_client, result) for result in results]
 390
 391    def get_accessible_repos(self) -> List["Repository"]:
 392        """Get all Repositories accessible by the logged in User."""
 393        results = self.allspice_client.requests_get("/user/repos", sudo=self)
 394        return [Repository.parse_response(self.allspice_client, result) for result in results]
 395
 396    def __request_emails(self):
 397        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
 398        # report if the adress changed by this
 399        for mail in result:
 400            self._emails.append(mail["email"])
 401            if mail["primary"]:
 402                self._email = mail["email"]
 403
 404    def delete(self):
 405        """Deletes this User. Also deletes all Repositories he owns."""
 406        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
 407        self.deleted = True
 408
 409    def get_heatmap(self) -> List[Tuple[datetime, int]]:
 410        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
 411        results = [
 412            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
 413            for result in results
 414        ]
 415        return results
 416
 417
 418class Branch(ReadonlyApiObject):
 419    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
 420    effective_branch_protection_name: str
 421    enable_status_check: bool
 422    name: str
 423    protected: bool
 424    required_approvals: int
 425    status_check_contexts: List[Any]
 426    user_can_merge: bool
 427    user_can_push: bool
 428
 429    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
 430
 431    def __init__(self, allspice_client):
 432        super().__init__(allspice_client)
 433
 434    def __eq__(self, other):
 435        if not isinstance(other, Branch):
 436            return False
 437        return self.commit == other.commit and self.name == other.name
 438
 439    def __hash__(self):
 440        return hash(self.commit["id"]) ^ hash(self.name)
 441
 442    _fields_to_parsers: ClassVar[dict] = {
 443        # This is not a commit object
 444        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
 445    }
 446
 447    @classmethod
 448    def request(cls, allspice_client, owner: str, repo: str, branch: str):
 449        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
 450
 451
 452class GitEntry(ReadonlyApiObject):
 453    """
 454    An object representing a file or directory in the Git tree.
 455    """
 456
 457    mode: str
 458    path: str
 459    sha: str
 460    size: int
 461    type: str
 462    url: str
 463
 464    def __init__(self, allspice_client):
 465        super().__init__(allspice_client)
 466
 467    def __eq__(self, other) -> bool:
 468        if not isinstance(other, GitEntry):
 469            return False
 470        return self.sha == other.sha
 471
 472    def __hash__(self) -> int:
 473        return hash(self.sha)
 474
 475
 476class Repository(ApiObject):
 477    allow_fast_forward_only_merge: bool
 478    allow_manual_merge: Any
 479    allow_merge_commits: bool
 480    allow_rebase: bool
 481    allow_rebase_explicit: bool
 482    allow_rebase_update: bool
 483    allow_squash_merge: bool
 484    archived: bool
 485    archived_at: str
 486    autodetect_manual_merge: Any
 487    avatar_url: str
 488    clone_url: str
 489    created_at: str
 490    default_allow_maintainer_edit: bool
 491    default_branch: str
 492    default_delete_branch_after_merge: bool
 493    default_merge_style: str
 494    description: str
 495    empty: bool
 496    enable_prune: Any
 497    external_tracker: Any
 498    external_wiki: Any
 499    fork: bool
 500    forks_count: int
 501    full_name: str
 502    has_actions: bool
 503    has_issues: bool
 504    has_packages: bool
 505    has_projects: bool
 506    has_pull_requests: bool
 507    has_releases: bool
 508    has_wiki: bool
 509    html_url: str
 510    id: int
 511    ignore_whitespace_conflicts: bool
 512    internal: bool
 513    internal_tracker: Dict[str, bool]
 514    language: str
 515    languages_url: str
 516    link: str
 517    mirror: bool
 518    mirror_interval: str
 519    mirror_updated: str
 520    name: str
 521    object_format_name: str
 522    open_issues_count: int
 523    open_pr_counter: int
 524    original_url: str
 525    owner: Union["User", "Organization"]
 526    parent: Any
 527    permissions: Dict[str, bool]
 528    private: bool
 529    projects_mode: str
 530    release_counter: int
 531    repo_transfer: Any
 532    size: int
 533    ssh_url: str
 534    stars_count: int
 535    template: bool
 536    updated_at: datetime
 537    url: str
 538    watchers_count: int
 539    website: str
 540
 541    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 542    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 543    REPO_SEARCH = """/repos/search/"""
 544    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 545    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 546    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 547    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 548    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 549    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 550    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 551    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 552    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 553    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 554    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 555    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 556    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 557    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 558    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 559    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 560    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 561    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 562    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 563    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 564    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 565
 566    class ArchiveFormat(Enum):
 567        """
 568        Archive formats for Repository.get_archive
 569        """
 570
 571        TAR = "tar.gz"
 572        ZIP = "zip"
 573
 574    class CommitStatusSort(Enum):
 575        """
 576        Sort order for Repository.get_commit_status
 577        """
 578
 579        OLDEST = "oldest"
 580        RECENT_UPDATE = "recentupdate"
 581        LEAST_UPDATE = "leastupdate"
 582        LEAST_INDEX = "leastindex"
 583        HIGHEST_INDEX = "highestindex"
 584
 585    def __init__(self, allspice_client):
 586        super().__init__(allspice_client)
 587
 588    def __eq__(self, other):
 589        if not isinstance(other, Repository):
 590            return False
 591        return self.owner == other.owner and self.name == other.name
 592
 593    def __hash__(self):
 594        return hash(self.owner) ^ hash(self.name)
 595
 596    _fields_to_parsers: ClassVar[dict] = {
 597        # dont know how to tell apart user and org as owner except form email being empty.
 598        "owner": lambda allspice_client, r: (
 599            Organization.parse_response(allspice_client, r)
 600            if r["email"] == ""
 601            else User.parse_response(allspice_client, r)
 602        ),
 603        "updated_at": lambda _, t: Util.convert_time(t),
 604    }
 605
 606    @classmethod
 607    def request(
 608        cls,
 609        allspice_client,
 610        owner: str,
 611        name: str,
 612    ) -> Repository:
 613        return cls._request(allspice_client, {"owner": owner, "name": name})
 614
 615    @classmethod
 616    def search(
 617        cls,
 618        allspice_client,
 619        query: Optional[str] = None,
 620        topic: bool = False,
 621        include_description: bool = False,
 622        user: Optional[User] = None,
 623        owner_to_prioritize: Union[User, Organization, None] = None,
 624    ) -> list[Repository]:
 625        """
 626        Search for repositories.
 627
 628        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 629
 630        :param query: The query string to search for
 631        :param topic: If true, the query string will only be matched against the
 632            repository's topic.
 633        :param include_description: If true, the query string will be matched
 634            against the repository's description as well.
 635        :param user: If specified, only repositories that this user owns or
 636            contributes to will be searched.
 637        :param owner_to_prioritize: If specified, repositories owned by the
 638            given entity will be prioritized in the search.
 639        :returns: All repositories matching the query. If there are many
 640            repositories matching this query, this may take some time.
 641        """
 642
 643        params = {}
 644
 645        if query is not None:
 646            params["q"] = query
 647        if topic:
 648            params["topic"] = topic
 649        if include_description:
 650            params["include_description"] = include_description
 651        if user is not None:
 652            params["user"] = user.id
 653        if owner_to_prioritize is not None:
 654            params["owner_to_prioritize"] = owner_to_prioritize.id
 655
 656        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 657
 658        return [Repository.parse_response(allspice_client, response) for response in responses]
 659
 660    _patchable_fields: ClassVar[set[str]] = {
 661        "allow_manual_merge",
 662        "allow_merge_commits",
 663        "allow_rebase",
 664        "allow_rebase_explicit",
 665        "allow_rebase_update",
 666        "allow_squash_merge",
 667        "archived",
 668        "autodetect_manual_merge",
 669        "default_branch",
 670        "default_delete_branch_after_merge",
 671        "default_merge_style",
 672        "description",
 673        "enable_prune",
 674        "external_tracker",
 675        "external_wiki",
 676        "has_actions",
 677        "has_issues",
 678        "has_projects",
 679        "has_pull_requests",
 680        "has_wiki",
 681        "ignore_whitespace_conflicts",
 682        "internal_tracker",
 683        "mirror_interval",
 684        "name",
 685        "private",
 686        "template",
 687        "website",
 688    }
 689
 690    def commit(self):
 691        args = {"owner": self.owner.username, "name": self.name}
 692        self._commit(args)
 693
 694    def get_branches(self) -> List["Branch"]:
 695        """Get all the Branches of this Repository."""
 696
 697        results = self.allspice_client.requests_get_paginated(
 698            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 699        )
 700        return [Branch.parse_response(self.allspice_client, result) for result in results]
 701
 702    def get_branch(self, name: str) -> "Branch":
 703        """Get a specific Branch of this Repository."""
 704        result = self.allspice_client.requests_get(
 705            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 706        )
 707        return Branch.parse_response(self.allspice_client, result)
 708
 709    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 710        """Add a branch to the repository"""
 711        # Note: will only work with gitea 1.13 or higher!
 712
 713        ref_name = Util.data_params_for_ref(create_from)
 714        if "ref" not in ref_name:
 715            raise ValueError("create_from must be a Branch, Commit or string")
 716        ref_name = ref_name["ref"]
 717
 718        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 719        result = self.allspice_client.requests_post(
 720            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 721        )
 722        return Branch.parse_response(self.allspice_client, result)
 723
 724    def get_issues(
 725        self,
 726        state: Literal["open", "closed", "all"] = "all",
 727        search_query: Optional[str] = None,
 728        labels: Optional[List[str]] = None,
 729        milestones: Optional[List[Union[Milestone, str]]] = None,
 730        assignee: Optional[Union[User, str]] = None,
 731        since: Optional[datetime] = None,
 732        before: Optional[datetime] = None,
 733    ) -> List["Issue"]:
 734        """
 735        Get all Issues of this Repository (open and closed)
 736
 737        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 738
 739        All params of this method are optional filters. If you don't specify a filter, it
 740        will not be applied.
 741
 742        :param state: The state of the Issues to get. If None, all Issues are returned.
 743        :param search_query: Filter issues by text. This is equivalent to searching for
 744                             `search_query` in the Issues on the web interface.
 745        :param labels: Filter issues by labels.
 746        :param milestones: Filter issues by milestones.
 747        :param assignee: Filter issues by the assigned user.
 748        :param since: Filter issues by the date they were created.
 749        :param before: Filter issues by the date they were created.
 750        :return: A list of Issues.
 751        """
 752
 753        data = {
 754            "state": state,
 755        }
 756        if search_query:
 757            data["q"] = search_query
 758        if labels:
 759            data["labels"] = ",".join(labels)
 760        if milestones:
 761            data["milestone"] = ",".join(
 762                [
 763                    milestone.name if isinstance(milestone, Milestone) else milestone
 764                    for milestone in milestones
 765                ]
 766            )
 767        if assignee:
 768            if isinstance(assignee, User):
 769                data["assignee"] = assignee.username
 770            else:
 771                data["assignee"] = assignee
 772        if since:
 773            data["since"] = Util.format_time(since)
 774        if before:
 775            data["before"] = Util.format_time(before)
 776
 777        results = self.allspice_client.requests_get_paginated(
 778            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 779            params=data,
 780        )
 781
 782        issues = []
 783        for result in results:
 784            issue = Issue.parse_response(self.allspice_client, result)
 785            # See Issue.request
 786            setattr(issue, "_repository", self)
 787            # This is mostly for compatibility with an older implementation
 788            Issue._add_read_property("repo", self, issue)
 789            issues.append(issue)
 790
 791        return issues
 792
 793    def get_design_reviews(
 794        self,
 795        state: Literal["open", "closed", "all"] = "all",
 796        milestone: Optional[Union[Milestone, str]] = None,
 797        labels: Optional[List[str]] = None,
 798    ) -> List["DesignReview"]:
 799        """
 800        Get all Design Reviews of this Repository.
 801
 802        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 803
 804        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 805                      are returned.
 806        :param milestone: The milestone of the Design Reviews to get.
 807        :param labels: A list of label IDs to filter DRs by.
 808        :return: A list of Design Reviews.
 809        """
 810
 811        params = {
 812            "state": state,
 813        }
 814        if milestone:
 815            if isinstance(milestone, Milestone):
 816                params["milestone"] = milestone.name
 817            else:
 818                params["milestone"] = milestone
 819        if labels:
 820            params["labels"] = ",".join(labels)
 821
 822        results = self.allspice_client.requests_get_paginated(
 823            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 824            params=params,
 825        )
 826        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 827
 828    def get_commits(
 829        self,
 830        sha: Optional[str] = None,
 831        path: Optional[str] = None,
 832        stat: bool = True,
 833    ) -> List["Commit"]:
 834        """
 835        Get all the Commits of this Repository.
 836
 837        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 838
 839        :param sha: The SHA of the commit to start listing commits from.
 840        :param path: filepath of a file/dir.
 841        :param stat: Include the number of additions and deletions in the response.
 842                     Disable for speedup.
 843        :return: A list of Commits.
 844        """
 845
 846        data = {}
 847        if sha:
 848            data["sha"] = sha
 849        if path:
 850            data["path"] = path
 851        if not stat:
 852            data["stat"] = False
 853
 854        try:
 855            results = self.allspice_client.requests_get_paginated(
 856                Repository.REPO_COMMITS % (self.owner.username, self.name),
 857                params=data,
 858            )
 859        except ConflictException as err:
 860            logging.warning(err)
 861            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 862            results = []
 863        return [Commit.parse_response(self.allspice_client, result) for result in results]
 864
 865    def get_issues_state(self, state) -> List["Issue"]:
 866        """
 867        DEPRECATED: Use get_issues() instead.
 868
 869        Get issues of state Issue.open or Issue.closed of a repository.
 870        """
 871
 872        assert state in [Issue.OPENED, Issue.CLOSED]
 873        issues = []
 874        data = {"state": state}
 875        results = self.allspice_client.requests_get_paginated(
 876            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 877            params=data,
 878        )
 879        for result in results:
 880            issue = Issue.parse_response(self.allspice_client, result)
 881            # adding data not contained in the issue response
 882            # See Issue.request()
 883            setattr(issue, "_repository", self)
 884            Issue._add_read_property("repo", self, issue)
 885            Issue._add_read_property("owner", self.owner, issue)
 886            issues.append(issue)
 887        return issues
 888
 889    def get_times(self):
 890        results = self.allspice_client.requests_get(
 891            Repository.REPO_TIMES % (self.owner.username, self.name)
 892        )
 893        return results
 894
 895    def get_user_time(self, username) -> float:
 896        if isinstance(username, User):
 897            username = username.username
 898        results = self.allspice_client.requests_get(
 899            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 900        )
 901        time = sum(r["time"] for r in results)
 902        return time
 903
 904    def get_full_name(self) -> str:
 905        return self.owner.username + "/" + self.name
 906
 907    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 908        data = {
 909            "assignees": assignees,
 910            "body": description,
 911            "closed": False,
 912            "title": title,
 913        }
 914        result = self.allspice_client.requests_post(
 915            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 916            data=data,
 917        )
 918
 919        issue = Issue.parse_response(self.allspice_client, result)
 920        setattr(issue, "_repository", self)
 921        Issue._add_read_property("repo", self, issue)
 922        return issue
 923
 924    def create_design_review(
 925        self,
 926        title: str,
 927        head: Union[Branch, str],
 928        base: Union[Branch, str],
 929        assignees: Optional[Set[Union[User, str]]] = None,
 930        body: Optional[str] = None,
 931        due_date: Optional[datetime] = None,
 932        milestone: Optional["Milestone"] = None,
 933    ) -> "DesignReview":
 934        """
 935        Create a new Design Review.
 936
 937        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 938
 939        :param title: Title of the Design Review
 940        :param head: Branch or name of the branch to merge into the base branch
 941        :param base: Branch or name of the branch to merge into
 942        :param assignees: Optional. A list of users to assign this review. List can be of
 943                          User objects or of usernames.
 944        :param body: An Optional Description for the Design Review.
 945        :param due_date: An Optional Due date for the Design Review.
 946        :param milestone: An Optional Milestone for the Design Review
 947        :return: The created Design Review
 948        """
 949
 950        data: dict[str, Any] = {
 951            "title": title,
 952        }
 953
 954        if isinstance(head, Branch):
 955            data["head"] = head.name
 956        else:
 957            data["head"] = head
 958        if isinstance(base, Branch):
 959            data["base"] = base.name
 960        else:
 961            data["base"] = base
 962        if assignees:
 963            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 964        if body:
 965            data["body"] = body
 966        if due_date:
 967            data["due_date"] = Util.format_time(due_date)
 968        if milestone:
 969            data["milestone"] = milestone.id
 970
 971        result = self.allspice_client.requests_post(
 972            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 973            data=data,
 974        )
 975
 976        return DesignReview.parse_response(self.allspice_client, result)
 977
 978    def create_milestone(
 979        self,
 980        title: str,
 981        description: str,
 982        due_date: Optional[str] = None,
 983        state: str = "open",
 984    ) -> "Milestone":
 985        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 986        data = {"title": title, "description": description, "state": state}
 987        if due_date:
 988            data["due_date"] = due_date
 989        result = self.allspice_client.requests_post(url, data=data)
 990        return Milestone.parse_response(self.allspice_client, result)
 991
 992    def create_gitea_hook(self, hook_url: str, events: List[str]):
 993        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 994        data = {
 995            "type": "gitea",
 996            "config": {"content_type": "json", "url": hook_url},
 997            "events": events,
 998            "active": True,
 999        }
1000        return self.allspice_client.requests_post(url, data=data)
1001
1002    def list_hooks(self):
1003        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1004        return self.allspice_client.requests_get(url)
1005
1006    def delete_hook(self, id: str):
1007        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1008        self.allspice_client.requests_delete(url)
1009
1010    def is_collaborator(self, username) -> bool:
1011        if isinstance(username, User):
1012            username = username.username
1013        try:
1014            # returns 204 if its ok, 404 if its not
1015            self.allspice_client.requests_get(
1016                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1017            )
1018            return True
1019        except Exception:
1020            return False
1021
1022    def get_users_with_access(self) -> Sequence[User]:
1023        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1024        response = self.allspice_client.requests_get(url)
1025        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1026        if isinstance(self.owner, User):
1027            return [*collabs, self.owner]
1028        else:
1029            # owner must be org
1030            teams = self.owner.get_teams()
1031            for team in teams:
1032                team_repos = team.get_repos()
1033                if self.name in [n.name for n in team_repos]:
1034                    collabs += team.get_members()
1035            return collabs
1036
1037    def remove_collaborator(self, user_name: str):
1038        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1039        self.allspice_client.requests_delete(url)
1040
1041    def transfer_ownership(
1042        self,
1043        new_owner: Union[User, Organization],
1044        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1045    ):
1046        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1047        data: dict[str, Any] = {"new_owner": new_owner.username}
1048        if isinstance(new_owner, Organization):
1049            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1050            data["team_ids"] = new_team_ids
1051        self.allspice_client.requests_post(url, data=data)
1052        # TODO: make sure this instance is either updated or discarded
1053
1054    def get_git_content(
1055        self,
1056        ref: Optional["Ref"] = None,
1057        commit: "Optional[Commit]" = None,
1058    ) -> List[Content]:
1059        """
1060        Get the metadata for all files in the root directory.
1061
1062        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1063
1064        :param ref: branch or commit to get content from
1065        :param commit: commit to get content from (deprecated)
1066        """
1067        url = f"/repos/{self.owner.username}/{self.name}/contents"
1068        data = Util.data_params_for_ref(ref or commit)
1069
1070        result = [
1071            Content.parse_response(self.allspice_client, f)
1072            for f in self.allspice_client.requests_get(url, data)
1073        ]
1074        return result
1075
1076    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1077        """
1078        Get the repository's tree on a given ref.
1079
1080        By default, this will only return the top-level entries in the tree. If you want
1081        to get the entire tree, set `recursive` to True.
1082
1083        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1084        :param recursive: Whether to get the entire tree or just the top-level entries.
1085        """
1086
1087        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1088        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1089        params = {"recursive": recursive}
1090        results = self.allspice_client.requests_get_paginated(url, params=params)
1091        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1092
1093    def get_file_content(
1094        self,
1095        content: Content,
1096        ref: Optional[Ref] = None,
1097        commit: Optional[Commit] = None,
1098    ) -> Union[str, List["Content"]]:
1099        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1100        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1101        data = Util.data_params_for_ref(ref or commit)
1102
1103        if content.type == Content.FILE:
1104            return self.allspice_client.requests_get(url, data)["content"]
1105        else:
1106            return [
1107                Content.parse_response(self.allspice_client, f)
1108                for f in self.allspice_client.requests_get(url, data)
1109            ]
1110
1111    def get_raw_file(
1112        self,
1113        file_path: str,
1114        ref: Optional[Ref] = None,
1115    ) -> bytes:
1116        """
1117        Get the raw, binary data of a single file.
1118
1119        Note 1: if the file you are requesting is a text file, you might want to
1120        use .decode() on the result to get a string. For example:
1121
1122            content = repo.get_raw_file("file.txt").decode("utf-8")
1123
1124        Note 2: this method will store the entire file in memory. If you want
1125        to download a large file, you might want to use `download_to_file`
1126        instead.
1127
1128        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1129
1130        :param file_path: The path to the file to get.
1131        :param ref: The branch or commit to get the file from.  If not provided,
1132            the default branch is used.
1133        """
1134
1135        url = self.REPO_GET_RAW_FILE.format(
1136            owner=self.owner.username,
1137            repo=self.name,
1138            path=file_path,
1139        )
1140        params = Util.data_params_for_ref(ref)
1141        return self.allspice_client.requests_get_raw(url, params=params)
1142
1143    def download_to_file(
1144        self,
1145        file_path: str,
1146        io: IO,
1147        ref: Optional[Ref] = None,
1148    ) -> None:
1149        """
1150        Download the binary data of a file to a file-like object.
1151
1152        Example:
1153
1154            with open("schematic.DSN", "wb") as f:
1155                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1156
1157        :param file_path: The path to the file in the repository from the root
1158            of the repository.
1159        :param io: The file-like object to write the data to.
1160        """
1161
1162        url = self.allspice_client._AllSpice__get_url(
1163            self.REPO_GET_RAW_FILE.format(
1164                owner=self.owner.username,
1165                repo=self.name,
1166                path=file_path,
1167            )
1168        )
1169        params = Util.data_params_for_ref(ref)
1170        response = self.allspice_client.requests.get(
1171            url,
1172            params=params,
1173            headers=self.allspice_client.headers,
1174            stream=True,
1175        )
1176
1177        for chunk in response.iter_content(chunk_size=4096):
1178            if chunk:
1179                io.write(chunk)
1180
1181    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1182        """
1183        Get the json blob for a cad file if it exists, otherwise enqueue
1184        a new job and return a 503 status.
1185
1186        WARNING: This is still experimental and not recommended for critical
1187        applications. The structure and content of the returned dictionary can
1188        change at any time.
1189
1190        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1191        """
1192
1193        if isinstance(content, Content):
1194            content = content.path
1195
1196        url = self.REPO_GET_ALLSPICE_JSON.format(
1197            owner=self.owner.username,
1198            repo=self.name,
1199            content=content,
1200        )
1201        data = Util.data_params_for_ref(ref)
1202        return self.allspice_client.requests_get(url, data)
1203
1204    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1205        """
1206        Get the svg blob for a cad file if it exists, otherwise enqueue
1207        a new job and return a 503 status.
1208
1209        WARNING: This is still experimental and not yet recommended for
1210        critical applications. The content of the returned svg can change
1211        at any time.
1212
1213        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1214        """
1215
1216        if isinstance(content, Content):
1217            content = content.path
1218
1219        url = self.REPO_GET_ALLSPICE_SVG.format(
1220            owner=self.owner.username,
1221            repo=self.name,
1222            content=content,
1223        )
1224        data = Util.data_params_for_ref(ref)
1225        return self.allspice_client.requests_get_raw(url, data)
1226
1227    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1228        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1229        if not data:
1230            data = {}
1231        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1232        data.update({"content": content})
1233        return self.allspice_client.requests_post(url, data)
1234
1235    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1236        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1237        if not data:
1238            data = {}
1239        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1240        data.update({"sha": file_sha, "content": content})
1241        return self.allspice_client.requests_put(url, data)
1242
1243    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1244        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1245        if not data:
1246            data = {}
1247        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1248        data.update({"sha": file_sha})
1249        return self.allspice_client.requests_delete(url, data)
1250
1251    def get_archive(
1252        self,
1253        ref: Ref = "main",
1254        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1255    ) -> bytes:
1256        """
1257        Download all the files in a specific ref of a repository as a zip or tarball
1258        archive.
1259
1260        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1261
1262        :param ref: branch or commit to get content from, defaults to the "main" branch
1263        :param archive_format: zip or tar, defaults to zip
1264        """
1265
1266        ref_string = Util.data_params_for_ref(ref)["ref"]
1267        url = self.REPO_GET_ARCHIVE.format(
1268            owner=self.owner.username,
1269            repo=self.name,
1270            ref=ref_string,
1271            format=archive_format.value,
1272        )
1273        return self.allspice_client.requests_get_raw(url)
1274
1275    def get_topics(self) -> list[str]:
1276        """
1277        Gets the list of topics on this repository.
1278
1279        See http://localhost:3000/api/swagger#/repository/repoListTopics
1280        """
1281
1282        url = self.REPO_GET_TOPICS.format(
1283            owner=self.owner.username,
1284            repo=self.name,
1285        )
1286        return self.allspice_client.requests_get(url)["topics"]
1287
1288    def add_topic(self, topic: str):
1289        """
1290        Adds a topic to the repository.
1291
1292        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1293
1294        :param topic: The topic to add. Topic names must consist only of
1295            lowercase letters, numnbers and dashes (-), and cannot start with
1296            dashes. Topic names also must be under 35 characters long.
1297        """
1298
1299        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1300        self.allspice_client.requests_put(url)
1301
1302    def create_release(
1303        self,
1304        tag_name: str,
1305        name: Optional[str] = None,
1306        body: Optional[str] = None,
1307        draft: bool = False,
1308    ):
1309        """
1310        Create a release for this repository. The release will be created for
1311        the tag with the given name. If there is no tag with this name, create
1312        the tag first.
1313
1314        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1315        """
1316
1317        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1318        data = {
1319            "tag_name": tag_name,
1320            "draft": draft,
1321        }
1322        if name is not None:
1323            data["name"] = name
1324        if body is not None:
1325            data["body"] = body
1326        response = self.allspice_client.requests_post(url, data)
1327        return Release.parse_response(self.allspice_client, response, self)
1328
1329    def get_releases(
1330        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1331    ) -> List[Release]:
1332        """
1333        Get the list of releases for this repository.
1334
1335        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1336        """
1337
1338        data = {}
1339
1340        if draft is not None:
1341            data["draft"] = draft
1342        if pre_release is not None:
1343            data["pre-release"] = pre_release
1344
1345        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1346        responses = self.allspice_client.requests_get_paginated(url, params=data)
1347
1348        return [
1349            Release.parse_response(self.allspice_client, response, self) for response in responses
1350        ]
1351
1352    def get_latest_release(self) -> Release:
1353        """
1354        Get the latest release for this repository.
1355
1356        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1357        """
1358
1359        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1360        response = self.allspice_client.requests_get(url)
1361        release = Release.parse_response(self.allspice_client, response, self)
1362        return release
1363
1364    def get_release_by_tag(self, tag: str) -> Release:
1365        """
1366        Get a release by its tag.
1367
1368        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1369        """
1370
1371        url = self.REPO_GET_RELEASE_BY_TAG.format(
1372            owner=self.owner.username, repo=self.name, tag=tag
1373        )
1374        response = self.allspice_client.requests_get(url)
1375        release = Release.parse_response(self.allspice_client, response, self)
1376        return release
1377
1378    def get_commit_statuses(
1379        self,
1380        commit: Union[str, Commit],
1381        sort: Optional[CommitStatusSort] = None,
1382        state: Optional[CommitStatusState] = None,
1383    ) -> List[CommitStatus]:
1384        """
1385        Get a list of statuses for a commit.
1386
1387        This is roughly equivalent to the Commit.get_statuses method, but this
1388        method allows you to sort and filter commits and is more convenient if
1389        you have a commit SHA and don't need to get the commit itself.
1390
1391        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1392        """
1393
1394        if isinstance(commit, Commit):
1395            commit = commit.sha
1396
1397        params = {}
1398        if sort is not None:
1399            params["sort"] = sort.value
1400        if state is not None:
1401            params["state"] = state.value
1402
1403        url = self.REPO_GET_COMMIT_STATUS.format(
1404            owner=self.owner.username, repo=self.name, sha=commit
1405        )
1406        response = self.allspice_client.requests_get_paginated(url, params=params)
1407        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1408
1409    def create_commit_status(
1410        self,
1411        commit: Union[str, Commit],
1412        context: Optional[str] = None,
1413        description: Optional[str] = None,
1414        state: Optional[CommitStatusState] = None,
1415        target_url: Optional[str] = None,
1416    ) -> CommitStatus:
1417        """
1418        Create a status on a commit.
1419
1420        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1421        """
1422
1423        if isinstance(commit, Commit):
1424            commit = commit.sha
1425
1426        data = {}
1427        if context is not None:
1428            data["context"] = context
1429        if description is not None:
1430            data["description"] = description
1431        if state is not None:
1432            data["state"] = state.value
1433        if target_url is not None:
1434            data["target_url"] = target_url
1435
1436        url = self.REPO_GET_COMMIT_STATUS.format(
1437            owner=self.owner.username, repo=self.name, sha=commit
1438        )
1439        response = self.allspice_client.requests_post(url, data=data)
1440        return CommitStatus.parse_response(self.allspice_client, response)
1441
1442    def delete(self):
1443        self.allspice_client.requests_delete(
1444            Repository.REPO_DELETE % (self.owner.username, self.name)
1445        )
1446        self.deleted = True
1447
1448
1449class Milestone(ApiObject):
1450    allow_merge_commits: Any
1451    allow_rebase: Any
1452    allow_rebase_explicit: Any
1453    allow_squash_merge: Any
1454    archived: Any
1455    closed_at: Any
1456    closed_issues: int
1457    created_at: str
1458    default_branch: Any
1459    description: str
1460    due_on: Any
1461    has_issues: Any
1462    has_pull_requests: Any
1463    has_wiki: Any
1464    id: int
1465    ignore_whitespace_conflicts: Any
1466    name: Any
1467    open_issues: int
1468    private: Any
1469    state: str
1470    title: str
1471    updated_at: str
1472    website: Any
1473
1474    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1475
1476    def __init__(self, allspice_client):
1477        super().__init__(allspice_client)
1478
1479    def __eq__(self, other):
1480        if not isinstance(other, Milestone):
1481            return False
1482        return self.allspice_client == other.allspice_client and self.id == other.id
1483
1484    def __hash__(self):
1485        return hash(self.allspice_client) ^ hash(self.id)
1486
1487    _fields_to_parsers: ClassVar[dict] = {
1488        "closed_at": lambda _, t: Util.convert_time(t),
1489        "due_on": lambda _, t: Util.convert_time(t),
1490    }
1491
1492    _patchable_fields: ClassVar[set[str]] = {
1493        "allow_merge_commits",
1494        "allow_rebase",
1495        "allow_rebase_explicit",
1496        "allow_squash_merge",
1497        "archived",
1498        "default_branch",
1499        "description",
1500        "has_issues",
1501        "has_pull_requests",
1502        "has_wiki",
1503        "ignore_whitespace_conflicts",
1504        "name",
1505        "private",
1506        "website",
1507    }
1508
1509    @classmethod
1510    def request(cls, allspice_client, owner: str, repo: str, number: str):
1511        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1512
1513
1514class Attachment(ReadonlyApiObject):
1515    """
1516    An asset attached to a comment.
1517
1518    You cannot edit or delete the attachment from this object - see the instance methods
1519    Comment.edit_attachment and delete_attachment for that.
1520    """
1521
1522    browser_download_url: str
1523    created_at: str
1524    download_count: int
1525    id: int
1526    name: str
1527    size: int
1528    uuid: str
1529
1530    def __init__(self, allspice_client):
1531        super().__init__(allspice_client)
1532
1533    def __eq__(self, other):
1534        if not isinstance(other, Attachment):
1535            return False
1536
1537        return self.uuid == other.uuid
1538
1539    def __hash__(self):
1540        return hash(self.uuid)
1541
1542    def download_to_file(self, io: IO):
1543        """
1544        Download the raw, binary data of this Attachment to a file-like object.
1545
1546        Example:
1547
1548            with open("my_file.zip", "wb") as f:
1549                attachment.download_to_file(f)
1550
1551        :param io: The file-like object to write the data to.
1552        """
1553
1554        response = self.allspice_client.requests.get(
1555            self.browser_download_url,
1556            headers=self.allspice_client.headers,
1557            stream=True,
1558        )
1559        # 4kb chunks
1560        for chunk in response.iter_content(chunk_size=4096):
1561            if chunk:
1562                io.write(chunk)
1563
1564
1565class Comment(ApiObject):
1566    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1567    body: str
1568    created_at: datetime
1569    html_url: str
1570    id: int
1571    issue_url: str
1572    original_author: str
1573    original_author_id: int
1574    pull_request_url: str
1575    updated_at: datetime
1576    user: User
1577
1578    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1579    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1580    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1581
1582    def __init__(self, allspice_client):
1583        super().__init__(allspice_client)
1584
1585    def __eq__(self, other):
1586        if not isinstance(other, Comment):
1587            return False
1588        return self.repository == other.repository and self.id == other.id
1589
1590    def __hash__(self):
1591        return hash(self.repository) ^ hash(self.id)
1592
1593    @classmethod
1594    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1595        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1596
1597    _fields_to_parsers: ClassVar[dict] = {
1598        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1599        "created_at": lambda _, t: Util.convert_time(t),
1600        "updated_at": lambda _, t: Util.convert_time(t),
1601    }
1602
1603    _patchable_fields: ClassVar[set[str]] = {"body"}
1604
1605    @property
1606    def parent_url(self) -> str:
1607        """URL of the parent of this comment (the issue or the pull request)"""
1608
1609        if self.issue_url is not None and self.issue_url != "":
1610            return self.issue_url
1611        else:
1612            return self.pull_request_url
1613
1614    @cached_property
1615    def repository(self) -> Repository:
1616        """The repository this comment was posted on."""
1617
1618        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1619        return Repository.request(self.allspice_client, owner_name, repo_name)
1620
1621    def __fields_for_path(self):
1622        return {
1623            "owner": self.repository.owner.username,
1624            "repo": self.repository.name,
1625            "id": self.id,
1626        }
1627
1628    def commit(self):
1629        self._commit(self.__fields_for_path())
1630
1631    def delete(self):
1632        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1633        self.deleted = True
1634
1635    def get_attachments(self) -> List[Attachment]:
1636        """
1637        Get all attachments on this comment. This returns Attachment objects, which
1638        contain a link to download the attachment.
1639
1640        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641        """
1642
1643        results = self.allspice_client.requests_get(
1644            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1645        )
1646        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1647
1648    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1649        """
1650        Create an attachment on this comment.
1651
1652        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1653
1654        :param file: The file to attach. This should be a file-like object.
1655        :param name: The name of the file. If not provided, the name of the file will be
1656                     used.
1657        :return: The created attachment.
1658        """
1659
1660        args: dict[str, Any] = {
1661            "files": {"attachment": file},
1662        }
1663        if name is not None:
1664            args["params"] = {"name": name}
1665
1666        result = self.allspice_client.requests_post(
1667            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1668            **args,
1669        )
1670        return Attachment.parse_response(self.allspice_client, result)
1671
1672    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1673        """
1674        Edit an attachment.
1675
1676        The list of params that can be edited is available at
1677        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1678
1679        :param attachment: The attachment to be edited
1680        :param data: The data parameter should be a dictionary of the fields to edit.
1681        :return: The edited attachment
1682        """
1683
1684        args = {
1685            **self.__fields_for_path(),
1686            "attachment_id": attachment.id,
1687        }
1688        result = self.allspice_client.requests_patch(
1689            self.ATTACHMENT_PATH.format(**args),
1690            data=data,
1691        )
1692        return Attachment.parse_response(self.allspice_client, result)
1693
1694    def delete_attachment(self, attachment: Attachment):
1695        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1696
1697        args = {
1698            **self.__fields_for_path(),
1699            "attachment_id": attachment.id,
1700        }
1701        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1702        attachment.deleted = True
1703
1704
1705class Commit(ReadonlyApiObject):
1706    author: User
1707    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1708    committer: Dict[str, Union[int, str, bool]]
1709    created: str
1710    files: List[Dict[str, str]]
1711    html_url: str
1712    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1713    parents: List[Union[Dict[str, str], Any]]
1714    sha: str
1715    stats: Dict[str, int]
1716    url: str
1717
1718    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1719    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1720    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1721
1722    # Regex to extract owner and repo names from the url property
1723    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1724
1725    def __init__(self, allspice_client):
1726        super().__init__(allspice_client)
1727
1728    _fields_to_parsers: ClassVar[dict] = {
1729        # NOTE: api may return None for commiters that are no allspice users
1730        "author": lambda allspice_client, u: (
1731            User.parse_response(allspice_client, u) if u else None
1732        )
1733    }
1734
1735    def __eq__(self, other):
1736        if not isinstance(other, Commit):
1737            return False
1738        return self.sha == other.sha
1739
1740    def __hash__(self):
1741        return hash(self.sha)
1742
1743    @classmethod
1744    def parse_response(cls, allspice_client, result) -> "Commit":
1745        commit_cache = result["commit"]
1746        api_object = cls(allspice_client)
1747        cls._initialize(allspice_client, api_object, result)
1748        # inner_commit for legacy reasons
1749        Commit._add_read_property("inner_commit", commit_cache, api_object)
1750        return api_object
1751
1752    def get_status(self) -> CommitCombinedStatus:
1753        """
1754        Get a combined status consisting of all statues on this commit.
1755
1756        Note that the returned object is a CommitCombinedStatus object, which
1757        also contains a list of all statuses on the commit.
1758
1759        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760        """
1761
1762        result = self.allspice_client.requests_get(
1763            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1764        )
1765        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1766
1767    def get_statuses(self) -> List[CommitStatus]:
1768        """
1769        Get a list of all statuses on this commit.
1770
1771        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1772        """
1773
1774        results = self.allspice_client.requests_get(
1775            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1776        )
1777        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1778
1779    @cached_property
1780    def _fields_for_path(self) -> dict[str, str]:
1781        matches = self.URL_REGEXP.search(self.url)
1782        if not matches:
1783            raise ValueError(f"Invalid commit URL: {self.url}")
1784
1785        return {
1786            "owner": matches.group(1),
1787            "repo": matches.group(2),
1788            "sha": self.sha,
1789        }
1790
1791
1792class CommitStatusState(Enum):
1793    PENDING = "pending"
1794    SUCCESS = "success"
1795    ERROR = "error"
1796    FAILURE = "failure"
1797    WARNING = "warning"
1798
1799    @classmethod
1800    def try_init(cls, value: str) -> CommitStatusState | str:
1801        """
1802        Try converting a string to the enum, and if that fails, return the
1803        string itself.
1804        """
1805
1806        try:
1807            return cls(value)
1808        except ValueError:
1809            return value
1810
1811
1812class CommitStatus(ReadonlyApiObject):
1813    context: str
1814    created_at: str
1815    creator: User
1816    description: str
1817    id: int
1818    status: CommitStatusState
1819    target_url: str
1820    updated_at: str
1821    url: str
1822
1823    def __init__(self, allspice_client):
1824        super().__init__(allspice_client)
1825
1826    _fields_to_parsers: ClassVar[dict] = {
1827        # Gitea/ASH doesn't actually validate that the status is a "valid"
1828        # status, so we can expect empty or unknown strings in the status field.
1829        "status": lambda _, s: CommitStatusState.try_init(s),
1830        "creator": lambda allspice_client, u: (
1831            User.parse_response(allspice_client, u) if u else None
1832        ),
1833    }
1834
1835    def __eq__(self, other):
1836        if not isinstance(other, CommitStatus):
1837            return False
1838        return self.id == other.id
1839
1840    def __hash__(self):
1841        return hash(self.id)
1842
1843
1844class CommitCombinedStatus(ReadonlyApiObject):
1845    commit_url: str
1846    repository: Repository
1847    sha: str
1848    state: CommitStatusState
1849    statuses: List["CommitStatus"]
1850    total_count: int
1851    url: str
1852
1853    def __init__(self, allspice_client):
1854        super().__init__(allspice_client)
1855
1856    _fields_to_parsers: ClassVar[dict] = {
1857        # See CommitStatus
1858        "state": lambda _, s: CommitStatusState.try_init(s),
1859        "statuses": lambda allspice_client, statuses: [
1860            CommitStatus.parse_response(allspice_client, status) for status in statuses
1861        ],
1862        "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r),
1863    }
1864
1865    def __eq__(self, other):
1866        if not isinstance(other, CommitCombinedStatus):
1867            return False
1868        return self.sha == other.sha
1869
1870    def __hash__(self):
1871        return hash(self.sha)
1872
1873    @classmethod
1874    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
1875        api_object = cls(allspice_client)
1876        cls._initialize(allspice_client, api_object, result)
1877        return api_object
1878
1879
1880class Issue(ApiObject):
1881    """
1882    An issue on a repository.
1883
1884    Note: `Issue.assets` may not have any entries even if the issue has
1885    attachments. This happens when an issue is fetched via a bulk method like
1886    `Repository.get_issues`. In most cases, prefer using
1887    `Issue.get_attachments` to get the attachments on an issue.
1888    """
1889
1890    assets: List[Union[Any, "Attachment"]]
1891    assignee: Any
1892    assignees: Any
1893    body: str
1894    closed_at: Any
1895    comments: int
1896    created_at: str
1897    due_date: Any
1898    html_url: str
1899    id: int
1900    is_locked: bool
1901    labels: List[Any]
1902    milestone: Optional["Milestone"]
1903    number: int
1904    original_author: str
1905    original_author_id: int
1906    pin_order: int
1907    pull_request: Any
1908    ref: str
1909    repository: Dict[str, Union[int, str]]
1910    state: str
1911    title: str
1912    updated_at: str
1913    url: str
1914    user: User
1915
1916    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1917    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1918    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1919    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1920    GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets"""
1921
1922    OPENED = "open"
1923    CLOSED = "closed"
1924
1925    def __init__(self, allspice_client):
1926        super().__init__(allspice_client)
1927
1928    def __eq__(self, other):
1929        if not isinstance(other, Issue):
1930            return False
1931        return self.repository == other.repository and self.id == other.id
1932
1933    def __hash__(self):
1934        return hash(self.repository) ^ hash(self.id)
1935
1936    _fields_to_parsers: ClassVar[dict] = {
1937        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1938        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1939        "assets": lambda allspice_client, assets: [
1940            Attachment.parse_response(allspice_client, a) for a in assets
1941        ],
1942        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1943        "assignees": lambda allspice_client, us: [
1944            User.parse_response(allspice_client, u) for u in us
1945        ],
1946        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1947    }
1948
1949    _parsers_to_fields: ClassVar[dict] = {
1950        "milestone": lambda m: m.id,
1951    }
1952
1953    _patchable_fields: ClassVar[set[str]] = {
1954        "assignee",
1955        "assignees",
1956        "body",
1957        "due_date",
1958        "milestone",
1959        "state",
1960        "title",
1961    }
1962
1963    def commit(self):
1964        args = {
1965            "owner": self.repository.owner.username,
1966            "repo": self.repository.name,
1967            "index": self.number,
1968        }
1969        self._commit(args)
1970
1971    @classmethod
1972    def request(cls, allspice_client, owner: str, repo: str, number: str):
1973        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1974        # The repository in the response is a RepositoryMeta object, so request
1975        # the full repository object and add it to the issue object.
1976        repository = Repository.request(allspice_client, owner, repo)
1977        setattr(api_object, "_repository", repository)
1978        # For legacy reasons
1979        cls._add_read_property("repo", repository, api_object)
1980        return api_object
1981
1982    @classmethod
1983    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1984        args = {"owner": repo.owner.username, "repo": repo.name}
1985        data = {"title": title, "body": body}
1986        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1987        issue = Issue.parse_response(allspice_client, result)
1988        setattr(issue, "_repository", repo)
1989        cls._add_read_property("repo", repo, issue)
1990        return issue
1991
1992    @property
1993    def owner(self) -> Organization | User:
1994        return self.repository.owner
1995
1996    def get_time_sum(self, user: User) -> int:
1997        results = self.allspice_client.requests_get(
1998            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1999        )
2000        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
2001
2002    def get_times(self) -> Optional[Dict]:
2003        return self.allspice_client.requests_get(
2004            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2005        )
2006
2007    def delete_time(self, time_id: str):
2008        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2009        self.allspice_client.requests_delete(path)
2010
2011    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2012        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2013        self.allspice_client.requests_post(
2014            path, data={"created": created, "time": int(time), "user_name": user_name}
2015        )
2016
2017    def get_comments(self) -> List[Comment]:
2018        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2019
2020        results = self.allspice_client.requests_get(
2021            self.GET_COMMENTS.format(
2022                owner=self.owner.username, repo=self.repository.name, index=self.number
2023            )
2024        )
2025
2026        return [Comment.parse_response(self.allspice_client, result) for result in results]
2027
2028    def create_comment(self, body: str) -> Comment:
2029        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2030
2031        path = self.GET_COMMENTS.format(
2032            owner=self.owner.username, repo=self.repository.name, index=self.number
2033        )
2034
2035        response = self.allspice_client.requests_post(path, data={"body": body})
2036        return Comment.parse_response(self.allspice_client, response)
2037
2038    def get_attachments(self) -> List[Attachment]:
2039        """
2040        Fetch all attachments on this issue.
2041
2042        Unlike the assets field, this will always fetch all attachments from the
2043        API.
2044
2045        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2046        """
2047
2048        path = self.GET_ATTACHMENTS.format(
2049            owner=self.owner.username, repo=self.repository.name, index=self.number
2050        )
2051        response = self.allspice_client.requests_get(path)
2052
2053        return [Attachment.parse_response(self.allspice_client, result) for result in response]
2054
2055    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2056        """
2057        Create an attachment on this issue.
2058
2059        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2060
2061        :param file: The file to attach. This should be a file-like object.
2062        :param name: The name of the file. If not provided, the name of the file will be
2063                     used.
2064        :return: The created attachment.
2065        """
2066
2067        args: dict[str, Any] = {
2068            "files": {"attachment": file},
2069        }
2070        if name is not None:
2071            args["params"] = {"name": name}
2072
2073        result = self.allspice_client.requests_post(
2074            self.GET_ATTACHMENTS.format(
2075                owner=self.owner.username, repo=self.repository.name, index=self.number
2076            ),
2077            **args,
2078        )
2079
2080        return Attachment.parse_response(self.allspice_client, result)
2081
2082
2083class DesignReview(ApiObject):
2084    """
2085    A Design Review. See
2086    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2087
2088    Note: The base and head fields are not `Branch` objects - they are plain strings
2089    referring to the branch names. This is because DRs can exist for branches that have
2090    been deleted, which don't have an associated `Branch` object from the API. You can use
2091    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2092    it exists.
2093    """
2094
2095    additions: int
2096    allow_maintainer_edit: bool
2097    allow_maintainer_edits: Any
2098    assignee: User
2099    assignees: List["User"]
2100    base: str
2101    body: str
2102    changed_files: int
2103    closed_at: Optional[str]
2104    comments: int
2105    created_at: str
2106    deletions: int
2107    diff_url: str
2108    draft: bool
2109    due_date: Optional[str]
2110    head: str
2111    html_url: str
2112    id: int
2113    is_locked: bool
2114    labels: List[Any]
2115    merge_base: str
2116    merge_commit_sha: Optional[str]
2117    mergeable: bool
2118    merged: bool
2119    merged_at: Optional[str]
2120    merged_by: Optional["User"]
2121    milestone: Any
2122    number: int
2123    patch_url: str
2124    pin_order: int
2125    repository: Optional["Repository"]
2126    requested_reviewers: Any
2127    review_comments: int
2128    state: str
2129    title: str
2130    updated_at: str
2131    url: str
2132    user: User
2133
2134    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2135    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2136    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2137
2138    OPEN = "open"
2139    CLOSED = "closed"
2140
2141    class MergeType(Enum):
2142        MERGE = "merge"
2143        REBASE = "rebase"
2144        REBASE_MERGE = "rebase-merge"
2145        SQUASH = "squash"
2146        MANUALLY_MERGED = "manually-merged"
2147
2148    def __init__(self, allspice_client):
2149        super().__init__(allspice_client)
2150
2151    def __eq__(self, other):
2152        if not isinstance(other, DesignReview):
2153            return False
2154        return self.repository == other.repository and self.id == other.id
2155
2156    def __hash__(self):
2157        return hash(self.repository) ^ hash(self.id)
2158
2159    @classmethod
2160    def parse_response(cls, allspice_client, result) -> "DesignReview":
2161        api_object = super().parse_response(allspice_client, result)
2162        cls._add_read_property(
2163            "repository",
2164            Repository.parse_response(allspice_client, result["base"]["repo"]),
2165            api_object,
2166        )
2167
2168        return api_object
2169
2170    @classmethod
2171    def request(cls, allspice_client, owner: str, repo: str, number: str):
2172        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2173        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2174
2175    _fields_to_parsers: ClassVar[dict] = {
2176        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2177        "assignees": lambda allspice_client, us: [
2178            User.parse_response(allspice_client, u) for u in us
2179        ],
2180        "base": lambda _, b: b["ref"],
2181        "head": lambda _, h: h["ref"],
2182        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2183        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2184        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2185    }
2186
2187    _patchable_fields: ClassVar[set[str]] = {
2188        "allow_maintainer_edits",
2189        "assignee",
2190        "assignees",
2191        "base",
2192        "body",
2193        "due_date",
2194        "milestone",
2195        "state",
2196        "title",
2197    }
2198
2199    _parsers_to_fields: ClassVar[dict] = {
2200        "assignee": lambda u: u.username,
2201        "assignees": lambda us: [u.username for u in us],
2202        "base": lambda b: b.name if isinstance(b, Branch) else b,
2203        "milestone": lambda m: m.id,
2204    }
2205
2206    def commit(self):
2207        data = self.get_dirty_fields()
2208        if "due_date" in data and data["due_date"] is None:
2209            data["unset_due_date"] = True
2210
2211        args = {
2212            "owner": self.repository.owner.username,
2213            "repo": self.repository.name,
2214            "index": self.number,
2215        }
2216        self._commit(args, data)
2217
2218    def merge(self, merge_type: MergeType):
2219        """
2220        Merge the pull request. See
2221        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2222
2223        :param merge_type: The type of merge to perform. See the MergeType enum.
2224        """
2225
2226        self.allspice_client.requests_post(
2227            self.MERGE_DESIGN_REVIEW.format(
2228                owner=self.repository.owner.username,
2229                repo=self.repository.name,
2230                index=self.number,
2231            ),
2232            data={"Do": merge_type.value},
2233        )
2234
2235    def get_comments(self) -> List[Comment]:
2236        """
2237        Get the comments on this pull request, but not specifically on a review.
2238
2239        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2240
2241        :return: A list of comments on this pull request.
2242        """
2243
2244        results = self.allspice_client.requests_get(
2245            self.GET_COMMENTS.format(
2246                owner=self.repository.owner.username,
2247                repo=self.repository.name,
2248                index=self.number,
2249            )
2250        )
2251        return [Comment.parse_response(self.allspice_client, result) for result in results]
2252
2253    def create_comment(self, body: str):
2254        """
2255        Create a comment on this pull request. This uses the same endpoint as the
2256        comments on issues, and will not be associated with any reviews.
2257
2258        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2259
2260        :param body: The body of the comment.
2261        :return: The comment that was created.
2262        """
2263
2264        result = self.allspice_client.requests_post(
2265            self.GET_COMMENTS.format(
2266                owner=self.repository.owner.username,
2267                repo=self.repository.name,
2268                index=self.number,
2269            ),
2270            data={"body": body},
2271        )
2272        return Comment.parse_response(self.allspice_client, result)
2273
2274
2275class Team(ApiObject):
2276    can_create_org_repo: bool
2277    description: str
2278    id: int
2279    includes_all_repositories: bool
2280    name: str
2281    organization: Optional["Organization"]
2282    permission: str
2283    units: List[str]
2284    units_map: Dict[str, str]
2285
2286    API_OBJECT = """/teams/{id}"""  # <id>
2287    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2288    TEAM_DELETE = """/teams/%s"""  # <id>
2289    GET_MEMBERS = """/teams/%s/members"""  # <id>
2290    GET_REPOS = """/teams/%s/repos"""  # <id>
2291
2292    def __init__(self, allspice_client):
2293        super().__init__(allspice_client)
2294
2295    def __eq__(self, other):
2296        if not isinstance(other, Team):
2297            return False
2298        return self.organization == other.organization and self.id == other.id
2299
2300    def __hash__(self):
2301        return hash(self.organization) ^ hash(self.id)
2302
2303    _fields_to_parsers: ClassVar[dict] = {
2304        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2305    }
2306
2307    _patchable_fields: ClassVar[set[str]] = {
2308        "can_create_org_repo",
2309        "description",
2310        "includes_all_repositories",
2311        "name",
2312        "permission",
2313        "units",
2314        "units_map",
2315    }
2316
2317    @classmethod
2318    def request(cls, allspice_client, id: int):
2319        return cls._request(allspice_client, {"id": id})
2320
2321    def commit(self):
2322        args = {"id": self.id}
2323        self._commit(args)
2324
2325    def add_user(self, user: User):
2326        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2327        url = f"/teams/{self.id}/members/{user.login}"
2328        self.allspice_client.requests_put(url)
2329
2330    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2331        if isinstance(repo, Repository):
2332            repo_name = repo.name
2333        else:
2334            repo_name = repo
2335        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2336
2337    def get_members(self):
2338        """Get all users assigned to the team."""
2339        results = self.allspice_client.requests_get_paginated(
2340            Team.GET_MEMBERS % self.id,
2341        )
2342        return [User.parse_response(self.allspice_client, result) for result in results]
2343
2344    def get_repos(self):
2345        """Get all repos of this Team."""
2346        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2347        return [Repository.parse_response(self.allspice_client, result) for result in results]
2348
2349    def delete(self):
2350        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2351        self.deleted = True
2352
2353    def remove_team_member(self, user_name: str):
2354        url = f"/teams/{self.id}/members/{user_name}"
2355        self.allspice_client.requests_delete(url)
2356
2357
2358class Release(ApiObject):
2359    """
2360    A release on a repo.
2361    """
2362
2363    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2364    author: User
2365    body: str
2366    created_at: str
2367    draft: bool
2368    html_url: str
2369    id: int
2370    name: str
2371    prerelease: bool
2372    published_at: str
2373    repo: Optional["Repository"]
2374    repository: Optional["Repository"]
2375    tag_name: str
2376    tarball_url: str
2377    target_commitish: str
2378    upload_url: str
2379    url: str
2380    zipball_url: str
2381
2382    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2383    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2384    # Note that we don't strictly need the get_assets route, as the release
2385    # object already contains the assets.
2386
2387    def __init__(self, allspice_client):
2388        super().__init__(allspice_client)
2389
2390    def __eq__(self, other):
2391        if not isinstance(other, Release):
2392            return False
2393        return self.repo == other.repo and self.id == other.id
2394
2395    def __hash__(self):
2396        return hash(self.repo) ^ hash(self.id)
2397
2398    _fields_to_parsers: ClassVar[dict] = {
2399        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2400    }
2401    _patchable_fields: ClassVar[set[str]] = {
2402        "body",
2403        "draft",
2404        "name",
2405        "prerelease",
2406        "tag_name",
2407        "target_commitish",
2408    }
2409
2410    @classmethod
2411    def parse_response(cls, allspice_client, result, repo) -> Release:
2412        release = super().parse_response(allspice_client, result)
2413        Release._add_read_property("repository", repo, release)
2414        # For legacy reasons
2415        Release._add_read_property("repo", repo, release)
2416        setattr(
2417            release,
2418            "_assets",
2419            [
2420                ReleaseAsset.parse_response(allspice_client, asset, release)
2421                for asset in result["assets"]
2422            ],
2423        )
2424        return release
2425
2426    @classmethod
2427    def request(
2428        cls,
2429        allspice_client,
2430        owner: str,
2431        repo: str,
2432        id: Optional[int] = None,
2433    ) -> Release:
2434        args = {"owner": owner, "repo": repo, "id": id}
2435        release_response = cls._get_gitea_api_object(allspice_client, args)
2436        repository = Repository.request(allspice_client, owner, repo)
2437        release = cls.parse_response(allspice_client, release_response, repository)
2438        return release
2439
2440    def commit(self):
2441        if self.repo is None:
2442            raise ValueError("Cannot commit a release without a repository.")
2443
2444        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2445        self._commit(args)
2446
2447    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2448        """
2449        Create an asset for this release.
2450
2451        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2452
2453        :param file: The file to upload. This should be a file-like object.
2454        :param name: The name of the file.
2455        :return: The created asset.
2456        """
2457
2458        if self.repo is None:
2459            raise ValueError("Cannot commit a release without a repository.")
2460
2461        args: dict[str, Any] = {"files": {"attachment": file}}
2462        if name is not None:
2463            args["params"] = {"name": name}
2464
2465        result = self.allspice_client.requests_post(
2466            self.RELEASE_CREATE_ASSET.format(
2467                owner=self.repo.owner.username,
2468                repo=self.repo.name,
2469                id=self.id,
2470            ),
2471            **args,
2472        )
2473        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2474
2475    def delete(self):
2476        if self.repo is None:
2477            raise ValueError("Cannot commit a release without a repository.")
2478
2479        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2480        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2481        self.deleted = True
2482
2483
2484class ReleaseAsset(ApiObject):
2485    browser_download_url: str
2486    created_at: str
2487    download_count: int
2488    id: int
2489    name: str
2490    release: Optional["Release"]
2491    size: int
2492    uuid: str
2493
2494    API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}"
2495
2496    def __init__(self, allspice_client):
2497        super().__init__(allspice_client)
2498
2499    def __eq__(self, other):
2500        if not isinstance(other, ReleaseAsset):
2501            return False
2502        return self.release == other.release and self.id == other.id
2503
2504    def __hash__(self):
2505        return hash(self.release) ^ hash(self.id)
2506
2507    _fields_to_parsers: ClassVar[dict] = {}
2508    _patchable_fields: ClassVar[set[str]] = {
2509        "name",
2510    }
2511
2512    @classmethod
2513    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2514        asset = super().parse_response(allspice_client, result)
2515        ReleaseAsset._add_read_property("release", release, asset)
2516        return asset
2517
2518    @classmethod
2519    def request(
2520        cls,
2521        allspice_client,
2522        owner: str,
2523        repo: str,
2524        release_id: int,
2525        id: int,
2526    ) -> ReleaseAsset:
2527        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
2528        asset_response = cls._get_gitea_api_object(allspice_client, args)
2529        release = Release.request(allspice_client, owner, repo, release_id)
2530        asset = cls.parse_response(allspice_client, asset_response, release)
2531        return asset
2532
2533    def commit(self):
2534        if self.release is None or self.release.repo is None:
2535            raise ValueError("Cannot commit a release asset without a release or a repository.")
2536
2537        args = {
2538            "owner": self.release.repo.owner.username,
2539            "repo": self.release.repo.name,
2540            "release_id": self.release.id,
2541            "id": self.id,
2542        }
2543        self._commit(args)
2544
2545    def download(self) -> bytes:
2546        """
2547        Download the raw, binary data of this asset.
2548
2549        Note 1: if the file you are requesting is a text file, you might want to
2550        use .decode() on the result to get a string. For example:
2551
2552            asset.download().decode("utf-8")
2553
2554        Note 2: this method will store the entire file in memory. If you are
2555        downloading a large file, you might want to use download_to_file instead.
2556        """
2557
2558        return self.allspice_client.requests.get(
2559            self.browser_download_url,
2560            headers=self.allspice_client.headers,
2561        ).content
2562
2563    def download_to_file(self, io: IO):
2564        """
2565        Download the raw, binary data of this asset to a file-like object.
2566
2567        Example:
2568
2569            with open("my_file.zip", "wb") as f:
2570                asset.download_to_file(f)
2571
2572        :param io: The file-like object to write the data to.
2573        """
2574
2575        response = self.allspice_client.requests.get(
2576            self.browser_download_url,
2577            headers=self.allspice_client.headers,
2578            stream=True,
2579        )
2580        # 4kb chunks
2581        for chunk in response.iter_content(chunk_size=4096):
2582            if chunk:
2583                io.write(chunk)
2584
2585    def delete(self):
2586        if self.release is None or self.release.repo is None:
2587            raise ValueError("Cannot commit a release asset without a release or a repository.")
2588
2589        args = {
2590            "owner": self.release.repo.owner.username,
2591            "repo": self.release.repo.name,
2592            "release_id": self.release.id,
2593            "id": self.id,
2594        }
2595        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2596        self.deleted = True
2597
2598
2599class Content(ReadonlyApiObject):
2600    content: Any
2601    download_url: str
2602    encoding: Any
2603    git_url: str
2604    html_url: str
2605    last_commit_sha: str
2606    name: str
2607    path: str
2608    sha: str
2609    size: int
2610    submodule_git_url: Any
2611    target: Any
2612    type: str
2613    url: str
2614
2615    FILE = "file"
2616
2617    def __init__(self, allspice_client):
2618        super().__init__(allspice_client)
2619
2620    def __eq__(self, other):
2621        if not isinstance(other, Content):
2622            return False
2623
2624        return self.sha == other.sha and self.name == other.name
2625
2626    def __hash__(self):
2627        return hash(self.sha) ^ hash(self.name)
2628
2629
2630Ref = Union[Branch, Commit, str]
2631
2632
2633class Util:
2634    @staticmethod
2635    def convert_time(time: str) -> datetime:
2636        """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
2637        try:
2638            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
2639        except ValueError:
2640            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
2641
2642    @staticmethod
2643    def format_time(time: datetime) -> str:
2644        """
2645        Format a datetime object to Gitea's time format.
2646
2647        :param time: The time to format
2648        :return: Formatted time
2649        """
2650
2651        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
2652
2653    @staticmethod
2654    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
2655        """
2656        Given a "ref", returns a dict with the ref parameter for the API call.
2657
2658        If the ref is None, returns an empty dict. You can pass this to the API
2659        directly.
2660        """
2661
2662        if isinstance(ref, Branch):
2663            return {"ref": ref.name}
2664        elif isinstance(ref, Commit):
2665            return {"ref": ref.sha}
2666        elif ref:
2667            return {"ref": ref}
2668        else:
2669            return {}
class Organization(allspice.baseapiobject.ApiObject):
 33class Organization(ApiObject):
 34    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
 35
 36    active: Optional[bool]
 37    avatar_url: str
 38    created: Optional[str]
 39    description: str
 40    email: str
 41    followers_count: Optional[int]
 42    following_count: Optional[int]
 43    full_name: str
 44    html_url: Optional[str]
 45    id: int
 46    is_admin: Optional[bool]
 47    language: Optional[str]
 48    last_login: Optional[str]
 49    location: str
 50    login: Optional[str]
 51    login_name: Optional[str]
 52    name: Optional[str]
 53    prohibit_login: Optional[bool]
 54    repo_admin_change_team_access: Optional[bool]
 55    restricted: Optional[bool]
 56    source_id: Optional[int]
 57    starred_repos_count: Optional[int]
 58    username: str
 59    visibility: str
 60    website: str
 61
 62    API_OBJECT = """/orgs/{name}"""  # <org>
 63    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
 64    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
 65    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
 66    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
 67    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
 68    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
 69
 70    def __init__(self, allspice_client):
 71        super().__init__(allspice_client)
 72
 73    def __eq__(self, other):
 74        if not isinstance(other, Organization):
 75            return False
 76        return self.allspice_client == other.allspice_client and self.name == other.name
 77
 78    def __hash__(self):
 79        return hash(self.allspice_client) ^ hash(self.name)
 80
 81    @classmethod
 82    def request(cls, allspice_client, name: str) -> Self:
 83        return cls._request(allspice_client, {"name": name})
 84
 85    @classmethod
 86    def parse_response(cls, allspice_client, result) -> "Organization":
 87        api_object = super().parse_response(allspice_client, result)
 88        # add "name" field to make this behave similar to users for gitea < 1.18
 89        # also necessary for repository-owner when org is repo owner
 90        if not hasattr(api_object, "name"):
 91            Organization._add_read_property("name", result["username"], api_object)
 92        return api_object
 93
 94    _patchable_fields: ClassVar[set[str]] = {
 95        "description",
 96        "full_name",
 97        "location",
 98        "visibility",
 99        "website",
100    }
101
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
105
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)
144
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
150
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
157
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
165
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
172
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )
204
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
208
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
220
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
224
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True
231
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results

see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll

Organization(allspice_client)
70    def __init__(self, allspice_client):
71        super().__init__(allspice_client)
active: Optional[bool]
avatar_url: str
created: Optional[str]
description: str
email: str
followers_count: Optional[int]
following_count: Optional[int]
full_name: str
html_url: Optional[str]
id: int
is_admin: Optional[bool]
language: Optional[str]
last_login: Optional[str]
location: str
login: Optional[str]
login_name: Optional[str]
name: Optional[str]
prohibit_login: Optional[bool]
repo_admin_change_team_access: Optional[bool]
restricted: Optional[bool]
source_id: Optional[int]
starred_repos_count: Optional[int]
username: str
visibility: str
website: str
API_OBJECT = '/orgs/{name}'
ORG_REPOS_REQUEST = '/orgs/%s/repos'
ORG_TEAMS_REQUEST = '/orgs/%s/teams'
ORG_TEAMS_CREATE = '/orgs/%s/teams'
ORG_GET_MEMBERS = '/orgs/%s/members'
ORG_IS_MEMBER = '/orgs/%s/members/%s'
ORG_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> Self:
81    @classmethod
82    def request(cls, allspice_client, name: str) -> Self:
83        return cls._request(allspice_client, {"name": name})
@classmethod
def parse_response(cls, allspice_client, result) -> Organization:
85    @classmethod
86    def parse_response(cls, allspice_client, result) -> "Organization":
87        api_object = super().parse_response(allspice_client, result)
88        # add "name" field to make this behave similar to users for gitea < 1.18
89        # also necessary for repository-owner when org is repo owner
90        if not hasattr(api_object, "name"):
91            Organization._add_read_property("name", result["username"], api_object)
92        return api_object
def commit(self):
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)

Create an organization Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
def get_repository(self, name) -> Repository:
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List[Team]:
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
def get_team(self, name) -> Team:
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
def create_team( self, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}) -> Team:
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )

Alias for AllSpice#create_team

def get_members(self) -> List[User]:
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
def is_member(self, username) -> bool:
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
def remove_member(self, user: User):
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
def delete(self):
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True

Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results
class User(allspice.baseapiobject.ApiObject):
241class User(ApiObject):
242    active: bool
243    admin: Any
244    allow_create_organization: Any
245    allow_git_hook: Any
246    allow_import_local: Any
247    avatar_url: str
248    created: str
249    description: str
250    email: str
251    emails: List[Any]
252    followers_count: int
253    following_count: int
254    full_name: str
255    html_url: str
256    id: int
257    is_admin: bool
258    language: str
259    last_login: str
260    location: str
261    login: str
262    login_name: str
263    max_repo_creation: Any
264    must_change_password: Any
265    password: Any
266    prohibit_login: bool
267    restricted: bool
268    source_id: int
269    starred_repos_count: int
270    username: str
271    visibility: str
272    website: str
273
274    API_OBJECT = """/users/{name}"""  # <org>
275    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
276    USER_PATCH = """/admin/users/%s"""  # <username>
277    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
278    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
279    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
280
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
284
285    def __eq__(self, other):
286        if not isinstance(other, User):
287            return False
288        return self.allspice_client == other.allspice_client and self.id == other.id
289
290    def __hash__(self):
291        return hash(self.allspice_client) ^ hash(self.id)
292
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
297
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
302
303    _patchable_fields: ClassVar[set[str]] = {
304        "active",
305        "admin",
306        "allow_create_organization",
307        "allow_git_hook",
308        "allow_import_local",
309        "email",
310        "full_name",
311        "location",
312        "login_name",
313        "max_repo_creation",
314        "must_change_password",
315        "password",
316        "prohibit_login",
317        "website",
318    }
319
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}
335
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)
374
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]
380
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]
386
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
391
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]
396
397    def __request_emails(self):
398        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
399        # report if the adress changed by this
400        for mail in result:
401            self._emails.append(mail["email"])
402            if mail["primary"]:
403                self._email = mail["email"]
404
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True
409
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results
User(allspice_client)
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
active: bool
admin: Any
allow_create_organization: Any
allow_git_hook: Any
allow_import_local: Any
avatar_url: str
created: str
description: str
email: str
emails
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
followers_count: int
following_count: int
full_name: str
html_url: str
id: int
is_admin: bool
language: str
last_login: str
location: str
login: str
login_name: str
max_repo_creation: Any
must_change_password: Any
password: Any
prohibit_login: bool
restricted: bool
source_id: int
starred_repos_count: int
username: str
visibility: str
website: str
API_OBJECT = '/users/{name}'
USER_MAIL = '/user/emails?sudo=%s'
USER_PATCH = '/admin/users/%s'
ADMIN_DELETE_USER = '/admin/users/%s'
ADMIN_EDIT_USER = '/admin/users/{username}'
USER_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> User:
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
def commit(self, login_name: str, source_id: int = 0):
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}

Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.

def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)

Create a user Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories owned by this User.

def get_orgs(self) -> List[Organization]:
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]

Get all Organizations this user is a member of.

def get_teams(self) -> List[Team]:
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
def get_accessible_repos(self) -> List[Repository]:
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories accessible by the logged in User.

def delete(self):
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True

Deletes this User. Also deletes all Repositories he owns.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results
class Branch(allspice.baseapiobject.ReadonlyApiObject):
419class Branch(ReadonlyApiObject):
420    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
421    effective_branch_protection_name: str
422    enable_status_check: bool
423    name: str
424    protected: bool
425    required_approvals: int
426    status_check_contexts: List[Any]
427    user_can_merge: bool
428    user_can_push: bool
429
430    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
431
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
434
435    def __eq__(self, other):
436        if not isinstance(other, Branch):
437            return False
438        return self.commit == other.commit and self.name == other.name
439
440    def __hash__(self):
441        return hash(self.commit["id"]) ^ hash(self.name)
442
443    _fields_to_parsers: ClassVar[dict] = {
444        # This is not a commit object
445        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
446    }
447
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Branch(allspice_client)
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]], NoneType]]
effective_branch_protection_name: str
enable_status_check: bool
name: str
protected: bool
required_approvals: int
status_check_contexts: List[Any]
user_can_merge: bool
user_can_push: bool
API_OBJECT = '/repos/{owner}/{repo}/branches/{branch}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, branch: str):
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
class GitEntry(allspice.baseapiobject.ReadonlyApiObject):
453class GitEntry(ReadonlyApiObject):
454    """
455    An object representing a file or directory in the Git tree.
456    """
457
458    mode: str
459    path: str
460    sha: str
461    size: int
462    type: str
463    url: str
464
465    def __init__(self, allspice_client):
466        super().__init__(allspice_client)
467
468    def __eq__(self, other) -> bool:
469        if not isinstance(other, GitEntry):
470            return False
471        return self.sha == other.sha
472
473    def __hash__(self) -> int:
474        return hash(self.sha)

An object representing a file or directory in the Git tree.

GitEntry(allspice_client)
465    def __init__(self, allspice_client):
466        super().__init__(allspice_client)
mode: str
path: str
sha: str
size: int
type: str
url: str
class Repository(allspice.baseapiobject.ApiObject):
 477class Repository(ApiObject):
 478    allow_fast_forward_only_merge: bool
 479    allow_manual_merge: Any
 480    allow_merge_commits: bool
 481    allow_rebase: bool
 482    allow_rebase_explicit: bool
 483    allow_rebase_update: bool
 484    allow_squash_merge: bool
 485    archived: bool
 486    archived_at: str
 487    autodetect_manual_merge: Any
 488    avatar_url: str
 489    clone_url: str
 490    created_at: str
 491    default_allow_maintainer_edit: bool
 492    default_branch: str
 493    default_delete_branch_after_merge: bool
 494    default_merge_style: str
 495    description: str
 496    empty: bool
 497    enable_prune: Any
 498    external_tracker: Any
 499    external_wiki: Any
 500    fork: bool
 501    forks_count: int
 502    full_name: str
 503    has_actions: bool
 504    has_issues: bool
 505    has_packages: bool
 506    has_projects: bool
 507    has_pull_requests: bool
 508    has_releases: bool
 509    has_wiki: bool
 510    html_url: str
 511    id: int
 512    ignore_whitespace_conflicts: bool
 513    internal: bool
 514    internal_tracker: Dict[str, bool]
 515    language: str
 516    languages_url: str
 517    link: str
 518    mirror: bool
 519    mirror_interval: str
 520    mirror_updated: str
 521    name: str
 522    object_format_name: str
 523    open_issues_count: int
 524    open_pr_counter: int
 525    original_url: str
 526    owner: Union["User", "Organization"]
 527    parent: Any
 528    permissions: Dict[str, bool]
 529    private: bool
 530    projects_mode: str
 531    release_counter: int
 532    repo_transfer: Any
 533    size: int
 534    ssh_url: str
 535    stars_count: int
 536    template: bool
 537    updated_at: datetime
 538    url: str
 539    watchers_count: int
 540    website: str
 541
 542    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 543    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 544    REPO_SEARCH = """/repos/search/"""
 545    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 546    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 547    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 548    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 549    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 550    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 551    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 552    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 553    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 554    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 555    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 556    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 557    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 558    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 559    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 560    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 561    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 562    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 563    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 564    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 565    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 566
 567    class ArchiveFormat(Enum):
 568        """
 569        Archive formats for Repository.get_archive
 570        """
 571
 572        TAR = "tar.gz"
 573        ZIP = "zip"
 574
 575    class CommitStatusSort(Enum):
 576        """
 577        Sort order for Repository.get_commit_status
 578        """
 579
 580        OLDEST = "oldest"
 581        RECENT_UPDATE = "recentupdate"
 582        LEAST_UPDATE = "leastupdate"
 583        LEAST_INDEX = "leastindex"
 584        HIGHEST_INDEX = "highestindex"
 585
 586    def __init__(self, allspice_client):
 587        super().__init__(allspice_client)
 588
 589    def __eq__(self, other):
 590        if not isinstance(other, Repository):
 591            return False
 592        return self.owner == other.owner and self.name == other.name
 593
 594    def __hash__(self):
 595        return hash(self.owner) ^ hash(self.name)
 596
 597    _fields_to_parsers: ClassVar[dict] = {
 598        # dont know how to tell apart user and org as owner except form email being empty.
 599        "owner": lambda allspice_client, r: (
 600            Organization.parse_response(allspice_client, r)
 601            if r["email"] == ""
 602            else User.parse_response(allspice_client, r)
 603        ),
 604        "updated_at": lambda _, t: Util.convert_time(t),
 605    }
 606
 607    @classmethod
 608    def request(
 609        cls,
 610        allspice_client,
 611        owner: str,
 612        name: str,
 613    ) -> Repository:
 614        return cls._request(allspice_client, {"owner": owner, "name": name})
 615
 616    @classmethod
 617    def search(
 618        cls,
 619        allspice_client,
 620        query: Optional[str] = None,
 621        topic: bool = False,
 622        include_description: bool = False,
 623        user: Optional[User] = None,
 624        owner_to_prioritize: Union[User, Organization, None] = None,
 625    ) -> list[Repository]:
 626        """
 627        Search for repositories.
 628
 629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 630
 631        :param query: The query string to search for
 632        :param topic: If true, the query string will only be matched against the
 633            repository's topic.
 634        :param include_description: If true, the query string will be matched
 635            against the repository's description as well.
 636        :param user: If specified, only repositories that this user owns or
 637            contributes to will be searched.
 638        :param owner_to_prioritize: If specified, repositories owned by the
 639            given entity will be prioritized in the search.
 640        :returns: All repositories matching the query. If there are many
 641            repositories matching this query, this may take some time.
 642        """
 643
 644        params = {}
 645
 646        if query is not None:
 647            params["q"] = query
 648        if topic:
 649            params["topic"] = topic
 650        if include_description:
 651            params["include_description"] = include_description
 652        if user is not None:
 653            params["user"] = user.id
 654        if owner_to_prioritize is not None:
 655            params["owner_to_prioritize"] = owner_to_prioritize.id
 656
 657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 658
 659        return [Repository.parse_response(allspice_client, response) for response in responses]
 660
 661    _patchable_fields: ClassVar[set[str]] = {
 662        "allow_manual_merge",
 663        "allow_merge_commits",
 664        "allow_rebase",
 665        "allow_rebase_explicit",
 666        "allow_rebase_update",
 667        "allow_squash_merge",
 668        "archived",
 669        "autodetect_manual_merge",
 670        "default_branch",
 671        "default_delete_branch_after_merge",
 672        "default_merge_style",
 673        "description",
 674        "enable_prune",
 675        "external_tracker",
 676        "external_wiki",
 677        "has_actions",
 678        "has_issues",
 679        "has_projects",
 680        "has_pull_requests",
 681        "has_wiki",
 682        "ignore_whitespace_conflicts",
 683        "internal_tracker",
 684        "mirror_interval",
 685        "name",
 686        "private",
 687        "template",
 688        "website",
 689    }
 690
 691    def commit(self):
 692        args = {"owner": self.owner.username, "name": self.name}
 693        self._commit(args)
 694
 695    def get_branches(self) -> List["Branch"]:
 696        """Get all the Branches of this Repository."""
 697
 698        results = self.allspice_client.requests_get_paginated(
 699            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 700        )
 701        return [Branch.parse_response(self.allspice_client, result) for result in results]
 702
 703    def get_branch(self, name: str) -> "Branch":
 704        """Get a specific Branch of this Repository."""
 705        result = self.allspice_client.requests_get(
 706            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 707        )
 708        return Branch.parse_response(self.allspice_client, result)
 709
 710    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 711        """Add a branch to the repository"""
 712        # Note: will only work with gitea 1.13 or higher!
 713
 714        ref_name = Util.data_params_for_ref(create_from)
 715        if "ref" not in ref_name:
 716            raise ValueError("create_from must be a Branch, Commit or string")
 717        ref_name = ref_name["ref"]
 718
 719        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 720        result = self.allspice_client.requests_post(
 721            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 722        )
 723        return Branch.parse_response(self.allspice_client, result)
 724
 725    def get_issues(
 726        self,
 727        state: Literal["open", "closed", "all"] = "all",
 728        search_query: Optional[str] = None,
 729        labels: Optional[List[str]] = None,
 730        milestones: Optional[List[Union[Milestone, str]]] = None,
 731        assignee: Optional[Union[User, str]] = None,
 732        since: Optional[datetime] = None,
 733        before: Optional[datetime] = None,
 734    ) -> List["Issue"]:
 735        """
 736        Get all Issues of this Repository (open and closed)
 737
 738        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 739
 740        All params of this method are optional filters. If you don't specify a filter, it
 741        will not be applied.
 742
 743        :param state: The state of the Issues to get. If None, all Issues are returned.
 744        :param search_query: Filter issues by text. This is equivalent to searching for
 745                             `search_query` in the Issues on the web interface.
 746        :param labels: Filter issues by labels.
 747        :param milestones: Filter issues by milestones.
 748        :param assignee: Filter issues by the assigned user.
 749        :param since: Filter issues by the date they were created.
 750        :param before: Filter issues by the date they were created.
 751        :return: A list of Issues.
 752        """
 753
 754        data = {
 755            "state": state,
 756        }
 757        if search_query:
 758            data["q"] = search_query
 759        if labels:
 760            data["labels"] = ",".join(labels)
 761        if milestones:
 762            data["milestone"] = ",".join(
 763                [
 764                    milestone.name if isinstance(milestone, Milestone) else milestone
 765                    for milestone in milestones
 766                ]
 767            )
 768        if assignee:
 769            if isinstance(assignee, User):
 770                data["assignee"] = assignee.username
 771            else:
 772                data["assignee"] = assignee
 773        if since:
 774            data["since"] = Util.format_time(since)
 775        if before:
 776            data["before"] = Util.format_time(before)
 777
 778        results = self.allspice_client.requests_get_paginated(
 779            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 780            params=data,
 781        )
 782
 783        issues = []
 784        for result in results:
 785            issue = Issue.parse_response(self.allspice_client, result)
 786            # See Issue.request
 787            setattr(issue, "_repository", self)
 788            # This is mostly for compatibility with an older implementation
 789            Issue._add_read_property("repo", self, issue)
 790            issues.append(issue)
 791
 792        return issues
 793
 794    def get_design_reviews(
 795        self,
 796        state: Literal["open", "closed", "all"] = "all",
 797        milestone: Optional[Union[Milestone, str]] = None,
 798        labels: Optional[List[str]] = None,
 799    ) -> List["DesignReview"]:
 800        """
 801        Get all Design Reviews of this Repository.
 802
 803        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 804
 805        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 806                      are returned.
 807        :param milestone: The milestone of the Design Reviews to get.
 808        :param labels: A list of label IDs to filter DRs by.
 809        :return: A list of Design Reviews.
 810        """
 811
 812        params = {
 813            "state": state,
 814        }
 815        if milestone:
 816            if isinstance(milestone, Milestone):
 817                params["milestone"] = milestone.name
 818            else:
 819                params["milestone"] = milestone
 820        if labels:
 821            params["labels"] = ",".join(labels)
 822
 823        results = self.allspice_client.requests_get_paginated(
 824            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 825            params=params,
 826        )
 827        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 828
 829    def get_commits(
 830        self,
 831        sha: Optional[str] = None,
 832        path: Optional[str] = None,
 833        stat: bool = True,
 834    ) -> List["Commit"]:
 835        """
 836        Get all the Commits of this Repository.
 837
 838        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 839
 840        :param sha: The SHA of the commit to start listing commits from.
 841        :param path: filepath of a file/dir.
 842        :param stat: Include the number of additions and deletions in the response.
 843                     Disable for speedup.
 844        :return: A list of Commits.
 845        """
 846
 847        data = {}
 848        if sha:
 849            data["sha"] = sha
 850        if path:
 851            data["path"] = path
 852        if not stat:
 853            data["stat"] = False
 854
 855        try:
 856            results = self.allspice_client.requests_get_paginated(
 857                Repository.REPO_COMMITS % (self.owner.username, self.name),
 858                params=data,
 859            )
 860        except ConflictException as err:
 861            logging.warning(err)
 862            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 863            results = []
 864        return [Commit.parse_response(self.allspice_client, result) for result in results]
 865
 866    def get_issues_state(self, state) -> List["Issue"]:
 867        """
 868        DEPRECATED: Use get_issues() instead.
 869
 870        Get issues of state Issue.open or Issue.closed of a repository.
 871        """
 872
 873        assert state in [Issue.OPENED, Issue.CLOSED]
 874        issues = []
 875        data = {"state": state}
 876        results = self.allspice_client.requests_get_paginated(
 877            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 878            params=data,
 879        )
 880        for result in results:
 881            issue = Issue.parse_response(self.allspice_client, result)
 882            # adding data not contained in the issue response
 883            # See Issue.request()
 884            setattr(issue, "_repository", self)
 885            Issue._add_read_property("repo", self, issue)
 886            Issue._add_read_property("owner", self.owner, issue)
 887            issues.append(issue)
 888        return issues
 889
 890    def get_times(self):
 891        results = self.allspice_client.requests_get(
 892            Repository.REPO_TIMES % (self.owner.username, self.name)
 893        )
 894        return results
 895
 896    def get_user_time(self, username) -> float:
 897        if isinstance(username, User):
 898            username = username.username
 899        results = self.allspice_client.requests_get(
 900            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 901        )
 902        time = sum(r["time"] for r in results)
 903        return time
 904
 905    def get_full_name(self) -> str:
 906        return self.owner.username + "/" + self.name
 907
 908    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 909        data = {
 910            "assignees": assignees,
 911            "body": description,
 912            "closed": False,
 913            "title": title,
 914        }
 915        result = self.allspice_client.requests_post(
 916            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 917            data=data,
 918        )
 919
 920        issue = Issue.parse_response(self.allspice_client, result)
 921        setattr(issue, "_repository", self)
 922        Issue._add_read_property("repo", self, issue)
 923        return issue
 924
 925    def create_design_review(
 926        self,
 927        title: str,
 928        head: Union[Branch, str],
 929        base: Union[Branch, str],
 930        assignees: Optional[Set[Union[User, str]]] = None,
 931        body: Optional[str] = None,
 932        due_date: Optional[datetime] = None,
 933        milestone: Optional["Milestone"] = None,
 934    ) -> "DesignReview":
 935        """
 936        Create a new Design Review.
 937
 938        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 939
 940        :param title: Title of the Design Review
 941        :param head: Branch or name of the branch to merge into the base branch
 942        :param base: Branch or name of the branch to merge into
 943        :param assignees: Optional. A list of users to assign this review. List can be of
 944                          User objects or of usernames.
 945        :param body: An Optional Description for the Design Review.
 946        :param due_date: An Optional Due date for the Design Review.
 947        :param milestone: An Optional Milestone for the Design Review
 948        :return: The created Design Review
 949        """
 950
 951        data: dict[str, Any] = {
 952            "title": title,
 953        }
 954
 955        if isinstance(head, Branch):
 956            data["head"] = head.name
 957        else:
 958            data["head"] = head
 959        if isinstance(base, Branch):
 960            data["base"] = base.name
 961        else:
 962            data["base"] = base
 963        if assignees:
 964            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 965        if body:
 966            data["body"] = body
 967        if due_date:
 968            data["due_date"] = Util.format_time(due_date)
 969        if milestone:
 970            data["milestone"] = milestone.id
 971
 972        result = self.allspice_client.requests_post(
 973            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 974            data=data,
 975        )
 976
 977        return DesignReview.parse_response(self.allspice_client, result)
 978
 979    def create_milestone(
 980        self,
 981        title: str,
 982        description: str,
 983        due_date: Optional[str] = None,
 984        state: str = "open",
 985    ) -> "Milestone":
 986        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 987        data = {"title": title, "description": description, "state": state}
 988        if due_date:
 989            data["due_date"] = due_date
 990        result = self.allspice_client.requests_post(url, data=data)
 991        return Milestone.parse_response(self.allspice_client, result)
 992
 993    def create_gitea_hook(self, hook_url: str, events: List[str]):
 994        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 995        data = {
 996            "type": "gitea",
 997            "config": {"content_type": "json", "url": hook_url},
 998            "events": events,
 999            "active": True,
1000        }
1001        return self.allspice_client.requests_post(url, data=data)
1002
1003    def list_hooks(self):
1004        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1005        return self.allspice_client.requests_get(url)
1006
1007    def delete_hook(self, id: str):
1008        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1009        self.allspice_client.requests_delete(url)
1010
1011    def is_collaborator(self, username) -> bool:
1012        if isinstance(username, User):
1013            username = username.username
1014        try:
1015            # returns 204 if its ok, 404 if its not
1016            self.allspice_client.requests_get(
1017                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1018            )
1019            return True
1020        except Exception:
1021            return False
1022
1023    def get_users_with_access(self) -> Sequence[User]:
1024        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1025        response = self.allspice_client.requests_get(url)
1026        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1027        if isinstance(self.owner, User):
1028            return [*collabs, self.owner]
1029        else:
1030            # owner must be org
1031            teams = self.owner.get_teams()
1032            for team in teams:
1033                team_repos = team.get_repos()
1034                if self.name in [n.name for n in team_repos]:
1035                    collabs += team.get_members()
1036            return collabs
1037
1038    def remove_collaborator(self, user_name: str):
1039        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1040        self.allspice_client.requests_delete(url)
1041
1042    def transfer_ownership(
1043        self,
1044        new_owner: Union[User, Organization],
1045        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1046    ):
1047        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1048        data: dict[str, Any] = {"new_owner": new_owner.username}
1049        if isinstance(new_owner, Organization):
1050            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1051            data["team_ids"] = new_team_ids
1052        self.allspice_client.requests_post(url, data=data)
1053        # TODO: make sure this instance is either updated or discarded
1054
1055    def get_git_content(
1056        self,
1057        ref: Optional["Ref"] = None,
1058        commit: "Optional[Commit]" = None,
1059    ) -> List[Content]:
1060        """
1061        Get the metadata for all files in the root directory.
1062
1063        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1064
1065        :param ref: branch or commit to get content from
1066        :param commit: commit to get content from (deprecated)
1067        """
1068        url = f"/repos/{self.owner.username}/{self.name}/contents"
1069        data = Util.data_params_for_ref(ref or commit)
1070
1071        result = [
1072            Content.parse_response(self.allspice_client, f)
1073            for f in self.allspice_client.requests_get(url, data)
1074        ]
1075        return result
1076
1077    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1078        """
1079        Get the repository's tree on a given ref.
1080
1081        By default, this will only return the top-level entries in the tree. If you want
1082        to get the entire tree, set `recursive` to True.
1083
1084        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1085        :param recursive: Whether to get the entire tree or just the top-level entries.
1086        """
1087
1088        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1089        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1090        params = {"recursive": recursive}
1091        results = self.allspice_client.requests_get_paginated(url, params=params)
1092        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1093
1094    def get_file_content(
1095        self,
1096        content: Content,
1097        ref: Optional[Ref] = None,
1098        commit: Optional[Commit] = None,
1099    ) -> Union[str, List["Content"]]:
1100        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1101        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1102        data = Util.data_params_for_ref(ref or commit)
1103
1104        if content.type == Content.FILE:
1105            return self.allspice_client.requests_get(url, data)["content"]
1106        else:
1107            return [
1108                Content.parse_response(self.allspice_client, f)
1109                for f in self.allspice_client.requests_get(url, data)
1110            ]
1111
1112    def get_raw_file(
1113        self,
1114        file_path: str,
1115        ref: Optional[Ref] = None,
1116    ) -> bytes:
1117        """
1118        Get the raw, binary data of a single file.
1119
1120        Note 1: if the file you are requesting is a text file, you might want to
1121        use .decode() on the result to get a string. For example:
1122
1123            content = repo.get_raw_file("file.txt").decode("utf-8")
1124
1125        Note 2: this method will store the entire file in memory. If you want
1126        to download a large file, you might want to use `download_to_file`
1127        instead.
1128
1129        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1130
1131        :param file_path: The path to the file to get.
1132        :param ref: The branch or commit to get the file from.  If not provided,
1133            the default branch is used.
1134        """
1135
1136        url = self.REPO_GET_RAW_FILE.format(
1137            owner=self.owner.username,
1138            repo=self.name,
1139            path=file_path,
1140        )
1141        params = Util.data_params_for_ref(ref)
1142        return self.allspice_client.requests_get_raw(url, params=params)
1143
1144    def download_to_file(
1145        self,
1146        file_path: str,
1147        io: IO,
1148        ref: Optional[Ref] = None,
1149    ) -> None:
1150        """
1151        Download the binary data of a file to a file-like object.
1152
1153        Example:
1154
1155            with open("schematic.DSN", "wb") as f:
1156                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1157
1158        :param file_path: The path to the file in the repository from the root
1159            of the repository.
1160        :param io: The file-like object to write the data to.
1161        """
1162
1163        url = self.allspice_client._AllSpice__get_url(
1164            self.REPO_GET_RAW_FILE.format(
1165                owner=self.owner.username,
1166                repo=self.name,
1167                path=file_path,
1168            )
1169        )
1170        params = Util.data_params_for_ref(ref)
1171        response = self.allspice_client.requests.get(
1172            url,
1173            params=params,
1174            headers=self.allspice_client.headers,
1175            stream=True,
1176        )
1177
1178        for chunk in response.iter_content(chunk_size=4096):
1179            if chunk:
1180                io.write(chunk)
1181
1182    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1183        """
1184        Get the json blob for a cad file if it exists, otherwise enqueue
1185        a new job and return a 503 status.
1186
1187        WARNING: This is still experimental and not recommended for critical
1188        applications. The structure and content of the returned dictionary can
1189        change at any time.
1190
1191        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1192        """
1193
1194        if isinstance(content, Content):
1195            content = content.path
1196
1197        url = self.REPO_GET_ALLSPICE_JSON.format(
1198            owner=self.owner.username,
1199            repo=self.name,
1200            content=content,
1201        )
1202        data = Util.data_params_for_ref(ref)
1203        return self.allspice_client.requests_get(url, data)
1204
1205    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1206        """
1207        Get the svg blob for a cad file if it exists, otherwise enqueue
1208        a new job and return a 503 status.
1209
1210        WARNING: This is still experimental and not yet recommended for
1211        critical applications. The content of the returned svg can change
1212        at any time.
1213
1214        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1215        """
1216
1217        if isinstance(content, Content):
1218            content = content.path
1219
1220        url = self.REPO_GET_ALLSPICE_SVG.format(
1221            owner=self.owner.username,
1222            repo=self.name,
1223            content=content,
1224        )
1225        data = Util.data_params_for_ref(ref)
1226        return self.allspice_client.requests_get_raw(url, data)
1227
1228    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1229        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1230        if not data:
1231            data = {}
1232        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1233        data.update({"content": content})
1234        return self.allspice_client.requests_post(url, data)
1235
1236    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1237        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1238        if not data:
1239            data = {}
1240        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1241        data.update({"sha": file_sha, "content": content})
1242        return self.allspice_client.requests_put(url, data)
1243
1244    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1245        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1246        if not data:
1247            data = {}
1248        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1249        data.update({"sha": file_sha})
1250        return self.allspice_client.requests_delete(url, data)
1251
1252    def get_archive(
1253        self,
1254        ref: Ref = "main",
1255        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1256    ) -> bytes:
1257        """
1258        Download all the files in a specific ref of a repository as a zip or tarball
1259        archive.
1260
1261        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1262
1263        :param ref: branch or commit to get content from, defaults to the "main" branch
1264        :param archive_format: zip or tar, defaults to zip
1265        """
1266
1267        ref_string = Util.data_params_for_ref(ref)["ref"]
1268        url = self.REPO_GET_ARCHIVE.format(
1269            owner=self.owner.username,
1270            repo=self.name,
1271            ref=ref_string,
1272            format=archive_format.value,
1273        )
1274        return self.allspice_client.requests_get_raw(url)
1275
1276    def get_topics(self) -> list[str]:
1277        """
1278        Gets the list of topics on this repository.
1279
1280        See http://localhost:3000/api/swagger#/repository/repoListTopics
1281        """
1282
1283        url = self.REPO_GET_TOPICS.format(
1284            owner=self.owner.username,
1285            repo=self.name,
1286        )
1287        return self.allspice_client.requests_get(url)["topics"]
1288
1289    def add_topic(self, topic: str):
1290        """
1291        Adds a topic to the repository.
1292
1293        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1294
1295        :param topic: The topic to add. Topic names must consist only of
1296            lowercase letters, numnbers and dashes (-), and cannot start with
1297            dashes. Topic names also must be under 35 characters long.
1298        """
1299
1300        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1301        self.allspice_client.requests_put(url)
1302
1303    def create_release(
1304        self,
1305        tag_name: str,
1306        name: Optional[str] = None,
1307        body: Optional[str] = None,
1308        draft: bool = False,
1309    ):
1310        """
1311        Create a release for this repository. The release will be created for
1312        the tag with the given name. If there is no tag with this name, create
1313        the tag first.
1314
1315        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1316        """
1317
1318        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1319        data = {
1320            "tag_name": tag_name,
1321            "draft": draft,
1322        }
1323        if name is not None:
1324            data["name"] = name
1325        if body is not None:
1326            data["body"] = body
1327        response = self.allspice_client.requests_post(url, data)
1328        return Release.parse_response(self.allspice_client, response, self)
1329
1330    def get_releases(
1331        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1332    ) -> List[Release]:
1333        """
1334        Get the list of releases for this repository.
1335
1336        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1337        """
1338
1339        data = {}
1340
1341        if draft is not None:
1342            data["draft"] = draft
1343        if pre_release is not None:
1344            data["pre-release"] = pre_release
1345
1346        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1347        responses = self.allspice_client.requests_get_paginated(url, params=data)
1348
1349        return [
1350            Release.parse_response(self.allspice_client, response, self) for response in responses
1351        ]
1352
1353    def get_latest_release(self) -> Release:
1354        """
1355        Get the latest release for this repository.
1356
1357        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1358        """
1359
1360        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1361        response = self.allspice_client.requests_get(url)
1362        release = Release.parse_response(self.allspice_client, response, self)
1363        return release
1364
1365    def get_release_by_tag(self, tag: str) -> Release:
1366        """
1367        Get a release by its tag.
1368
1369        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1370        """
1371
1372        url = self.REPO_GET_RELEASE_BY_TAG.format(
1373            owner=self.owner.username, repo=self.name, tag=tag
1374        )
1375        response = self.allspice_client.requests_get(url)
1376        release = Release.parse_response(self.allspice_client, response, self)
1377        return release
1378
1379    def get_commit_statuses(
1380        self,
1381        commit: Union[str, Commit],
1382        sort: Optional[CommitStatusSort] = None,
1383        state: Optional[CommitStatusState] = None,
1384    ) -> List[CommitStatus]:
1385        """
1386        Get a list of statuses for a commit.
1387
1388        This is roughly equivalent to the Commit.get_statuses method, but this
1389        method allows you to sort and filter commits and is more convenient if
1390        you have a commit SHA and don't need to get the commit itself.
1391
1392        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1393        """
1394
1395        if isinstance(commit, Commit):
1396            commit = commit.sha
1397
1398        params = {}
1399        if sort is not None:
1400            params["sort"] = sort.value
1401        if state is not None:
1402            params["state"] = state.value
1403
1404        url = self.REPO_GET_COMMIT_STATUS.format(
1405            owner=self.owner.username, repo=self.name, sha=commit
1406        )
1407        response = self.allspice_client.requests_get_paginated(url, params=params)
1408        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1409
1410    def create_commit_status(
1411        self,
1412        commit: Union[str, Commit],
1413        context: Optional[str] = None,
1414        description: Optional[str] = None,
1415        state: Optional[CommitStatusState] = None,
1416        target_url: Optional[str] = None,
1417    ) -> CommitStatus:
1418        """
1419        Create a status on a commit.
1420
1421        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1422        """
1423
1424        if isinstance(commit, Commit):
1425            commit = commit.sha
1426
1427        data = {}
1428        if context is not None:
1429            data["context"] = context
1430        if description is not None:
1431            data["description"] = description
1432        if state is not None:
1433            data["state"] = state.value
1434        if target_url is not None:
1435            data["target_url"] = target_url
1436
1437        url = self.REPO_GET_COMMIT_STATUS.format(
1438            owner=self.owner.username, repo=self.name, sha=commit
1439        )
1440        response = self.allspice_client.requests_post(url, data=data)
1441        return CommitStatus.parse_response(self.allspice_client, response)
1442
1443    def delete(self):
1444        self.allspice_client.requests_delete(
1445            Repository.REPO_DELETE % (self.owner.username, self.name)
1446        )
1447        self.deleted = True
Repository(allspice_client)
586    def __init__(self, allspice_client):
587        super().__init__(allspice_client)
allow_fast_forward_only_merge: bool
allow_manual_merge: Any
allow_merge_commits: bool
allow_rebase: bool
allow_rebase_explicit: bool
allow_rebase_update: bool
allow_squash_merge: bool
archived: bool
archived_at: str
autodetect_manual_merge: Any
avatar_url: str
clone_url: str
created_at: str
default_allow_maintainer_edit: bool
default_branch: str
default_delete_branch_after_merge: bool
default_merge_style: str
description: str
empty: bool
enable_prune: Any
external_tracker: Any
external_wiki: Any
fork: bool
forks_count: int
full_name: str
has_actions: bool
has_issues: bool
has_packages: bool
has_projects: bool
has_pull_requests: bool
has_releases: bool
has_wiki: bool
html_url: str
id: int
ignore_whitespace_conflicts: bool
internal: bool
internal_tracker: Dict[str, bool]
language: str
languages_url: str
mirror: bool
mirror_interval: str
mirror_updated: str
name: str
object_format_name: str
open_issues_count: int
open_pr_counter: int
original_url: str
owner: Union[User, Organization]
parent: Any
permissions: Dict[str, bool]
private: bool
projects_mode: str
release_counter: int
repo_transfer: Any
size: int
ssh_url: str
stars_count: int
template: bool
updated_at: datetime.datetime
url: str
watchers_count: int
website: str
API_OBJECT = '/repos/{owner}/{name}'
REPO_IS_COLLABORATOR = '/repos/%s/%s/collaborators/%s'
REPO_BRANCHES = '/repos/%s/%s/branches'
REPO_BRANCH = '/repos/{owner}/{repo}/branches/{branch}'
REPO_ISSUES = '/repos/{owner}/{repo}/issues'
REPO_DESIGN_REVIEWS = '/repos/{owner}/{repo}/pulls'
REPO_DELETE = '/repos/%s/%s'
REPO_TIMES = '/repos/%s/%s/times'
REPO_USER_TIME = '/repos/%s/%s/times/%s'
REPO_COMMITS = '/repos/%s/%s/commits'
REPO_TRANSFER = '/repos/{owner}/{repo}/transfer'
REPO_MILESTONES = '/repos/{owner}/{repo}/milestones'
REPO_GET_ARCHIVE = '/repos/{owner}/{repo}/archive/{ref}.{format}'
REPO_GET_ALLSPICE_JSON = '/repos/{owner}/{repo}/allspice_generated/json/{content}'
REPO_GET_ALLSPICE_SVG = '/repos/{owner}/{repo}/allspice_generated/svg/{content}'
REPO_GET_TOPICS = '/repos/{owner}/{repo}/topics'
REPO_ADD_TOPIC = '/repos/{owner}/{repo}/topics/{topic}'
REPO_GET_RELEASES = '/repos/{owner}/{repo}/releases'
REPO_GET_LATEST_RELEASE = '/repos/{owner}/{repo}/releases/latest'
REPO_GET_RELEASE_BY_TAG = '/repos/{owner}/{repo}/releases/tags/{tag}'
REPO_GET_COMMIT_STATUS = '/repos/{owner}/{repo}/statuses/{sha}'
REPO_GET_RAW_FILE = '/repos/{owner}/{repo}/raw/{path}'
REPO_GET_TREE = '/repos/{owner}/{repo}/git/trees/{ref}'
@classmethod
def request( cls, allspice_client, owner: str, name: str) -> Repository:
607    @classmethod
608    def request(
609        cls,
610        allspice_client,
611        owner: str,
612        name: str,
613    ) -> Repository:
614        return cls._request(allspice_client, {"owner": owner, "name": name})
@classmethod
def search( cls, allspice_client, query: Optional[str] = None, topic: bool = False, include_description: bool = False, user: Optional[User] = None, owner_to_prioritize: Union[User, Organization, NoneType] = None) -> list[Repository]:
616    @classmethod
617    def search(
618        cls,
619        allspice_client,
620        query: Optional[str] = None,
621        topic: bool = False,
622        include_description: bool = False,
623        user: Optional[User] = None,
624        owner_to_prioritize: Union[User, Organization, None] = None,
625    ) -> list[Repository]:
626        """
627        Search for repositories.
628
629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
630
631        :param query: The query string to search for
632        :param topic: If true, the query string will only be matched against the
633            repository's topic.
634        :param include_description: If true, the query string will be matched
635            against the repository's description as well.
636        :param user: If specified, only repositories that this user owns or
637            contributes to will be searched.
638        :param owner_to_prioritize: If specified, repositories owned by the
639            given entity will be prioritized in the search.
640        :returns: All repositories matching the query. If there are many
641            repositories matching this query, this may take some time.
642        """
643
644        params = {}
645
646        if query is not None:
647            params["q"] = query
648        if topic:
649            params["topic"] = topic
650        if include_description:
651            params["include_description"] = include_description
652        if user is not None:
653            params["user"] = user.id
654        if owner_to_prioritize is not None:
655            params["owner_to_prioritize"] = owner_to_prioritize.id
656
657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
658
659        return [Repository.parse_response(allspice_client, response) for response in responses]

Search for repositories.

See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch

Parameters
  • query: The query string to search for
  • topic: If true, the query string will only be matched against the repository's topic.
  • include_description: If true, the query string will be matched against the repository's description as well.
  • user: If specified, only repositories that this user owns or contributes to will be searched.
  • owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
def commit(self):
691    def commit(self):
692        args = {"owner": self.owner.username, "name": self.name}
693        self._commit(args)
def get_branches(self) -> List[Branch]:
695    def get_branches(self) -> List["Branch"]:
696        """Get all the Branches of this Repository."""
697
698        results = self.allspice_client.requests_get_paginated(
699            Repository.REPO_BRANCHES % (self.owner.username, self.name)
700        )
701        return [Branch.parse_response(self.allspice_client, result) for result in results]

Get all the Branches of this Repository.

def get_branch(self, name: str) -> Branch:
703    def get_branch(self, name: str) -> "Branch":
704        """Get a specific Branch of this Repository."""
705        result = self.allspice_client.requests_get(
706            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
707        )
708        return Branch.parse_response(self.allspice_client, result)

Get a specific Branch of this Repository.

def add_branch( self, create_from: Union[Branch, Commit, str], newname: str) -> Branch:
710    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
711        """Add a branch to the repository"""
712        # Note: will only work with gitea 1.13 or higher!
713
714        ref_name = Util.data_params_for_ref(create_from)
715        if "ref" not in ref_name:
716            raise ValueError("create_from must be a Branch, Commit or string")
717        ref_name = ref_name["ref"]
718
719        data = {"new_branch_name": newname, "old_ref_name": ref_name}
720        result = self.allspice_client.requests_post(
721            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
722        )
723        return Branch.parse_response(self.allspice_client, result)

Add a branch to the repository

def get_issues( self, state: Literal['open', 'closed', 'all'] = 'all', search_query: Optional[str] = None, labels: Optional[List[str]] = None, milestones: Optional[List[Union[Milestone, str]]] = None, assignee: Union[User, str, NoneType] = None, since: Optional[datetime.datetime] = None, before: Optional[datetime.datetime] = None) -> List[Issue]:
725    def get_issues(
726        self,
727        state: Literal["open", "closed", "all"] = "all",
728        search_query: Optional[str] = None,
729        labels: Optional[List[str]] = None,
730        milestones: Optional[List[Union[Milestone, str]]] = None,
731        assignee: Optional[Union[User, str]] = None,
732        since: Optional[datetime] = None,
733        before: Optional[datetime] = None,
734    ) -> List["Issue"]:
735        """
736        Get all Issues of this Repository (open and closed)
737
738        https://hub.allspice.io/api/swagger#/repository/repoListIssues
739
740        All params of this method are optional filters. If you don't specify a filter, it
741        will not be applied.
742
743        :param state: The state of the Issues to get. If None, all Issues are returned.
744        :param search_query: Filter issues by text. This is equivalent to searching for
745                             `search_query` in the Issues on the web interface.
746        :param labels: Filter issues by labels.
747        :param milestones: Filter issues by milestones.
748        :param assignee: Filter issues by the assigned user.
749        :param since: Filter issues by the date they were created.
750        :param before: Filter issues by the date they were created.
751        :return: A list of Issues.
752        """
753
754        data = {
755            "state": state,
756        }
757        if search_query:
758            data["q"] = search_query
759        if labels:
760            data["labels"] = ",".join(labels)
761        if milestones:
762            data["milestone"] = ",".join(
763                [
764                    milestone.name if isinstance(milestone, Milestone) else milestone
765                    for milestone in milestones
766                ]
767            )
768        if assignee:
769            if isinstance(assignee, User):
770                data["assignee"] = assignee.username
771            else:
772                data["assignee"] = assignee
773        if since:
774            data["since"] = Util.format_time(since)
775        if before:
776            data["before"] = Util.format_time(before)
777
778        results = self.allspice_client.requests_get_paginated(
779            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
780            params=data,
781        )
782
783        issues = []
784        for result in results:
785            issue = Issue.parse_response(self.allspice_client, result)
786            # See Issue.request
787            setattr(issue, "_repository", self)
788            # This is mostly for compatibility with an older implementation
789            Issue._add_read_property("repo", self, issue)
790            issues.append(issue)
791
792        return issues

Get all Issues of this Repository (open and closed)

allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues

All params of this method are optional filters. If you don't specify a filter, it will not be applied.

Parameters
  • state: The state of the Issues to get. If None, all Issues are returned.
  • search_query: Filter issues by text. This is equivalent to searching for search_query in the Issues on the web interface.
  • labels: Filter issues by labels.
  • milestones: Filter issues by milestones.
  • assignee: Filter issues by the assigned user.
  • since: Filter issues by the date they were created.
  • before: Filter issues by the date they were created.
Returns

A list of Issues.

def get_design_reviews( self, state: Literal['open', 'closed', 'all'] = 'all', milestone: Union[Milestone, str, NoneType] = None, labels: Optional[List[str]] = None) -> List[DesignReview]:
794    def get_design_reviews(
795        self,
796        state: Literal["open", "closed", "all"] = "all",
797        milestone: Optional[Union[Milestone, str]] = None,
798        labels: Optional[List[str]] = None,
799    ) -> List["DesignReview"]:
800        """
801        Get all Design Reviews of this Repository.
802
803        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
804
805        :param state: The state of the Design Reviews to get. If None, all Design Reviews
806                      are returned.
807        :param milestone: The milestone of the Design Reviews to get.
808        :param labels: A list of label IDs to filter DRs by.
809        :return: A list of Design Reviews.
810        """
811
812        params = {
813            "state": state,
814        }
815        if milestone:
816            if isinstance(milestone, Milestone):
817                params["milestone"] = milestone.name
818            else:
819                params["milestone"] = milestone
820        if labels:
821            params["labels"] = ",".join(labels)
822
823        results = self.allspice_client.requests_get_paginated(
824            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
825            params=params,
826        )
827        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

Get all Design Reviews of this Repository.

allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests

Parameters
  • state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
  • milestone: The milestone of the Design Reviews to get.
  • labels: A list of label IDs to filter DRs by.
Returns

A list of Design Reviews.

def get_commits( self, sha: Optional[str] = None, path: Optional[str] = None, stat: bool = True) -> List[Commit]:
829    def get_commits(
830        self,
831        sha: Optional[str] = None,
832        path: Optional[str] = None,
833        stat: bool = True,
834    ) -> List["Commit"]:
835        """
836        Get all the Commits of this Repository.
837
838        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
839
840        :param sha: The SHA of the commit to start listing commits from.
841        :param path: filepath of a file/dir.
842        :param stat: Include the number of additions and deletions in the response.
843                     Disable for speedup.
844        :return: A list of Commits.
845        """
846
847        data = {}
848        if sha:
849            data["sha"] = sha
850        if path:
851            data["path"] = path
852        if not stat:
853            data["stat"] = False
854
855        try:
856            results = self.allspice_client.requests_get_paginated(
857                Repository.REPO_COMMITS % (self.owner.username, self.name),
858                params=data,
859            )
860        except ConflictException as err:
861            logging.warning(err)
862            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
863            results = []
864        return [Commit.parse_response(self.allspice_client, result) for result in results]

Get all the Commits of this Repository.

allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits

Parameters
  • sha: The SHA of the commit to start listing commits from.
  • path: filepath of a file/dir.
  • stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns

A list of Commits.

def get_issues_state(self, state) -> List[Issue]:
866    def get_issues_state(self, state) -> List["Issue"]:
867        """
868        DEPRECATED: Use get_issues() instead.
869
870        Get issues of state Issue.open or Issue.closed of a repository.
871        """
872
873        assert state in [Issue.OPENED, Issue.CLOSED]
874        issues = []
875        data = {"state": state}
876        results = self.allspice_client.requests_get_paginated(
877            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
878            params=data,
879        )
880        for result in results:
881            issue = Issue.parse_response(self.allspice_client, result)
882            # adding data not contained in the issue response
883            # See Issue.request()
884            setattr(issue, "_repository", self)
885            Issue._add_read_property("repo", self, issue)
886            Issue._add_read_property("owner", self.owner, issue)
887            issues.append(issue)
888        return issues

DEPRECATED: Use get_issues() instead.

Get issues of state Issue.open or Issue.closed of a repository.

def get_times(self):
890    def get_times(self):
891        results = self.allspice_client.requests_get(
892            Repository.REPO_TIMES % (self.owner.username, self.name)
893        )
894        return results
def get_user_time(self, username) -> float:
896    def get_user_time(self, username) -> float:
897        if isinstance(username, User):
898            username = username.username
899        results = self.allspice_client.requests_get(
900            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
901        )
902        time = sum(r["time"] for r in results)
903        return time
def get_full_name(self) -> str:
905    def get_full_name(self) -> str:
906        return self.owner.username + "/" + self.name
def create_issue( self, title, assignees=frozenset(), description='') -> allspice.baseapiobject.ApiObject:
908    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
909        data = {
910            "assignees": assignees,
911            "body": description,
912            "closed": False,
913            "title": title,
914        }
915        result = self.allspice_client.requests_post(
916            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
917            data=data,
918        )
919
920        issue = Issue.parse_response(self.allspice_client, result)
921        setattr(issue, "_repository", self)
922        Issue._add_read_property("repo", self, issue)
923        return issue
def create_design_review( self, title: str, head: Union[Branch, str], base: Union[Branch, str], assignees: Optional[Set[Union[User, str]]] = None, body: Optional[str] = None, due_date: Optional[datetime.datetime] = None, milestone: Optional[Milestone] = None) -> DesignReview:
925    def create_design_review(
926        self,
927        title: str,
928        head: Union[Branch, str],
929        base: Union[Branch, str],
930        assignees: Optional[Set[Union[User, str]]] = None,
931        body: Optional[str] = None,
932        due_date: Optional[datetime] = None,
933        milestone: Optional["Milestone"] = None,
934    ) -> "DesignReview":
935        """
936        Create a new Design Review.
937
938        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
939
940        :param title: Title of the Design Review
941        :param head: Branch or name of the branch to merge into the base branch
942        :param base: Branch or name of the branch to merge into
943        :param assignees: Optional. A list of users to assign this review. List can be of
944                          User objects or of usernames.
945        :param body: An Optional Description for the Design Review.
946        :param due_date: An Optional Due date for the Design Review.
947        :param milestone: An Optional Milestone for the Design Review
948        :return: The created Design Review
949        """
950
951        data: dict[str, Any] = {
952            "title": title,
953        }
954
955        if isinstance(head, Branch):
956            data["head"] = head.name
957        else:
958            data["head"] = head
959        if isinstance(base, Branch):
960            data["base"] = base.name
961        else:
962            data["base"] = base
963        if assignees:
964            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
965        if body:
966            data["body"] = body
967        if due_date:
968            data["due_date"] = Util.format_time(due_date)
969        if milestone:
970            data["milestone"] = milestone.id
971
972        result = self.allspice_client.requests_post(
973            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
974            data=data,
975        )
976
977        return DesignReview.parse_response(self.allspice_client, result)

Create a new Design Review.

See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest

Parameters
  • title: Title of the Design Review
  • head: Branch or name of the branch to merge into the base branch
  • base: Branch or name of the branch to merge into
  • assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
  • body: An Optional Description for the Design Review.
  • due_date: An Optional Due date for the Design Review.
  • milestone: An Optional Milestone for the Design Review
Returns

The created Design Review

def create_milestone( self, title: str, description: str, due_date: Optional[str] = None, state: str = 'open') -> Milestone:
979    def create_milestone(
980        self,
981        title: str,
982        description: str,
983        due_date: Optional[str] = None,
984        state: str = "open",
985    ) -> "Milestone":
986        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
987        data = {"title": title, "description": description, "state": state}
988        if due_date:
989            data["due_date"] = due_date
990        result = self.allspice_client.requests_post(url, data=data)
991        return Milestone.parse_response(self.allspice_client, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
 993    def create_gitea_hook(self, hook_url: str, events: List[str]):
 994        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 995        data = {
 996            "type": "gitea",
 997            "config": {"content_type": "json", "url": hook_url},
 998            "events": events,
 999            "active": True,
1000        }
1001        return self.allspice_client.requests_post(url, data=data)
def list_hooks(self):
1003    def list_hooks(self):
1004        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1005        return self.allspice_client.requests_get(url)
def delete_hook(self, id: str):
1007    def delete_hook(self, id: str):
1008        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1009        self.allspice_client.requests_delete(url)
def is_collaborator(self, username) -> bool:
1011    def is_collaborator(self, username) -> bool:
1012        if isinstance(username, User):
1013            username = username.username
1014        try:
1015            # returns 204 if its ok, 404 if its not
1016            self.allspice_client.requests_get(
1017                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1018            )
1019            return True
1020        except Exception:
1021            return False
def get_users_with_access(self) -> Sequence[User]:
1023    def get_users_with_access(self) -> Sequence[User]:
1024        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1025        response = self.allspice_client.requests_get(url)
1026        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1027        if isinstance(self.owner, User):
1028            return [*collabs, self.owner]
1029        else:
1030            # owner must be org
1031            teams = self.owner.get_teams()
1032            for team in teams:
1033                team_repos = team.get_repos()
1034                if self.name in [n.name for n in team_repos]:
1035                    collabs += team.get_members()
1036            return collabs
def remove_collaborator(self, user_name: str):
1038    def remove_collaborator(self, user_name: str):
1039        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1040        self.allspice_client.requests_delete(url)
def transfer_ownership( self, new_owner: Union[User, Organization], new_teams: Union[Set[Team], FrozenSet[Team]] = frozenset()):
1042    def transfer_ownership(
1043        self,
1044        new_owner: Union[User, Organization],
1045        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1046    ):
1047        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1048        data: dict[str, Any] = {"new_owner": new_owner.username}
1049        if isinstance(new_owner, Organization):
1050            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1051            data["team_ids"] = new_team_ids
1052        self.allspice_client.requests_post(url, data=data)
1053        # TODO: make sure this instance is either updated or discarded
def get_git_content( self, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> List[Content]:
1055    def get_git_content(
1056        self,
1057        ref: Optional["Ref"] = None,
1058        commit: "Optional[Commit]" = None,
1059    ) -> List[Content]:
1060        """
1061        Get the metadata for all files in the root directory.
1062
1063        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1064
1065        :param ref: branch or commit to get content from
1066        :param commit: commit to get content from (deprecated)
1067        """
1068        url = f"/repos/{self.owner.username}/{self.name}/contents"
1069        data = Util.data_params_for_ref(ref or commit)
1070
1071        result = [
1072            Content.parse_response(self.allspice_client, f)
1073            for f in self.allspice_client.requests_get(url, data)
1074        ]
1075        return result

Get the metadata for all files in the root directory.

allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList

Parameters
  • ref: branch or commit to get content from
  • commit: commit to get content from (deprecated)
def get_tree( self, ref: Union[Branch, Commit, str, NoneType] = None, recursive: bool = False) -> List[GitEntry]:
1077    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1078        """
1079        Get the repository's tree on a given ref.
1080
1081        By default, this will only return the top-level entries in the tree. If you want
1082        to get the entire tree, set `recursive` to True.
1083
1084        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1085        :param recursive: Whether to get the entire tree or just the top-level entries.
1086        """
1087
1088        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1089        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1090        params = {"recursive": recursive}
1091        results = self.allspice_client.requests_get_paginated(url, params=params)
1092        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

Get the repository's tree on a given ref.

By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set recursive to True.

Parameters
  • ref: The ref to get the tree from. If not provided, the default branch is used.
  • recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content( self, content: Content, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> Union[str, List[Content]]:
1094    def get_file_content(
1095        self,
1096        content: Content,
1097        ref: Optional[Ref] = None,
1098        commit: Optional[Commit] = None,
1099    ) -> Union[str, List["Content"]]:
1100        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1101        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1102        data = Util.data_params_for_ref(ref or commit)
1103
1104        if content.type == Content.FILE:
1105            return self.allspice_client.requests_get(url, data)["content"]
1106        else:
1107            return [
1108                Content.parse_response(self.allspice_client, f)
1109                for f in self.allspice_client.requests_get(url, data)
1110            ]

allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents

def get_raw_file( self, file_path: str, ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1112    def get_raw_file(
1113        self,
1114        file_path: str,
1115        ref: Optional[Ref] = None,
1116    ) -> bytes:
1117        """
1118        Get the raw, binary data of a single file.
1119
1120        Note 1: if the file you are requesting is a text file, you might want to
1121        use .decode() on the result to get a string. For example:
1122
1123            content = repo.get_raw_file("file.txt").decode("utf-8")
1124
1125        Note 2: this method will store the entire file in memory. If you want
1126        to download a large file, you might want to use `download_to_file`
1127        instead.
1128
1129        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1130
1131        :param file_path: The path to the file to get.
1132        :param ref: The branch or commit to get the file from.  If not provided,
1133            the default branch is used.
1134        """
1135
1136        url = self.REPO_GET_RAW_FILE.format(
1137            owner=self.owner.username,
1138            repo=self.name,
1139            path=file_path,
1140        )
1141        params = Util.data_params_for_ref(ref)
1142        return self.allspice_client.requests_get_raw(url, params=params)

Get the raw, binary data of a single file.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

content = repo.get_raw_file("file.txt").decode("utf-8")

Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use download_to_file instead.

See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile

Parameters
  • file_path: The path to the file to get.
  • ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file( self, file_path: str, io: <class 'IO'>, ref: Union[Branch, Commit, str, NoneType] = None) -> None:
1144    def download_to_file(
1145        self,
1146        file_path: str,
1147        io: IO,
1148        ref: Optional[Ref] = None,
1149    ) -> None:
1150        """
1151        Download the binary data of a file to a file-like object.
1152
1153        Example:
1154
1155            with open("schematic.DSN", "wb") as f:
1156                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1157
1158        :param file_path: The path to the file in the repository from the root
1159            of the repository.
1160        :param io: The file-like object to write the data to.
1161        """
1162
1163        url = self.allspice_client._AllSpice__get_url(
1164            self.REPO_GET_RAW_FILE.format(
1165                owner=self.owner.username,
1166                repo=self.name,
1167                path=file_path,
1168            )
1169        )
1170        params = Util.data_params_for_ref(ref)
1171        response = self.allspice_client.requests.get(
1172            url,
1173            params=params,
1174            headers=self.allspice_client.headers,
1175            stream=True,
1176        )
1177
1178        for chunk in response.iter_content(chunk_size=4096):
1179            if chunk:
1180                io.write(chunk)

Download the binary data of a file to a file-like object.

Example:

with open("schematic.DSN", "wb") as f:
    Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
  • file_path: The path to the file in the repository from the root of the repository.
  • io: The file-like object to write the data to.
def get_generated_json( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1182    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1183        """
1184        Get the json blob for a cad file if it exists, otherwise enqueue
1185        a new job and return a 503 status.
1186
1187        WARNING: This is still experimental and not recommended for critical
1188        applications. The structure and content of the returned dictionary can
1189        change at any time.
1190
1191        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1192        """
1193
1194        if isinstance(content, Content):
1195            content = content.path
1196
1197        url = self.REPO_GET_ALLSPICE_JSON.format(
1198            owner=self.owner.username,
1199            repo=self.name,
1200            content=content,
1201        )
1202        data = Util.data_params_for_ref(ref)
1203        return self.allspice_client.requests_get(url, data)

Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

def get_generated_svg( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1205    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1206        """
1207        Get the svg blob for a cad file if it exists, otherwise enqueue
1208        a new job and return a 503 status.
1209
1210        WARNING: This is still experimental and not yet recommended for
1211        critical applications. The content of the returned svg can change
1212        at any time.
1213
1214        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1215        """
1216
1217        if isinstance(content, Content):
1218            content = content.path
1219
1220        url = self.REPO_GET_ALLSPICE_SVG.format(
1221            owner=self.owner.username,
1222            repo=self.name,
1223            content=content,
1224        )
1225        data = Util.data_params_for_ref(ref)
1226        return self.allspice_client.requests_get_raw(url, data)

Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1228    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1229        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1230        if not data:
1231            data = {}
1232        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1233        data.update({"content": content})
1234        return self.allspice_client.requests_post(url, data)

allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile

def change_file( self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1236    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1237        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1238        if not data:
1239            data = {}
1240        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1241        data.update({"sha": file_sha, "content": content})
1242        return self.allspice_client.requests_put(url, data)

allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile

def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1244    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1245        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1246        if not data:
1247            data = {}
1248        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1249        data.update({"sha": file_sha})
1250        return self.allspice_client.requests_delete(url, data)

allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile

def get_archive( self, ref: Union[Branch, Commit, str] = 'main', archive_format: Repository.ArchiveFormat = <ArchiveFormat.ZIP: 'zip'>) -> bytes:
1252    def get_archive(
1253        self,
1254        ref: Ref = "main",
1255        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1256    ) -> bytes:
1257        """
1258        Download all the files in a specific ref of a repository as a zip or tarball
1259        archive.
1260
1261        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1262
1263        :param ref: branch or commit to get content from, defaults to the "main" branch
1264        :param archive_format: zip or tar, defaults to zip
1265        """
1266
1267        ref_string = Util.data_params_for_ref(ref)["ref"]
1268        url = self.REPO_GET_ARCHIVE.format(
1269            owner=self.owner.username,
1270            repo=self.name,
1271            ref=ref_string,
1272            format=archive_format.value,
1273        )
1274        return self.allspice_client.requests_get_raw(url)

Download all the files in a specific ref of a repository as a zip or tarball archive.

allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive

Parameters
  • ref: branch or commit to get content from, defaults to the "main" branch
  • archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
1276    def get_topics(self) -> list[str]:
1277        """
1278        Gets the list of topics on this repository.
1279
1280        See http://localhost:3000/api/swagger#/repository/repoListTopics
1281        """
1282
1283        url = self.REPO_GET_TOPICS.format(
1284            owner=self.owner.username,
1285            repo=self.name,
1286        )
1287        return self.allspice_client.requests_get(url)["topics"]

Gets the list of topics on this repository.

See http://localhost:3000/api/swagger#/repository/repoListTopics

def add_topic(self, topic: str):
1289    def add_topic(self, topic: str):
1290        """
1291        Adds a topic to the repository.
1292
1293        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1294
1295        :param topic: The topic to add. Topic names must consist only of
1296            lowercase letters, numnbers and dashes (-), and cannot start with
1297            dashes. Topic names also must be under 35 characters long.
1298        """
1299
1300        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1301        self.allspice_client.requests_put(url)

Adds a topic to the repository.

See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic

Parameters
  • topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release( self, tag_name: str, name: Optional[str] = None, body: Optional[str] = None, draft: bool = False):
1303    def create_release(
1304        self,
1305        tag_name: str,
1306        name: Optional[str] = None,
1307        body: Optional[str] = None,
1308        draft: bool = False,
1309    ):
1310        """
1311        Create a release for this repository. The release will be created for
1312        the tag with the given name. If there is no tag with this name, create
1313        the tag first.
1314
1315        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1316        """
1317
1318        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1319        data = {
1320            "tag_name": tag_name,
1321            "draft": draft,
1322        }
1323        if name is not None:
1324            data["name"] = name
1325        if body is not None:
1326            data["body"] = body
1327        response = self.allspice_client.requests_post(url, data)
1328        return Release.parse_response(self.allspice_client, response, self)

Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.

See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease

def get_releases( self, draft: Optional[bool] = None, pre_release: Optional[bool] = None) -> List[Release]:
1330    def get_releases(
1331        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1332    ) -> List[Release]:
1333        """
1334        Get the list of releases for this repository.
1335
1336        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1337        """
1338
1339        data = {}
1340
1341        if draft is not None:
1342            data["draft"] = draft
1343        if pre_release is not None:
1344            data["pre-release"] = pre_release
1345
1346        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1347        responses = self.allspice_client.requests_get_paginated(url, params=data)
1348
1349        return [
1350            Release.parse_response(self.allspice_client, response, self) for response in responses
1351        ]

Get the list of releases for this repository.

See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases

def get_latest_release(self) -> Release:
1353    def get_latest_release(self) -> Release:
1354        """
1355        Get the latest release for this repository.
1356
1357        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1358        """
1359
1360        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1361        response = self.allspice_client.requests_get(url)
1362        release = Release.parse_response(self.allspice_client, response, self)
1363        return release

Get the latest release for this repository.

See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease

def get_release_by_tag(self, tag: str) -> Release:
1365    def get_release_by_tag(self, tag: str) -> Release:
1366        """
1367        Get a release by its tag.
1368
1369        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1370        """
1371
1372        url = self.REPO_GET_RELEASE_BY_TAG.format(
1373            owner=self.owner.username, repo=self.name, tag=tag
1374        )
1375        response = self.allspice_client.requests_get(url)
1376        release = Release.parse_response(self.allspice_client, response, self)
1377        return release

Get a release by its tag.

See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag

def get_commit_statuses( self, commit: Union[str, Commit], sort: Optional[Repository.CommitStatusSort] = None, state: Optional[CommitStatusState] = None) -> List[CommitStatus]:
1379    def get_commit_statuses(
1380        self,
1381        commit: Union[str, Commit],
1382        sort: Optional[CommitStatusSort] = None,
1383        state: Optional[CommitStatusState] = None,
1384    ) -> List[CommitStatus]:
1385        """
1386        Get a list of statuses for a commit.
1387
1388        This is roughly equivalent to the Commit.get_statuses method, but this
1389        method allows you to sort and filter commits and is more convenient if
1390        you have a commit SHA and don't need to get the commit itself.
1391
1392        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1393        """
1394
1395        if isinstance(commit, Commit):
1396            commit = commit.sha
1397
1398        params = {}
1399        if sort is not None:
1400            params["sort"] = sort.value
1401        if state is not None:
1402            params["state"] = state.value
1403
1404        url = self.REPO_GET_COMMIT_STATUS.format(
1405            owner=self.owner.username, repo=self.name, sha=commit
1406        )
1407        response = self.allspice_client.requests_get_paginated(url, params=params)
1408        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]

Get a list of statuses for a commit.

This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.

See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses

def create_commit_status( self, commit: Union[str, Commit], context: Optional[str] = None, description: Optional[str] = None, state: Optional[CommitStatusState] = None, target_url: Optional[str] = None) -> CommitStatus:
1410    def create_commit_status(
1411        self,
1412        commit: Union[str, Commit],
1413        context: Optional[str] = None,
1414        description: Optional[str] = None,
1415        state: Optional[CommitStatusState] = None,
1416        target_url: Optional[str] = None,
1417    ) -> CommitStatus:
1418        """
1419        Create a status on a commit.
1420
1421        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1422        """
1423
1424        if isinstance(commit, Commit):
1425            commit = commit.sha
1426
1427        data = {}
1428        if context is not None:
1429            data["context"] = context
1430        if description is not None:
1431            data["description"] = description
1432        if state is not None:
1433            data["state"] = state.value
1434        if target_url is not None:
1435            data["target_url"] = target_url
1436
1437        url = self.REPO_GET_COMMIT_STATUS.format(
1438            owner=self.owner.username, repo=self.name, sha=commit
1439        )
1440        response = self.allspice_client.requests_post(url, data=data)
1441        return CommitStatus.parse_response(self.allspice_client, response)

Create a status on a commit.

See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus

def delete(self):
1443    def delete(self):
1444        self.allspice_client.requests_delete(
1445            Repository.REPO_DELETE % (self.owner.username, self.name)
1446        )
1447        self.deleted = True
class Repository.ArchiveFormat(enum.Enum):
567    class ArchiveFormat(Enum):
568        """
569        Archive formats for Repository.get_archive
570        """
571
572        TAR = "tar.gz"
573        ZIP = "zip"

Archive formats for Repository.get_archive

TAR = <ArchiveFormat.TAR: 'tar.gz'>
ZIP = <ArchiveFormat.ZIP: 'zip'>
class Repository.CommitStatusSort(enum.Enum):
575    class CommitStatusSort(Enum):
576        """
577        Sort order for Repository.get_commit_status
578        """
579
580        OLDEST = "oldest"
581        RECENT_UPDATE = "recentupdate"
582        LEAST_UPDATE = "leastupdate"
583        LEAST_INDEX = "leastindex"
584        HIGHEST_INDEX = "highestindex"

Sort order for Repository.get_commit_status

OLDEST = <CommitStatusSort.OLDEST: 'oldest'>
RECENT_UPDATE = <CommitStatusSort.RECENT_UPDATE: 'recentupdate'>
LEAST_UPDATE = <CommitStatusSort.LEAST_UPDATE: 'leastupdate'>
LEAST_INDEX = <CommitStatusSort.LEAST_INDEX: 'leastindex'>
HIGHEST_INDEX = <CommitStatusSort.HIGHEST_INDEX: 'highestindex'>
class Milestone(allspice.baseapiobject.ApiObject):
1450class Milestone(ApiObject):
1451    allow_merge_commits: Any
1452    allow_rebase: Any
1453    allow_rebase_explicit: Any
1454    allow_squash_merge: Any
1455    archived: Any
1456    closed_at: Any
1457    closed_issues: int
1458    created_at: str
1459    default_branch: Any
1460    description: str
1461    due_on: Any
1462    has_issues: Any
1463    has_pull_requests: Any
1464    has_wiki: Any
1465    id: int
1466    ignore_whitespace_conflicts: Any
1467    name: Any
1468    open_issues: int
1469    private: Any
1470    state: str
1471    title: str
1472    updated_at: str
1473    website: Any
1474
1475    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1476
1477    def __init__(self, allspice_client):
1478        super().__init__(allspice_client)
1479
1480    def __eq__(self, other):
1481        if not isinstance(other, Milestone):
1482            return False
1483        return self.allspice_client == other.allspice_client and self.id == other.id
1484
1485    def __hash__(self):
1486        return hash(self.allspice_client) ^ hash(self.id)
1487
1488    _fields_to_parsers: ClassVar[dict] = {
1489        "closed_at": lambda _, t: Util.convert_time(t),
1490        "due_on": lambda _, t: Util.convert_time(t),
1491    }
1492
1493    _patchable_fields: ClassVar[set[str]] = {
1494        "allow_merge_commits",
1495        "allow_rebase",
1496        "allow_rebase_explicit",
1497        "allow_squash_merge",
1498        "archived",
1499        "default_branch",
1500        "description",
1501        "has_issues",
1502        "has_pull_requests",
1503        "has_wiki",
1504        "ignore_whitespace_conflicts",
1505        "name",
1506        "private",
1507        "website",
1508    }
1509
1510    @classmethod
1511    def request(cls, allspice_client, owner: str, repo: str, number: str):
1512        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Milestone(allspice_client)
1477    def __init__(self, allspice_client):
1478        super().__init__(allspice_client)
allow_merge_commits: Any
allow_rebase: Any
allow_rebase_explicit: Any
allow_squash_merge: Any
archived: Any
closed_at: Any
closed_issues: int
created_at: str
default_branch: Any
description: str
due_on: Any
has_issues: Any
has_pull_requests: Any
has_wiki: Any
id: int
ignore_whitespace_conflicts: Any
name: Any
open_issues: int
private: Any
state: str
title: str
updated_at: str
website: Any
API_OBJECT = '/repos/{owner}/{repo}/milestones/{number}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1510    @classmethod
1511    def request(cls, allspice_client, owner: str, repo: str, number: str):
1512        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
class Attachment(allspice.baseapiobject.ReadonlyApiObject):
1515class Attachment(ReadonlyApiObject):
1516    """
1517    An asset attached to a comment.
1518
1519    You cannot edit or delete the attachment from this object - see the instance methods
1520    Comment.edit_attachment and delete_attachment for that.
1521    """
1522
1523    browser_download_url: str
1524    created_at: str
1525    download_count: int
1526    id: int
1527    name: str
1528    size: int
1529    uuid: str
1530
1531    def __init__(self, allspice_client):
1532        super().__init__(allspice_client)
1533
1534    def __eq__(self, other):
1535        if not isinstance(other, Attachment):
1536            return False
1537
1538        return self.uuid == other.uuid
1539
1540    def __hash__(self):
1541        return hash(self.uuid)
1542
1543    def download_to_file(self, io: IO):
1544        """
1545        Download the raw, binary data of this Attachment to a file-like object.
1546
1547        Example:
1548
1549            with open("my_file.zip", "wb") as f:
1550                attachment.download_to_file(f)
1551
1552        :param io: The file-like object to write the data to.
1553        """
1554
1555        response = self.allspice_client.requests.get(
1556            self.browser_download_url,
1557            headers=self.allspice_client.headers,
1558            stream=True,
1559        )
1560        # 4kb chunks
1561        for chunk in response.iter_content(chunk_size=4096):
1562            if chunk:
1563                io.write(chunk)

An asset attached to a comment.

You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.

Attachment(allspice_client)
1531    def __init__(self, allspice_client):
1532        super().__init__(allspice_client)
browser_download_url: str
created_at: str
download_count: int
id: int
name: str
size: int
uuid: str
def download_to_file(self, io: <class 'IO'>):
1543    def download_to_file(self, io: IO):
1544        """
1545        Download the raw, binary data of this Attachment to a file-like object.
1546
1547        Example:
1548
1549            with open("my_file.zip", "wb") as f:
1550                attachment.download_to_file(f)
1551
1552        :param io: The file-like object to write the data to.
1553        """
1554
1555        response = self.allspice_client.requests.get(
1556            self.browser_download_url,
1557            headers=self.allspice_client.headers,
1558            stream=True,
1559        )
1560        # 4kb chunks
1561        for chunk in response.iter_content(chunk_size=4096):
1562            if chunk:
1563                io.write(chunk)

Download the raw, binary data of this Attachment to a file-like object.

Example:

with open("my_file.zip", "wb") as f:
    attachment.download_to_file(f)
Parameters
  • io: The file-like object to write the data to.
class Comment(allspice.baseapiobject.ApiObject):
1566class Comment(ApiObject):
1567    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1568    body: str
1569    created_at: datetime
1570    html_url: str
1571    id: int
1572    issue_url: str
1573    original_author: str
1574    original_author_id: int
1575    pull_request_url: str
1576    updated_at: datetime
1577    user: User
1578
1579    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1580    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1581    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1582
1583    def __init__(self, allspice_client):
1584        super().__init__(allspice_client)
1585
1586    def __eq__(self, other):
1587        if not isinstance(other, Comment):
1588            return False
1589        return self.repository == other.repository and self.id == other.id
1590
1591    def __hash__(self):
1592        return hash(self.repository) ^ hash(self.id)
1593
1594    @classmethod
1595    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1596        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1597
1598    _fields_to_parsers: ClassVar[dict] = {
1599        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1600        "created_at": lambda _, t: Util.convert_time(t),
1601        "updated_at": lambda _, t: Util.convert_time(t),
1602    }
1603
1604    _patchable_fields: ClassVar[set[str]] = {"body"}
1605
1606    @property
1607    def parent_url(self) -> str:
1608        """URL of the parent of this comment (the issue or the pull request)"""
1609
1610        if self.issue_url is not None and self.issue_url != "":
1611            return self.issue_url
1612        else:
1613            return self.pull_request_url
1614
1615    @cached_property
1616    def repository(self) -> Repository:
1617        """The repository this comment was posted on."""
1618
1619        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1620        return Repository.request(self.allspice_client, owner_name, repo_name)
1621
1622    def __fields_for_path(self):
1623        return {
1624            "owner": self.repository.owner.username,
1625            "repo": self.repository.name,
1626            "id": self.id,
1627        }
1628
1629    def commit(self):
1630        self._commit(self.__fields_for_path())
1631
1632    def delete(self):
1633        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1634        self.deleted = True
1635
1636    def get_attachments(self) -> List[Attachment]:
1637        """
1638        Get all attachments on this comment. This returns Attachment objects, which
1639        contain a link to download the attachment.
1640
1641        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1642        """
1643
1644        results = self.allspice_client.requests_get(
1645            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1646        )
1647        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1648
1649    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1650        """
1651        Create an attachment on this comment.
1652
1653        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1654
1655        :param file: The file to attach. This should be a file-like object.
1656        :param name: The name of the file. If not provided, the name of the file will be
1657                     used.
1658        :return: The created attachment.
1659        """
1660
1661        args: dict[str, Any] = {
1662            "files": {"attachment": file},
1663        }
1664        if name is not None:
1665            args["params"] = {"name": name}
1666
1667        result = self.allspice_client.requests_post(
1668            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1669            **args,
1670        )
1671        return Attachment.parse_response(self.allspice_client, result)
1672
1673    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1674        """
1675        Edit an attachment.
1676
1677        The list of params that can be edited is available at
1678        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1679
1680        :param attachment: The attachment to be edited
1681        :param data: The data parameter should be a dictionary of the fields to edit.
1682        :return: The edited attachment
1683        """
1684
1685        args = {
1686            **self.__fields_for_path(),
1687            "attachment_id": attachment.id,
1688        }
1689        result = self.allspice_client.requests_patch(
1690            self.ATTACHMENT_PATH.format(**args),
1691            data=data,
1692        )
1693        return Attachment.parse_response(self.allspice_client, result)
1694
1695    def delete_attachment(self, attachment: Attachment):
1696        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1697
1698        args = {
1699            **self.__fields_for_path(),
1700            "attachment_id": attachment.id,
1701        }
1702        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1703        attachment.deleted = True
Comment(allspice_client)
1583    def __init__(self, allspice_client):
1584        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]]]]
body: str
created_at: datetime.datetime
html_url: str
id: int
issue_url: str
original_author: str
original_author_id: int
pull_request_url: str
updated_at: datetime.datetime
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/comments/{id}'
GET_ATTACHMENTS_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets'
ATTACHMENT_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}'
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: str) -> Comment:
1594    @classmethod
1595    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1596        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
parent_url: str
1606    @property
1607    def parent_url(self) -> str:
1608        """URL of the parent of this comment (the issue or the pull request)"""
1609
1610        if self.issue_url is not None and self.issue_url != "":
1611            return self.issue_url
1612        else:
1613            return self.pull_request_url

URL of the parent of this comment (the issue or the pull request)

repository: Repository
1615    @cached_property
1616    def repository(self) -> Repository:
1617        """The repository this comment was posted on."""
1618
1619        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1620        return Repository.request(self.allspice_client, owner_name, repo_name)

The repository this comment was posted on.

def commit(self):
1629    def commit(self):
1630        self._commit(self.__fields_for_path())
def delete(self):
1632    def delete(self):
1633        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1634        self.deleted = True
def get_attachments(self) -> List[Attachment]:
1636    def get_attachments(self) -> List[Attachment]:
1637        """
1638        Get all attachments on this comment. This returns Attachment objects, which
1639        contain a link to download the attachment.
1640
1641        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1642        """
1643
1644        results = self.allspice_client.requests_get(
1645            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1646        )
1647        return [Attachment.parse_response(self.allspice_client, result) for result in results]

Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.

allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> Attachment:
1649    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1650        """
1651        Create an attachment on this comment.
1652
1653        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1654
1655        :param file: The file to attach. This should be a file-like object.
1656        :param name: The name of the file. If not provided, the name of the file will be
1657                     used.
1658        :return: The created attachment.
1659        """
1660
1661        args: dict[str, Any] = {
1662            "files": {"attachment": file},
1663        }
1664        if name is not None:
1665            args["params"] = {"name": name}
1666
1667        result = self.allspice_client.requests_post(
1668            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1669            **args,
1670        )
1671        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this comment.

allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

def edit_attachment( self, attachment: Attachment, data: dict) -> Attachment:
1673    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1674        """
1675        Edit an attachment.
1676
1677        The list of params that can be edited is available at
1678        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1679
1680        :param attachment: The attachment to be edited
1681        :param data: The data parameter should be a dictionary of the fields to edit.
1682        :return: The edited attachment
1683        """
1684
1685        args = {
1686            **self.__fields_for_path(),
1687            "attachment_id": attachment.id,
1688        }
1689        result = self.allspice_client.requests_patch(
1690            self.ATTACHMENT_PATH.format(**args),
1691            data=data,
1692        )
1693        return Attachment.parse_response(self.allspice_client, result)

Edit an attachment.

The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

Parameters
  • attachment: The attachment to be edited
  • data: The data parameter should be a dictionary of the fields to edit.
Returns

The edited attachment

def delete_attachment(self, attachment: Attachment):
1695    def delete_attachment(self, attachment: Attachment):
1696        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1697
1698        args = {
1699            **self.__fields_for_path(),
1700            "attachment_id": attachment.id,
1701        }
1702        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1703        attachment.deleted = True

allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

class Commit(allspice.baseapiobject.ReadonlyApiObject):
1706class Commit(ReadonlyApiObject):
1707    author: User
1708    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1709    committer: Dict[str, Union[int, str, bool]]
1710    created: str
1711    files: List[Dict[str, str]]
1712    html_url: str
1713    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1714    parents: List[Union[Dict[str, str], Any]]
1715    sha: str
1716    stats: Dict[str, int]
1717    url: str
1718
1719    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1720    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1721    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1722
1723    # Regex to extract owner and repo names from the url property
1724    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1725
1726    def __init__(self, allspice_client):
1727        super().__init__(allspice_client)
1728
1729    _fields_to_parsers: ClassVar[dict] = {
1730        # NOTE: api may return None for commiters that are no allspice users
1731        "author": lambda allspice_client, u: (
1732            User.parse_response(allspice_client, u) if u else None
1733        )
1734    }
1735
1736    def __eq__(self, other):
1737        if not isinstance(other, Commit):
1738            return False
1739        return self.sha == other.sha
1740
1741    def __hash__(self):
1742        return hash(self.sha)
1743
1744    @classmethod
1745    def parse_response(cls, allspice_client, result) -> "Commit":
1746        commit_cache = result["commit"]
1747        api_object = cls(allspice_client)
1748        cls._initialize(allspice_client, api_object, result)
1749        # inner_commit for legacy reasons
1750        Commit._add_read_property("inner_commit", commit_cache, api_object)
1751        return api_object
1752
1753    def get_status(self) -> CommitCombinedStatus:
1754        """
1755        Get a combined status consisting of all statues on this commit.
1756
1757        Note that the returned object is a CommitCombinedStatus object, which
1758        also contains a list of all statuses on the commit.
1759
1760        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1761        """
1762
1763        result = self.allspice_client.requests_get(
1764            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1765        )
1766        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1767
1768    def get_statuses(self) -> List[CommitStatus]:
1769        """
1770        Get a list of all statuses on this commit.
1771
1772        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1773        """
1774
1775        results = self.allspice_client.requests_get(
1776            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1777        )
1778        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1779
1780    @cached_property
1781    def _fields_for_path(self) -> dict[str, str]:
1782        matches = self.URL_REGEXP.search(self.url)
1783        if not matches:
1784            raise ValueError(f"Invalid commit URL: {self.url}")
1785
1786        return {
1787            "owner": matches.group(1),
1788            "repo": matches.group(2),
1789            "sha": self.sha,
1790        }
Commit(allspice_client)
1726    def __init__(self, allspice_client):
1727        super().__init__(allspice_client)
author: User
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
committer: Dict[str, Union[int, str, bool]]
created: str
files: List[Dict[str, str]]
html_url: str
inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
parents: List[Union[Dict[str, str], Any]]
sha: str
stats: Dict[str, int]
url: str
API_OBJECT = '/repos/{owner}/{repo}/commits/{sha}'
COMMIT_GET_STATUS = '/repos/{owner}/{repo}/commits/{sha}/status'
COMMIT_GET_STATUSES = '/repos/{owner}/{repo}/commits/{sha}/statuses'
URL_REGEXP = re.compile('/repos/([^/]+)/([^/]+)/git/commits')
@classmethod
def parse_response(cls, allspice_client, result) -> Commit:
1744    @classmethod
1745    def parse_response(cls, allspice_client, result) -> "Commit":
1746        commit_cache = result["commit"]
1747        api_object = cls(allspice_client)
1748        cls._initialize(allspice_client, api_object, result)
1749        # inner_commit for legacy reasons
1750        Commit._add_read_property("inner_commit", commit_cache, api_object)
1751        return api_object
def get_status(self) -> CommitCombinedStatus:
1753    def get_status(self) -> CommitCombinedStatus:
1754        """
1755        Get a combined status consisting of all statues on this commit.
1756
1757        Note that the returned object is a CommitCombinedStatus object, which
1758        also contains a list of all statuses on the commit.
1759
1760        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1761        """
1762
1763        result = self.allspice_client.requests_get(
1764            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1765        )
1766        return CommitCombinedStatus.parse_response(self.allspice_client, result)

Get a combined status consisting of all statues on this commit.

Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.

allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus

def get_statuses(self) -> List[CommitStatus]:
1768    def get_statuses(self) -> List[CommitStatus]:
1769        """
1770        Get a list of all statuses on this commit.
1771
1772        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1773        """
1774
1775        results = self.allspice_client.requests_get(
1776            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1777        )
1778        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]

Get a list of all statuses on this commit.

allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses

class CommitStatusState(enum.Enum):
1793class CommitStatusState(Enum):
1794    PENDING = "pending"
1795    SUCCESS = "success"
1796    ERROR = "error"
1797    FAILURE = "failure"
1798    WARNING = "warning"
1799
1800    @classmethod
1801    def try_init(cls, value: str) -> CommitStatusState | str:
1802        """
1803        Try converting a string to the enum, and if that fails, return the
1804        string itself.
1805        """
1806
1807        try:
1808            return cls(value)
1809        except ValueError:
1810            return value
PENDING = <CommitStatusState.PENDING: 'pending'>
SUCCESS = <CommitStatusState.SUCCESS: 'success'>
ERROR = <CommitStatusState.ERROR: 'error'>
FAILURE = <CommitStatusState.FAILURE: 'failure'>
WARNING = <CommitStatusState.WARNING: 'warning'>
@classmethod
def try_init(cls, value: str) -> CommitStatusState | str:
1800    @classmethod
1801    def try_init(cls, value: str) -> CommitStatusState | str:
1802        """
1803        Try converting a string to the enum, and if that fails, return the
1804        string itself.
1805        """
1806
1807        try:
1808            return cls(value)
1809        except ValueError:
1810            return value

Try converting a string to the enum, and if that fails, return the string itself.

class CommitStatus(allspice.baseapiobject.ReadonlyApiObject):
1813class CommitStatus(ReadonlyApiObject):
1814    context: str
1815    created_at: str
1816    creator: User
1817    description: str
1818    id: int
1819    status: CommitStatusState
1820    target_url: str
1821    updated_at: str
1822    url: str
1823
1824    def __init__(self, allspice_client):
1825        super().__init__(allspice_client)
1826
1827    _fields_to_parsers: ClassVar[dict] = {
1828        # Gitea/ASH doesn't actually validate that the status is a "valid"
1829        # status, so we can expect empty or unknown strings in the status field.
1830        "status": lambda _, s: CommitStatusState.try_init(s),
1831        "creator": lambda allspice_client, u: (
1832            User.parse_response(allspice_client, u) if u else None
1833        ),
1834    }
1835
1836    def __eq__(self, other):
1837        if not isinstance(other, CommitStatus):
1838            return False
1839        return self.id == other.id
1840
1841    def __hash__(self):
1842        return hash(self.id)
CommitStatus(allspice_client)
1824    def __init__(self, allspice_client):
1825        super().__init__(allspice_client)
context: str
created_at: str
creator: User
description: str
id: int
target_url: str
updated_at: str
url: str
class CommitCombinedStatus(allspice.baseapiobject.ReadonlyApiObject):
1845class CommitCombinedStatus(ReadonlyApiObject):
1846    commit_url: str
1847    repository: Repository
1848    sha: str
1849    state: CommitStatusState
1850    statuses: List["CommitStatus"]
1851    total_count: int
1852    url: str
1853
1854    def __init__(self, allspice_client):
1855        super().__init__(allspice_client)
1856
1857    _fields_to_parsers: ClassVar[dict] = {
1858        # See CommitStatus
1859        "state": lambda _, s: CommitStatusState.try_init(s),
1860        "statuses": lambda allspice_client, statuses: [
1861            CommitStatus.parse_response(allspice_client, status) for status in statuses
1862        ],
1863        "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r),
1864    }
1865
1866    def __eq__(self, other):
1867        if not isinstance(other, CommitCombinedStatus):
1868            return False
1869        return self.sha == other.sha
1870
1871    def __hash__(self):
1872        return hash(self.sha)
1873
1874    @classmethod
1875    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
1876        api_object = cls(allspice_client)
1877        cls._initialize(allspice_client, api_object, result)
1878        return api_object
CommitCombinedStatus(allspice_client)
1854    def __init__(self, allspice_client):
1855        super().__init__(allspice_client)
commit_url: str
repository: Repository
sha: str
statuses: List[CommitStatus]
total_count: int
url: str
@classmethod
def parse_response(cls, allspice_client, result) -> CommitCombinedStatus:
1874    @classmethod
1875    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
1876        api_object = cls(allspice_client)
1877        cls._initialize(allspice_client, api_object, result)
1878        return api_object
class Issue(allspice.baseapiobject.ApiObject):
1881class Issue(ApiObject):
1882    """
1883    An issue on a repository.
1884
1885    Note: `Issue.assets` may not have any entries even if the issue has
1886    attachments. This happens when an issue is fetched via a bulk method like
1887    `Repository.get_issues`. In most cases, prefer using
1888    `Issue.get_attachments` to get the attachments on an issue.
1889    """
1890
1891    assets: List[Union[Any, "Attachment"]]
1892    assignee: Any
1893    assignees: Any
1894    body: str
1895    closed_at: Any
1896    comments: int
1897    created_at: str
1898    due_date: Any
1899    html_url: str
1900    id: int
1901    is_locked: bool
1902    labels: List[Any]
1903    milestone: Optional["Milestone"]
1904    number: int
1905    original_author: str
1906    original_author_id: int
1907    pin_order: int
1908    pull_request: Any
1909    ref: str
1910    repository: Dict[str, Union[int, str]]
1911    state: str
1912    title: str
1913    updated_at: str
1914    url: str
1915    user: User
1916
1917    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1918    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1919    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1920    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1921    GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets"""
1922
1923    OPENED = "open"
1924    CLOSED = "closed"
1925
1926    def __init__(self, allspice_client):
1927        super().__init__(allspice_client)
1928
1929    def __eq__(self, other):
1930        if not isinstance(other, Issue):
1931            return False
1932        return self.repository == other.repository and self.id == other.id
1933
1934    def __hash__(self):
1935        return hash(self.repository) ^ hash(self.id)
1936
1937    _fields_to_parsers: ClassVar[dict] = {
1938        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1939        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1940        "assets": lambda allspice_client, assets: [
1941            Attachment.parse_response(allspice_client, a) for a in assets
1942        ],
1943        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1944        "assignees": lambda allspice_client, us: [
1945            User.parse_response(allspice_client, u) for u in us
1946        ],
1947        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1948    }
1949
1950    _parsers_to_fields: ClassVar[dict] = {
1951        "milestone": lambda m: m.id,
1952    }
1953
1954    _patchable_fields: ClassVar[set[str]] = {
1955        "assignee",
1956        "assignees",
1957        "body",
1958        "due_date",
1959        "milestone",
1960        "state",
1961        "title",
1962    }
1963
1964    def commit(self):
1965        args = {
1966            "owner": self.repository.owner.username,
1967            "repo": self.repository.name,
1968            "index": self.number,
1969        }
1970        self._commit(args)
1971
1972    @classmethod
1973    def request(cls, allspice_client, owner: str, repo: str, number: str):
1974        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1975        # The repository in the response is a RepositoryMeta object, so request
1976        # the full repository object and add it to the issue object.
1977        repository = Repository.request(allspice_client, owner, repo)
1978        setattr(api_object, "_repository", repository)
1979        # For legacy reasons
1980        cls._add_read_property("repo", repository, api_object)
1981        return api_object
1982
1983    @classmethod
1984    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1985        args = {"owner": repo.owner.username, "repo": repo.name}
1986        data = {"title": title, "body": body}
1987        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1988        issue = Issue.parse_response(allspice_client, result)
1989        setattr(issue, "_repository", repo)
1990        cls._add_read_property("repo", repo, issue)
1991        return issue
1992
1993    @property
1994    def owner(self) -> Organization | User:
1995        return self.repository.owner
1996
1997    def get_time_sum(self, user: User) -> int:
1998        results = self.allspice_client.requests_get(
1999            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2000        )
2001        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
2002
2003    def get_times(self) -> Optional[Dict]:
2004        return self.allspice_client.requests_get(
2005            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2006        )
2007
2008    def delete_time(self, time_id: str):
2009        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2010        self.allspice_client.requests_delete(path)
2011
2012    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2013        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2014        self.allspice_client.requests_post(
2015            path, data={"created": created, "time": int(time), "user_name": user_name}
2016        )
2017
2018    def get_comments(self) -> List[Comment]:
2019        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2020
2021        results = self.allspice_client.requests_get(
2022            self.GET_COMMENTS.format(
2023                owner=self.owner.username, repo=self.repository.name, index=self.number
2024            )
2025        )
2026
2027        return [Comment.parse_response(self.allspice_client, result) for result in results]
2028
2029    def create_comment(self, body: str) -> Comment:
2030        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2031
2032        path = self.GET_COMMENTS.format(
2033            owner=self.owner.username, repo=self.repository.name, index=self.number
2034        )
2035
2036        response = self.allspice_client.requests_post(path, data={"body": body})
2037        return Comment.parse_response(self.allspice_client, response)
2038
2039    def get_attachments(self) -> List[Attachment]:
2040        """
2041        Fetch all attachments on this issue.
2042
2043        Unlike the assets field, this will always fetch all attachments from the
2044        API.
2045
2046        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2047        """
2048
2049        path = self.GET_ATTACHMENTS.format(
2050            owner=self.owner.username, repo=self.repository.name, index=self.number
2051        )
2052        response = self.allspice_client.requests_get(path)
2053
2054        return [Attachment.parse_response(self.allspice_client, result) for result in response]
2055
2056    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2057        """
2058        Create an attachment on this issue.
2059
2060        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2061
2062        :param file: The file to attach. This should be a file-like object.
2063        :param name: The name of the file. If not provided, the name of the file will be
2064                     used.
2065        :return: The created attachment.
2066        """
2067
2068        args: dict[str, Any] = {
2069            "files": {"attachment": file},
2070        }
2071        if name is not None:
2072            args["params"] = {"name": name}
2073
2074        result = self.allspice_client.requests_post(
2075            self.GET_ATTACHMENTS.format(
2076                owner=self.owner.username, repo=self.repository.name, index=self.number
2077            ),
2078            **args,
2079        )
2080
2081        return Attachment.parse_response(self.allspice_client, result)

An issue on a repository.

Note: Issue.assets may not have any entries even if the issue has attachments. This happens when an issue is fetched via a bulk method like Repository.get_issues. In most cases, prefer using Issue.get_attachments to get the attachments on an issue.

Issue(allspice_client)
1926    def __init__(self, allspice_client):
1927        super().__init__(allspice_client)
assets: List[Union[Any, Attachment]]
assignee: Any
assignees: Any
body: str
closed_at: Any
comments: int
created_at: str
due_date: Any
html_url: str
id: int
is_locked: bool
labels: List[Any]
milestone: Optional[Milestone]
number: int
original_author: str
original_author_id: int
pin_order: int
pull_request: Any
ref: str
repository: Dict[str, Union[int, str]]
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/{index}'
GET_TIME = '/repos/%s/%s/issues/%s/times'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
CREATE_ISSUE = '/repos/{owner}/{repo}/issues'
GET_ATTACHMENTS = '/repos/{owner}/{repo}/issues/{index}/assets'
OPENED = 'open'
CLOSED = 'closed'
def commit(self):
1964    def commit(self):
1965        args = {
1966            "owner": self.repository.owner.username,
1967            "repo": self.repository.name,
1968            "index": self.number,
1969        }
1970        self._commit(args)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1972    @classmethod
1973    def request(cls, allspice_client, owner: str, repo: str, number: str):
1974        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1975        # The repository in the response is a RepositoryMeta object, so request
1976        # the full repository object and add it to the issue object.
1977        repository = Repository.request(allspice_client, owner, repo)
1978        setattr(api_object, "_repository", repository)
1979        # For legacy reasons
1980        cls._add_read_property("repo", repository, api_object)
1981        return api_object
@classmethod
def create_issue( cls, allspice_client, repo: Repository, title: str, body: str = ''):
1983    @classmethod
1984    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1985        args = {"owner": repo.owner.username, "repo": repo.name}
1986        data = {"title": title, "body": body}
1987        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1988        issue = Issue.parse_response(allspice_client, result)
1989        setattr(issue, "_repository", repo)
1990        cls._add_read_property("repo", repo, issue)
1991        return issue
owner: Organization | User
1993    @property
1994    def owner(self) -> Organization | User:
1995        return self.repository.owner
def get_time_sum(self, user: User) -> int:
1997    def get_time_sum(self, user: User) -> int:
1998        results = self.allspice_client.requests_get(
1999            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2000        )
2001        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
def get_times(self) -> Optional[Dict]:
2003    def get_times(self) -> Optional[Dict]:
2004        return self.allspice_client.requests_get(
2005            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
2006        )
def delete_time(self, time_id: str):
2008    def delete_time(self, time_id: str):
2009        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
2010        self.allspice_client.requests_delete(path)
def add_time( self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2012    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
2013        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2014        self.allspice_client.requests_post(
2015            path, data={"created": created, "time": int(time), "user_name": user_name}
2016        )
def get_comments(self) -> List[Comment]:
2018    def get_comments(self) -> List[Comment]:
2019        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2020
2021        results = self.allspice_client.requests_get(
2022            self.GET_COMMENTS.format(
2023                owner=self.owner.username, repo=self.repository.name, index=self.number
2024            )
2025        )
2026
2027        return [Comment.parse_response(self.allspice_client, result) for result in results]

allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments

def create_comment(self, body: str) -> Comment:
2029    def create_comment(self, body: str) -> Comment:
2030        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2031
2032        path = self.GET_COMMENTS.format(
2033            owner=self.owner.username, repo=self.repository.name, index=self.number
2034        )
2035
2036        response = self.allspice_client.requests_post(path, data={"body": body})
2037        return Comment.parse_response(self.allspice_client, response)

allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment

def get_attachments(self) -> List[Attachment]:
2039    def get_attachments(self) -> List[Attachment]:
2040        """
2041        Fetch all attachments on this issue.
2042
2043        Unlike the assets field, this will always fetch all attachments from the
2044        API.
2045
2046        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2047        """
2048
2049        path = self.GET_ATTACHMENTS.format(
2050            owner=self.owner.username, repo=self.repository.name, index=self.number
2051        )
2052        response = self.allspice_client.requests_get(path)
2053
2054        return [Attachment.parse_response(self.allspice_client, result) for result in response]

Fetch all attachments on this issue.

Unlike the assets field, this will always fetch all attachments from the API.

See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> Attachment:
2056    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
2057        """
2058        Create an attachment on this issue.
2059
2060        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
2061
2062        :param file: The file to attach. This should be a file-like object.
2063        :param name: The name of the file. If not provided, the name of the file will be
2064                     used.
2065        :return: The created attachment.
2066        """
2067
2068        args: dict[str, Any] = {
2069            "files": {"attachment": file},
2070        }
2071        if name is not None:
2072            args["params"] = {"name": name}
2073
2074        result = self.allspice_client.requests_post(
2075            self.GET_ATTACHMENTS.format(
2076                owner=self.owner.username, repo=self.repository.name, index=self.number
2077            ),
2078            **args,
2079        )
2080
2081        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this issue.

allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

class DesignReview(allspice.baseapiobject.ApiObject):
2084class DesignReview(ApiObject):
2085    """
2086    A Design Review. See
2087    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2088
2089    Note: The base and head fields are not `Branch` objects - they are plain strings
2090    referring to the branch names. This is because DRs can exist for branches that have
2091    been deleted, which don't have an associated `Branch` object from the API. You can use
2092    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2093    it exists.
2094    """
2095
2096    additions: int
2097    allow_maintainer_edit: bool
2098    allow_maintainer_edits: Any
2099    assignee: User
2100    assignees: List["User"]
2101    base: str
2102    body: str
2103    changed_files: int
2104    closed_at: Optional[str]
2105    comments: int
2106    created_at: str
2107    deletions: int
2108    diff_url: str
2109    draft: bool
2110    due_date: Optional[str]
2111    head: str
2112    html_url: str
2113    id: int
2114    is_locked: bool
2115    labels: List[Any]
2116    merge_base: str
2117    merge_commit_sha: Optional[str]
2118    mergeable: bool
2119    merged: bool
2120    merged_at: Optional[str]
2121    merged_by: Optional["User"]
2122    milestone: Any
2123    number: int
2124    patch_url: str
2125    pin_order: int
2126    repository: Optional["Repository"]
2127    requested_reviewers: Any
2128    review_comments: int
2129    state: str
2130    title: str
2131    updated_at: str
2132    url: str
2133    user: User
2134
2135    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2136    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2137    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2138
2139    OPEN = "open"
2140    CLOSED = "closed"
2141
2142    class MergeType(Enum):
2143        MERGE = "merge"
2144        REBASE = "rebase"
2145        REBASE_MERGE = "rebase-merge"
2146        SQUASH = "squash"
2147        MANUALLY_MERGED = "manually-merged"
2148
2149    def __init__(self, allspice_client):
2150        super().__init__(allspice_client)
2151
2152    def __eq__(self, other):
2153        if not isinstance(other, DesignReview):
2154            return False
2155        return self.repository == other.repository and self.id == other.id
2156
2157    def __hash__(self):
2158        return hash(self.repository) ^ hash(self.id)
2159
2160    @classmethod
2161    def parse_response(cls, allspice_client, result) -> "DesignReview":
2162        api_object = super().parse_response(allspice_client, result)
2163        cls._add_read_property(
2164            "repository",
2165            Repository.parse_response(allspice_client, result["base"]["repo"]),
2166            api_object,
2167        )
2168
2169        return api_object
2170
2171    @classmethod
2172    def request(cls, allspice_client, owner: str, repo: str, number: str):
2173        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2174        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2175
2176    _fields_to_parsers: ClassVar[dict] = {
2177        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2178        "assignees": lambda allspice_client, us: [
2179            User.parse_response(allspice_client, u) for u in us
2180        ],
2181        "base": lambda _, b: b["ref"],
2182        "head": lambda _, h: h["ref"],
2183        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2184        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2185        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2186    }
2187
2188    _patchable_fields: ClassVar[set[str]] = {
2189        "allow_maintainer_edits",
2190        "assignee",
2191        "assignees",
2192        "base",
2193        "body",
2194        "due_date",
2195        "milestone",
2196        "state",
2197        "title",
2198    }
2199
2200    _parsers_to_fields: ClassVar[dict] = {
2201        "assignee": lambda u: u.username,
2202        "assignees": lambda us: [u.username for u in us],
2203        "base": lambda b: b.name if isinstance(b, Branch) else b,
2204        "milestone": lambda m: m.id,
2205    }
2206
2207    def commit(self):
2208        data = self.get_dirty_fields()
2209        if "due_date" in data and data["due_date"] is None:
2210            data["unset_due_date"] = True
2211
2212        args = {
2213            "owner": self.repository.owner.username,
2214            "repo": self.repository.name,
2215            "index": self.number,
2216        }
2217        self._commit(args, data)
2218
2219    def merge(self, merge_type: MergeType):
2220        """
2221        Merge the pull request. See
2222        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2223
2224        :param merge_type: The type of merge to perform. See the MergeType enum.
2225        """
2226
2227        self.allspice_client.requests_post(
2228            self.MERGE_DESIGN_REVIEW.format(
2229                owner=self.repository.owner.username,
2230                repo=self.repository.name,
2231                index=self.number,
2232            ),
2233            data={"Do": merge_type.value},
2234        )
2235
2236    def get_comments(self) -> List[Comment]:
2237        """
2238        Get the comments on this pull request, but not specifically on a review.
2239
2240        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2241
2242        :return: A list of comments on this pull request.
2243        """
2244
2245        results = self.allspice_client.requests_get(
2246            self.GET_COMMENTS.format(
2247                owner=self.repository.owner.username,
2248                repo=self.repository.name,
2249                index=self.number,
2250            )
2251        )
2252        return [Comment.parse_response(self.allspice_client, result) for result in results]
2253
2254    def create_comment(self, body: str):
2255        """
2256        Create a comment on this pull request. This uses the same endpoint as the
2257        comments on issues, and will not be associated with any reviews.
2258
2259        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2260
2261        :param body: The body of the comment.
2262        :return: The comment that was created.
2263        """
2264
2265        result = self.allspice_client.requests_post(
2266            self.GET_COMMENTS.format(
2267                owner=self.repository.owner.username,
2268                repo=self.repository.name,
2269                index=self.number,
2270            ),
2271            data={"body": body},
2272        )
2273        return Comment.parse_response(self.allspice_client, result)

A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.

Note: The base and head fields are not Branch objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated Branch object from the API. You can use the Repository.get_branch method to get a Branch object for a branch if you know it exists.

DesignReview(allspice_client)
2149    def __init__(self, allspice_client):
2150        super().__init__(allspice_client)
additions: int
allow_maintainer_edit: bool
allow_maintainer_edits: Any
assignee: User
assignees: List[User]
base: str
body: str
changed_files: int
closed_at: Optional[str]
comments: int
created_at: str
deletions: int
diff_url: str
draft: bool
due_date: Optional[str]
head: str
html_url: str
id: int
is_locked: bool
labels: List[Any]
merge_base: str
merge_commit_sha: Optional[str]
mergeable: bool
merged: bool
merged_at: Optional[str]
merged_by: Optional[User]
milestone: Any
number: int
patch_url: str
pin_order: int
repository: Optional[Repository]
requested_reviewers: Any
review_comments: int
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/pulls/{index}'
MERGE_DESIGN_REVIEW = '/repos/{owner}/{repo}/pulls/{index}/merge'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
OPEN = 'open'
CLOSED = 'closed'
@classmethod
def parse_response(cls, allspice_client, result) -> DesignReview:
2160    @classmethod
2161    def parse_response(cls, allspice_client, result) -> "DesignReview":
2162        api_object = super().parse_response(allspice_client, result)
2163        cls._add_read_property(
2164            "repository",
2165            Repository.parse_response(allspice_client, result["base"]["repo"]),
2166            api_object,
2167        )
2168
2169        return api_object
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
2171    @classmethod
2172    def request(cls, allspice_client, owner: str, repo: str, number: str):
2173        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2174        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})

See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest

def commit(self):
2207    def commit(self):
2208        data = self.get_dirty_fields()
2209        if "due_date" in data and data["due_date"] is None:
2210            data["unset_due_date"] = True
2211
2212        args = {
2213            "owner": self.repository.owner.username,
2214            "repo": self.repository.name,
2215            "index": self.number,
2216        }
2217        self._commit(args, data)
def merge(self, merge_type: DesignReview.MergeType):
2219    def merge(self, merge_type: MergeType):
2220        """
2221        Merge the pull request. See
2222        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2223
2224        :param merge_type: The type of merge to perform. See the MergeType enum.
2225        """
2226
2227        self.allspice_client.requests_post(
2228            self.MERGE_DESIGN_REVIEW.format(
2229                owner=self.repository.owner.username,
2230                repo=self.repository.name,
2231                index=self.number,
2232            ),
2233            data={"Do": merge_type.value},
2234        )

Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest

Parameters
  • merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
2236    def get_comments(self) -> List[Comment]:
2237        """
2238        Get the comments on this pull request, but not specifically on a review.
2239
2240        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2241
2242        :return: A list of comments on this pull request.
2243        """
2244
2245        results = self.allspice_client.requests_get(
2246            self.GET_COMMENTS.format(
2247                owner=self.repository.owner.username,
2248                repo=self.repository.name,
2249                index=self.number,
2250            )
2251        )
2252        return [Comment.parse_response(self.allspice_client, result) for result in results]

Get the comments on this pull request, but not specifically on a review.

allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments

Returns

A list of comments on this pull request.

def create_comment(self, body: str):
2254    def create_comment(self, body: str):
2255        """
2256        Create a comment on this pull request. This uses the same endpoint as the
2257        comments on issues, and will not be associated with any reviews.
2258
2259        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2260
2261        :param body: The body of the comment.
2262        :return: The comment that was created.
2263        """
2264
2265        result = self.allspice_client.requests_post(
2266            self.GET_COMMENTS.format(
2267                owner=self.repository.owner.username,
2268                repo=self.repository.name,
2269                index=self.number,
2270            ),
2271            data={"body": body},
2272        )
2273        return Comment.parse_response(self.allspice_client, result)

Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.

allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment

Parameters
  • body: The body of the comment.
Returns

The comment that was created.

class DesignReview.MergeType(enum.Enum):
2142    class MergeType(Enum):
2143        MERGE = "merge"
2144        REBASE = "rebase"
2145        REBASE_MERGE = "rebase-merge"
2146        SQUASH = "squash"
2147        MANUALLY_MERGED = "manually-merged"
MERGE = <MergeType.MERGE: 'merge'>
REBASE = <MergeType.REBASE: 'rebase'>
REBASE_MERGE = <MergeType.REBASE_MERGE: 'rebase-merge'>
SQUASH = <MergeType.SQUASH: 'squash'>
MANUALLY_MERGED = <MergeType.MANUALLY_MERGED: 'manually-merged'>
class Team(allspice.baseapiobject.ApiObject):
2276class Team(ApiObject):
2277    can_create_org_repo: bool
2278    description: str
2279    id: int
2280    includes_all_repositories: bool
2281    name: str
2282    organization: Optional["Organization"]
2283    permission: str
2284    units: List[str]
2285    units_map: Dict[str, str]
2286
2287    API_OBJECT = """/teams/{id}"""  # <id>
2288    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2289    TEAM_DELETE = """/teams/%s"""  # <id>
2290    GET_MEMBERS = """/teams/%s/members"""  # <id>
2291    GET_REPOS = """/teams/%s/repos"""  # <id>
2292
2293    def __init__(self, allspice_client):
2294        super().__init__(allspice_client)
2295
2296    def __eq__(self, other):
2297        if not isinstance(other, Team):
2298            return False
2299        return self.organization == other.organization and self.id == other.id
2300
2301    def __hash__(self):
2302        return hash(self.organization) ^ hash(self.id)
2303
2304    _fields_to_parsers: ClassVar[dict] = {
2305        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2306    }
2307
2308    _patchable_fields: ClassVar[set[str]] = {
2309        "can_create_org_repo",
2310        "description",
2311        "includes_all_repositories",
2312        "name",
2313        "permission",
2314        "units",
2315        "units_map",
2316    }
2317
2318    @classmethod
2319    def request(cls, allspice_client, id: int):
2320        return cls._request(allspice_client, {"id": id})
2321
2322    def commit(self):
2323        args = {"id": self.id}
2324        self._commit(args)
2325
2326    def add_user(self, user: User):
2327        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2328        url = f"/teams/{self.id}/members/{user.login}"
2329        self.allspice_client.requests_put(url)
2330
2331    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2332        if isinstance(repo, Repository):
2333            repo_name = repo.name
2334        else:
2335            repo_name = repo
2336        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2337
2338    def get_members(self):
2339        """Get all users assigned to the team."""
2340        results = self.allspice_client.requests_get_paginated(
2341            Team.GET_MEMBERS % self.id,
2342        )
2343        return [User.parse_response(self.allspice_client, result) for result in results]
2344
2345    def get_repos(self):
2346        """Get all repos of this Team."""
2347        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2348        return [Repository.parse_response(self.allspice_client, result) for result in results]
2349
2350    def delete(self):
2351        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2352        self.deleted = True
2353
2354    def remove_team_member(self, user_name: str):
2355        url = f"/teams/{self.id}/members/{user_name}"
2356        self.allspice_client.requests_delete(url)
Team(allspice_client)
2293    def __init__(self, allspice_client):
2294        super().__init__(allspice_client)
can_create_org_repo: bool
description: str
id: int
includes_all_repositories: bool
name: str
organization: Optional[Organization]
permission: str
units: List[str]
units_map: Dict[str, str]
API_OBJECT = '/teams/{id}'
ADD_REPO = '/teams/%s/repos/%s/%s'
TEAM_DELETE = '/teams/%s'
GET_MEMBERS = '/teams/%s/members'
GET_REPOS = '/teams/%s/repos'
@classmethod
def request(cls, allspice_client, id: int):
2318    @classmethod
2319    def request(cls, allspice_client, id: int):
2320        return cls._request(allspice_client, {"id": id})
def commit(self):
2322    def commit(self):
2323        args = {"id": self.id}
2324        self._commit(args)
def add_user(self, user: User):
2326    def add_user(self, user: User):
2327        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2328        url = f"/teams/{self.id}/members/{user.login}"
2329        self.allspice_client.requests_put(url)

allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember

def add_repo( self, org: Organization, repo: Union[Repository, str]):
2331    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2332        if isinstance(repo, Repository):
2333            repo_name = repo.name
2334        else:
2335            repo_name = repo
2336        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
def get_members(self):
2338    def get_members(self):
2339        """Get all users assigned to the team."""
2340        results = self.allspice_client.requests_get_paginated(
2341            Team.GET_MEMBERS % self.id,
2342        )
2343        return [User.parse_response(self.allspice_client, result) for result in results]

Get all users assigned to the team.

def get_repos(self):
2345    def get_repos(self):
2346        """Get all repos of this Team."""
2347        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2348        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all repos of this Team.

def delete(self):
2350    def delete(self):
2351        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2352        self.deleted = True
def remove_team_member(self, user_name: str):
2354    def remove_team_member(self, user_name: str):
2355        url = f"/teams/{self.id}/members/{user_name}"
2356        self.allspice_client.requests_delete(url)
class Release(allspice.baseapiobject.ApiObject):
2359class Release(ApiObject):
2360    """
2361    A release on a repo.
2362    """
2363
2364    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2365    author: User
2366    body: str
2367    created_at: str
2368    draft: bool
2369    html_url: str
2370    id: int
2371    name: str
2372    prerelease: bool
2373    published_at: str
2374    repo: Optional["Repository"]
2375    repository: Optional["Repository"]
2376    tag_name: str
2377    tarball_url: str
2378    target_commitish: str
2379    upload_url: str
2380    url: str
2381    zipball_url: str
2382
2383    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2384    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2385    # Note that we don't strictly need the get_assets route, as the release
2386    # object already contains the assets.
2387
2388    def __init__(self, allspice_client):
2389        super().__init__(allspice_client)
2390
2391    def __eq__(self, other):
2392        if not isinstance(other, Release):
2393            return False
2394        return self.repo == other.repo and self.id == other.id
2395
2396    def __hash__(self):
2397        return hash(self.repo) ^ hash(self.id)
2398
2399    _fields_to_parsers: ClassVar[dict] = {
2400        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2401    }
2402    _patchable_fields: ClassVar[set[str]] = {
2403        "body",
2404        "draft",
2405        "name",
2406        "prerelease",
2407        "tag_name",
2408        "target_commitish",
2409    }
2410
2411    @classmethod
2412    def parse_response(cls, allspice_client, result, repo) -> Release:
2413        release = super().parse_response(allspice_client, result)
2414        Release._add_read_property("repository", repo, release)
2415        # For legacy reasons
2416        Release._add_read_property("repo", repo, release)
2417        setattr(
2418            release,
2419            "_assets",
2420            [
2421                ReleaseAsset.parse_response(allspice_client, asset, release)
2422                for asset in result["assets"]
2423            ],
2424        )
2425        return release
2426
2427    @classmethod
2428    def request(
2429        cls,
2430        allspice_client,
2431        owner: str,
2432        repo: str,
2433        id: Optional[int] = None,
2434    ) -> Release:
2435        args = {"owner": owner, "repo": repo, "id": id}
2436        release_response = cls._get_gitea_api_object(allspice_client, args)
2437        repository = Repository.request(allspice_client, owner, repo)
2438        release = cls.parse_response(allspice_client, release_response, repository)
2439        return release
2440
2441    def commit(self):
2442        if self.repo is None:
2443            raise ValueError("Cannot commit a release without a repository.")
2444
2445        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2446        self._commit(args)
2447
2448    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2449        """
2450        Create an asset for this release.
2451
2452        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2453
2454        :param file: The file to upload. This should be a file-like object.
2455        :param name: The name of the file.
2456        :return: The created asset.
2457        """
2458
2459        if self.repo is None:
2460            raise ValueError("Cannot commit a release without a repository.")
2461
2462        args: dict[str, Any] = {"files": {"attachment": file}}
2463        if name is not None:
2464            args["params"] = {"name": name}
2465
2466        result = self.allspice_client.requests_post(
2467            self.RELEASE_CREATE_ASSET.format(
2468                owner=self.repo.owner.username,
2469                repo=self.repo.name,
2470                id=self.id,
2471            ),
2472            **args,
2473        )
2474        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2475
2476    def delete(self):
2477        if self.repo is None:
2478            raise ValueError("Cannot commit a release without a repository.")
2479
2480        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2481        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2482        self.deleted = True

A release on a repo.

Release(allspice_client)
2388    def __init__(self, allspice_client):
2389        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]], ReleaseAsset]]
author: User
body: str
created_at: str
draft: bool
html_url: str
id: int
name: str
prerelease: bool
published_at: str
repo: Optional[Repository]
repository: Optional[Repository]
tag_name: str
tarball_url: str
target_commitish: str
upload_url: str
url: str
zipball_url: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{id}'
RELEASE_CREATE_ASSET = '/repos/{owner}/{repo}/releases/{id}/assets'
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
2411    @classmethod
2412    def parse_response(cls, allspice_client, result, repo) -> Release:
2413        release = super().parse_response(allspice_client, result)
2414        Release._add_read_property("repository", repo, release)
2415        # For legacy reasons
2416        Release._add_read_property("repo", repo, release)
2417        setattr(
2418            release,
2419            "_assets",
2420            [
2421                ReleaseAsset.parse_response(allspice_client, asset, release)
2422                for asset in result["assets"]
2423            ],
2424        )
2425        return release
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: Optional[int] = None) -> Release:
2427    @classmethod
2428    def request(
2429        cls,
2430        allspice_client,
2431        owner: str,
2432        repo: str,
2433        id: Optional[int] = None,
2434    ) -> Release:
2435        args = {"owner": owner, "repo": repo, "id": id}
2436        release_response = cls._get_gitea_api_object(allspice_client, args)
2437        repository = Repository.request(allspice_client, owner, repo)
2438        release = cls.parse_response(allspice_client, release_response, repository)
2439        return release
def commit(self):
2441    def commit(self):
2442        if self.repo is None:
2443            raise ValueError("Cannot commit a release without a repository.")
2444
2445        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2446        self._commit(args)
def create_asset( self, file: <class 'IO'>, name: Optional[str] = None) -> ReleaseAsset:
2448    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2449        """
2450        Create an asset for this release.
2451
2452        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2453
2454        :param file: The file to upload. This should be a file-like object.
2455        :param name: The name of the file.
2456        :return: The created asset.
2457        """
2458
2459        if self.repo is None:
2460            raise ValueError("Cannot commit a release without a repository.")
2461
2462        args: dict[str, Any] = {"files": {"attachment": file}}
2463        if name is not None:
2464            args["params"] = {"name": name}
2465
2466        result = self.allspice_client.requests_post(
2467            self.RELEASE_CREATE_ASSET.format(
2468                owner=self.repo.owner.username,
2469                repo=self.repo.name,
2470                id=self.id,
2471            ),
2472            **args,
2473        )
2474        return ReleaseAsset.parse_response(self.allspice_client, result, self)

Create an asset for this release.

allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

Parameters
  • file: The file to upload. This should be a file-like object.
  • name: The name of the file.
Returns

The created asset.

def delete(self):
2476    def delete(self):
2477        if self.repo is None:
2478            raise ValueError("Cannot commit a release without a repository.")
2479
2480        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
2481        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2482        self.deleted = True
class ReleaseAsset(allspice.baseapiobject.ApiObject):
2485class ReleaseAsset(ApiObject):
2486    browser_download_url: str
2487    created_at: str
2488    download_count: int
2489    id: int
2490    name: str
2491    release: Optional["Release"]
2492    size: int
2493    uuid: str
2494
2495    API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}"
2496
2497    def __init__(self, allspice_client):
2498        super().__init__(allspice_client)
2499
2500    def __eq__(self, other):
2501        if not isinstance(other, ReleaseAsset):
2502            return False
2503        return self.release == other.release and self.id == other.id
2504
2505    def __hash__(self):
2506        return hash(self.release) ^ hash(self.id)
2507
2508    _fields_to_parsers: ClassVar[dict] = {}
2509    _patchable_fields: ClassVar[set[str]] = {
2510        "name",
2511    }
2512
2513    @classmethod
2514    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2515        asset = super().parse_response(allspice_client, result)
2516        ReleaseAsset._add_read_property("release", release, asset)
2517        return asset
2518
2519    @classmethod
2520    def request(
2521        cls,
2522        allspice_client,
2523        owner: str,
2524        repo: str,
2525        release_id: int,
2526        id: int,
2527    ) -> ReleaseAsset:
2528        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
2529        asset_response = cls._get_gitea_api_object(allspice_client, args)
2530        release = Release.request(allspice_client, owner, repo, release_id)
2531        asset = cls.parse_response(allspice_client, asset_response, release)
2532        return asset
2533
2534    def commit(self):
2535        if self.release is None or self.release.repo is None:
2536            raise ValueError("Cannot commit a release asset without a release or a repository.")
2537
2538        args = {
2539            "owner": self.release.repo.owner.username,
2540            "repo": self.release.repo.name,
2541            "release_id": self.release.id,
2542            "id": self.id,
2543        }
2544        self._commit(args)
2545
2546    def download(self) -> bytes:
2547        """
2548        Download the raw, binary data of this asset.
2549
2550        Note 1: if the file you are requesting is a text file, you might want to
2551        use .decode() on the result to get a string. For example:
2552
2553            asset.download().decode("utf-8")
2554
2555        Note 2: this method will store the entire file in memory. If you are
2556        downloading a large file, you might want to use download_to_file instead.
2557        """
2558
2559        return self.allspice_client.requests.get(
2560            self.browser_download_url,
2561            headers=self.allspice_client.headers,
2562        ).content
2563
2564    def download_to_file(self, io: IO):
2565        """
2566        Download the raw, binary data of this asset to a file-like object.
2567
2568        Example:
2569
2570            with open("my_file.zip", "wb") as f:
2571                asset.download_to_file(f)
2572
2573        :param io: The file-like object to write the data to.
2574        """
2575
2576        response = self.allspice_client.requests.get(
2577            self.browser_download_url,
2578            headers=self.allspice_client.headers,
2579            stream=True,
2580        )
2581        # 4kb chunks
2582        for chunk in response.iter_content(chunk_size=4096):
2583            if chunk:
2584                io.write(chunk)
2585
2586    def delete(self):
2587        if self.release is None or self.release.repo is None:
2588            raise ValueError("Cannot commit a release asset without a release or a repository.")
2589
2590        args = {
2591            "owner": self.release.repo.owner.username,
2592            "repo": self.release.repo.name,
2593            "release_id": self.release.id,
2594            "id": self.id,
2595        }
2596        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2597        self.deleted = True
ReleaseAsset(allspice_client)
2497    def __init__(self, allspice_client):
2498        super().__init__(allspice_client)
browser_download_url: str
created_at: str
download_count: int
id: int
name: str
release: Optional[Release]
size: int
uuid: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{release_id}/assets/{id}'
@classmethod
def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2513    @classmethod
2514    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2515        asset = super().parse_response(allspice_client, result)
2516        ReleaseAsset._add_read_property("release", release, asset)
2517        return asset
@classmethod
def request( cls, allspice_client, owner: str, repo: str, release_id: int, id: int) -> ReleaseAsset:
2519    @classmethod
2520    def request(
2521        cls,
2522        allspice_client,
2523        owner: str,
2524        repo: str,
2525        release_id: int,
2526        id: int,
2527    ) -> ReleaseAsset:
2528        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
2529        asset_response = cls._get_gitea_api_object(allspice_client, args)
2530        release = Release.request(allspice_client, owner, repo, release_id)
2531        asset = cls.parse_response(allspice_client, asset_response, release)
2532        return asset
def commit(self):
2534    def commit(self):
2535        if self.release is None or self.release.repo is None:
2536            raise ValueError("Cannot commit a release asset without a release or a repository.")
2537
2538        args = {
2539            "owner": self.release.repo.owner.username,
2540            "repo": self.release.repo.name,
2541            "release_id": self.release.id,
2542            "id": self.id,
2543        }
2544        self._commit(args)
def download(self) -> bytes:
2546    def download(self) -> bytes:
2547        """
2548        Download the raw, binary data of this asset.
2549
2550        Note 1: if the file you are requesting is a text file, you might want to
2551        use .decode() on the result to get a string. For example:
2552
2553            asset.download().decode("utf-8")
2554
2555        Note 2: this method will store the entire file in memory. If you are
2556        downloading a large file, you might want to use download_to_file instead.
2557        """
2558
2559        return self.allspice_client.requests.get(
2560            self.browser_download_url,
2561            headers=self.allspice_client.headers,
2562        ).content

Download the raw, binary data of this asset.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

asset.download().decode("utf-8")

Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.

def download_to_file(self, io: <class 'IO'>):
2564    def download_to_file(self, io: IO):
2565        """
2566        Download the raw, binary data of this asset to a file-like object.
2567
2568        Example:
2569
2570            with open("my_file.zip", "wb") as f:
2571                asset.download_to_file(f)
2572
2573        :param io: The file-like object to write the data to.
2574        """
2575
2576        response = self.allspice_client.requests.get(
2577            self.browser_download_url,
2578            headers=self.allspice_client.headers,
2579            stream=True,
2580        )
2581        # 4kb chunks
2582        for chunk in response.iter_content(chunk_size=4096):
2583            if chunk:
2584                io.write(chunk)

Download the raw, binary data of this asset to a file-like object.

Example:

with open("my_file.zip", "wb") as f:
    asset.download_to_file(f)
Parameters
  • io: The file-like object to write the data to.
def delete(self):
2586    def delete(self):
2587        if self.release is None or self.release.repo is None:
2588            raise ValueError("Cannot commit a release asset without a release or a repository.")
2589
2590        args = {
2591            "owner": self.release.repo.owner.username,
2592            "repo": self.release.repo.name,
2593            "release_id": self.release.id,
2594            "id": self.id,
2595        }
2596        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2597        self.deleted = True
class Content(allspice.baseapiobject.ReadonlyApiObject):
2600class Content(ReadonlyApiObject):
2601    content: Any
2602    download_url: str
2603    encoding: Any
2604    git_url: str
2605    html_url: str
2606    last_commit_sha: str
2607    name: str
2608    path: str
2609    sha: str
2610    size: int
2611    submodule_git_url: Any
2612    target: Any
2613    type: str
2614    url: str
2615
2616    FILE = "file"
2617
2618    def __init__(self, allspice_client):
2619        super().__init__(allspice_client)
2620
2621    def __eq__(self, other):
2622        if not isinstance(other, Content):
2623            return False
2624
2625        return self.sha == other.sha and self.name == other.name
2626
2627    def __hash__(self):
2628        return hash(self.sha) ^ hash(self.name)
Content(allspice_client)
2618    def __init__(self, allspice_client):
2619        super().__init__(allspice_client)
content: Any
download_url: str
encoding: Any
git_url: str
html_url: str
last_commit_sha: str
name: str
path: str
sha: str
size: int
submodule_git_url: Any
target: Any
type: str
url: str
FILE = 'file'
Ref = typing.Union[Branch, Commit, str]
class Util:
2634class Util:
2635    @staticmethod
2636    def convert_time(time: str) -> datetime:
2637        """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
2638        try:
2639            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
2640        except ValueError:
2641            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
2642
2643    @staticmethod
2644    def format_time(time: datetime) -> str:
2645        """
2646        Format a datetime object to Gitea's time format.
2647
2648        :param time: The time to format
2649        :return: Formatted time
2650        """
2651
2652        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
2653
2654    @staticmethod
2655    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
2656        """
2657        Given a "ref", returns a dict with the ref parameter for the API call.
2658
2659        If the ref is None, returns an empty dict. You can pass this to the API
2660        directly.
2661        """
2662
2663        if isinstance(ref, Branch):
2664            return {"ref": ref.name}
2665        elif isinstance(ref, Commit):
2666            return {"ref": ref.sha}
2667        elif ref:
2668            return {"ref": ref}
2669        else:
2670            return {}
@staticmethod
def convert_time(time: str) -> datetime.datetime:
2635    @staticmethod
2636    def convert_time(time: str) -> datetime:
2637        """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
2638        try:
2639            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
2640        except ValueError:
2641            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")

Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)

@staticmethod
def format_time(time: datetime.datetime) -> str:
2643    @staticmethod
2644    def format_time(time: datetime) -> str:
2645        """
2646        Format a datetime object to Gitea's time format.
2647
2648        :param time: The time to format
2649        :return: Formatted time
2650        """
2651
2652        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"

Format a datetime object to Gitea's time format.

Parameters
  • time: The time to format
Returns

Formatted time

@staticmethod
def data_params_for_ref( ref: Union[Branch, Commit, str, NoneType]) -> Dict:
2654    @staticmethod
2655    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
2656        """
2657        Given a "ref", returns a dict with the ref parameter for the API call.
2658
2659        If the ref is None, returns an empty dict. You can pass this to the API
2660        directly.
2661        """
2662
2663        if isinstance(ref, Branch):
2664            return {"ref": ref.name}
2665        elif isinstance(ref, Commit):
2666            return {"ref": ref.sha}
2667        elif ref:
2668            return {"ref": ref}
2669        else:
2670            return {}

Given a "ref", returns a dict with the ref parameter for the API call.

If the ref is None, returns an empty dict. You can pass this to the API directly.