allspice.apiobject

   1from __future__ import annotations
   2
   3import logging
   4import re
   5from datetime import datetime, timezone
   6from enum import Enum
   7from functools import cached_property
   8from typing import (
   9    IO,
  10    Any,
  11    ClassVar,
  12    Dict,
  13    FrozenSet,
  14    List,
  15    Literal,
  16    Optional,
  17    Sequence,
  18    Set,
  19    Tuple,
  20    Union,
  21)
  22
  23try:
  24    from typing_extensions import Self
  25except ImportError:
  26    from typing import Self
  27
  28from .baseapiobject import ApiObject, ReadonlyApiObject
  29from .exceptions import ConflictException, NotFoundException
  30
  31
  32class Organization(ApiObject):
  33    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
  34
  35    active: Optional[bool]
  36    avatar_url: str
  37    created: Optional[str]
  38    description: str
  39    email: str
  40    followers_count: Optional[int]
  41    following_count: Optional[int]
  42    full_name: str
  43    html_url: Optional[str]
  44    id: int
  45    is_admin: Optional[bool]
  46    language: Optional[str]
  47    last_login: Optional[str]
  48    location: str
  49    login: Optional[str]
  50    login_name: Optional[str]
  51    name: Optional[str]
  52    prohibit_login: Optional[bool]
  53    repo_admin_change_team_access: Optional[bool]
  54    restricted: Optional[bool]
  55    source_id: Optional[int]
  56    starred_repos_count: Optional[int]
  57    username: str
  58    visibility: str
  59    website: str
  60
  61    API_OBJECT = """/orgs/{name}"""  # <org>
  62    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
  63    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
  64    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
  65    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
  66    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
  67    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
  68
  69    def __init__(self, allspice_client):
  70        super().__init__(allspice_client)
  71
  72    def __eq__(self, other):
  73        if not isinstance(other, Organization):
  74            return False
  75        return self.allspice_client == other.allspice_client and self.name == other.name
  76
  77    def __hash__(self):
  78        return hash(self.allspice_client) ^ hash(self.name)
  79
  80    @classmethod
  81    def request(cls, allspice_client, name: str) -> Self:
  82        return cls._request(allspice_client, {"name": name})
  83
  84    @classmethod
  85    def parse_response(cls, allspice_client, result) -> "Organization":
  86        api_object = super().parse_response(allspice_client, result)
  87        # add "name" field to make this behave similar to users for gitea < 1.18
  88        # also necessary for repository-owner when org is repo owner
  89        if not hasattr(api_object, "name"):
  90            Organization._add_read_property("name", result["username"], api_object)
  91        return api_object
  92
  93    _patchable_fields: ClassVar[set[str]] = {
  94        "description",
  95        "full_name",
  96        "location",
  97        "visibility",
  98        "website",
  99    }
 100
 101    def commit(self):
 102        args = {"name": self.name}
 103        self._commit(args)
 104
 105    def create_repo(
 106        self,
 107        repoName: str,
 108        description: str = "",
 109        private: bool = False,
 110        autoInit=True,
 111        gitignores: Optional[str] = None,
 112        license: Optional[str] = None,
 113        readme: str = "Default",
 114        issue_labels: Optional[str] = None,
 115        default_branch="master",
 116    ):
 117        """Create an organization Repository
 118
 119        Throws:
 120            AlreadyExistsException: If the Repository exists already.
 121            Exception: If something else went wrong.
 122        """
 123        result = self.allspice_client.requests_post(
 124            f"/orgs/{self.name}/repos",
 125            data={
 126                "name": repoName,
 127                "description": description,
 128                "private": private,
 129                "auto_init": autoInit,
 130                "gitignores": gitignores,
 131                "license": license,
 132                "issue_labels": issue_labels,
 133                "readme": readme,
 134                "default_branch": default_branch,
 135            },
 136        )
 137        if "id" in result:
 138            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
 139        else:
 140            self.allspice_client.logger.error(result["message"])
 141            raise Exception("Repository not created... (gitea: %s)" % result["message"])
 142        return Repository.parse_response(self.allspice_client, result)
 143
 144    def get_repositories(self) -> List["Repository"]:
 145        results = self.allspice_client.requests_get_paginated(
 146            Organization.ORG_REPOS_REQUEST % self.username
 147        )
 148        return [Repository.parse_response(self.allspice_client, result) for result in results]
 149
 150    def get_repository(self, name) -> "Repository":
 151        repos = self.get_repositories()
 152        for repo in repos:
 153            if repo.name == name:
 154                return repo
 155        raise NotFoundException("Repository %s not existent in organization." % name)
 156
 157    def get_teams(self) -> List["Team"]:
 158        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
 159        teams = [Team.parse_response(self.allspice_client, result) for result in results]
 160        # organisation seems to be missing using this request, so we add org manually
 161        for t in teams:
 162            setattr(t, "_organization", self)
 163        return teams
 164
 165    def get_team(self, name) -> "Team":
 166        teams = self.get_teams()
 167        for team in teams:
 168            if team.name == name:
 169                return team
 170        raise NotFoundException("Team not existent in organization.")
 171
 172    def create_team(
 173        self,
 174        name: str,
 175        description: str = "",
 176        permission: str = "read",
 177        can_create_org_repo: bool = False,
 178        includes_all_repositories: bool = False,
 179        units=(
 180            "repo.code",
 181            "repo.issues",
 182            "repo.ext_issues",
 183            "repo.wiki",
 184            "repo.pulls",
 185            "repo.releases",
 186            "repo.ext_wiki",
 187        ),
 188        units_map={},
 189    ) -> "Team":
 190        """Alias for AllSpice#create_team"""
 191        # TODO: Move AllSpice#create_team to Organization#create_team and
 192        #       deprecate AllSpice#create_team.
 193        return self.allspice_client.create_team(
 194            org=self,
 195            name=name,
 196            description=description,
 197            permission=permission,
 198            can_create_org_repo=can_create_org_repo,
 199            includes_all_repositories=includes_all_repositories,
 200            units=units,
 201            units_map=units_map,
 202        )
 203
 204    def get_members(self) -> List["User"]:
 205        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
 206        return [User.parse_response(self.allspice_client, result) for result in results]
 207
 208    def is_member(self, username) -> bool:
 209        if isinstance(username, User):
 210            username = username.username
 211        try:
 212            # returns 204 if its ok, 404 if its not
 213            self.allspice_client.requests_get(
 214                Organization.ORG_IS_MEMBER % (self.username, username)
 215            )
 216            return True
 217        except Exception:
 218            return False
 219
 220    def remove_member(self, user: "User"):
 221        path = f"/orgs/{self.username}/members/{user.username}"
 222        self.allspice_client.requests_delete(path)
 223
 224    def delete(self):
 225        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
 226        for repo in self.get_repositories():
 227            repo.delete()
 228        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
 229        self.deleted = True
 230
 231    def get_heatmap(self) -> List[Tuple[datetime, int]]:
 232        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
 233        results = [
 234            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
 235            for result in results
 236        ]
 237        return results
 238
 239
 240class User(ApiObject):
 241    active: bool
 242    admin: Any
 243    allow_create_organization: Any
 244    allow_git_hook: Any
 245    allow_import_local: Any
 246    avatar_url: str
 247    created: str
 248    description: str
 249    email: str
 250    emails: List[Any]
 251    followers_count: int
 252    following_count: int
 253    full_name: str
 254    html_url: str
 255    id: int
 256    is_admin: bool
 257    language: str
 258    last_login: str
 259    location: str
 260    login: str
 261    login_name: str
 262    max_repo_creation: Any
 263    must_change_password: Any
 264    password: Any
 265    prohibit_login: bool
 266    restricted: bool
 267    source_id: int
 268    starred_repos_count: int
 269    username: str
 270    visibility: str
 271    website: str
 272
 273    API_OBJECT = """/users/{name}"""  # <org>
 274    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
 275    USER_PATCH = """/admin/users/%s"""  # <username>
 276    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
 277    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
 278    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
 279
 280    def __init__(self, allspice_client):
 281        super().__init__(allspice_client)
 282        self._emails = []
 283
 284    def __eq__(self, other):
 285        if not isinstance(other, User):
 286            return False
 287        return self.allspice_client == other.allspice_client and self.id == other.id
 288
 289    def __hash__(self):
 290        return hash(self.allspice_client) ^ hash(self.id)
 291
 292    @property
 293    def emails(self):
 294        self.__request_emails()
 295        return self._emails
 296
 297    @classmethod
 298    def request(cls, allspice_client, name: str) -> "User":
 299        api_object = cls._request(allspice_client, {"name": name})
 300        return api_object
 301
 302    _patchable_fields: ClassVar[set[str]] = {
 303        "active",
 304        "admin",
 305        "allow_create_organization",
 306        "allow_git_hook",
 307        "allow_import_local",
 308        "email",
 309        "full_name",
 310        "location",
 311        "login_name",
 312        "max_repo_creation",
 313        "must_change_password",
 314        "password",
 315        "prohibit_login",
 316        "website",
 317    }
 318
 319    def commit(self, login_name: str, source_id: int = 0):
 320        """
 321        Unfortunately it is necessary to require the login name
 322        as well as the login source (that is not supplied when getting a user) for
 323        changing a user.
 324        Usually source_id is 0 and the login_name is equal to the username.
 325        """
 326        values = self.get_dirty_fields()
 327        values.update(
 328            # api-doc says that the "source_id" is necessary; works without though
 329            {"login_name": login_name, "source_id": source_id}
 330        )
 331        args = {"username": self.username}
 332        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
 333        self._dirty_fields = {}
 334
 335    def create_repo(
 336        self,
 337        repoName: str,
 338        description: str = "",
 339        private: bool = False,
 340        autoInit=True,
 341        gitignores: Optional[str] = None,
 342        license: Optional[str] = None,
 343        readme: str = "Default",
 344        issue_labels: Optional[str] = None,
 345        default_branch="master",
 346    ):
 347        """Create a user Repository
 348
 349        Throws:
 350            AlreadyExistsException: If the Repository exists already.
 351            Exception: If something else went wrong.
 352        """
 353        result = self.allspice_client.requests_post(
 354            "/user/repos",
 355            data={
 356                "name": repoName,
 357                "description": description,
 358                "private": private,
 359                "auto_init": autoInit,
 360                "gitignores": gitignores,
 361                "license": license,
 362                "issue_labels": issue_labels,
 363                "readme": readme,
 364                "default_branch": default_branch,
 365            },
 366        )
 367        if "id" in result:
 368            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
 369        else:
 370            self.allspice_client.logger.error(result["message"])
 371            raise Exception("Repository not created... (gitea: %s)" % result["message"])
 372        return Repository.parse_response(self.allspice_client, result)
 373
 374    def get_repositories(self) -> List["Repository"]:
 375        """Get all Repositories owned by this User."""
 376        url = f"/users/{self.username}/repos"
 377        results = self.allspice_client.requests_get_paginated(url)
 378        return [Repository.parse_response(self.allspice_client, result) for result in results]
 379
 380    def get_orgs(self) -> List[Organization]:
 381        """Get all Organizations this user is a member of."""
 382        url = f"/users/{self.username}/orgs"
 383        results = self.allspice_client.requests_get_paginated(url)
 384        return [Organization.parse_response(self.allspice_client, result) for result in results]
 385
 386    def get_teams(self) -> List["Team"]:
 387        url = "/user/teams"
 388        results = self.allspice_client.requests_get_paginated(url, sudo=self)
 389        return [Team.parse_response(self.allspice_client, result) for result in results]
 390
 391    def get_accessible_repos(self) -> List["Repository"]:
 392        """Get all Repositories accessible by the logged in User."""
 393        results = self.allspice_client.requests_get("/user/repos", sudo=self)
 394        return [Repository.parse_response(self.allspice_client, result) for result in results]
 395
 396    def __request_emails(self):
 397        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
 398        # report if the adress changed by this
 399        for mail in result:
 400            self._emails.append(mail["email"])
 401            if mail["primary"]:
 402                self._email = mail["email"]
 403
 404    def delete(self):
 405        """Deletes this User. Also deletes all Repositories he owns."""
 406        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
 407        self.deleted = True
 408
 409    def get_heatmap(self) -> List[Tuple[datetime, int]]:
 410        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
 411        results = [
 412            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
 413            for result in results
 414        ]
 415        return results
 416
 417
 418class Branch(ReadonlyApiObject):
 419    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
 420    effective_branch_protection_name: str
 421    enable_status_check: bool
 422    name: str
 423    protected: bool
 424    required_approvals: int
 425    status_check_contexts: List[Any]
 426    user_can_merge: bool
 427    user_can_push: bool
 428
 429    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
 430
 431    def __init__(self, allspice_client):
 432        super().__init__(allspice_client)
 433
 434    def __eq__(self, other):
 435        if not isinstance(other, Branch):
 436            return False
 437        return self.commit == other.commit and self.name == other.name
 438
 439    def __hash__(self):
 440        return hash(self.commit["id"]) ^ hash(self.name)
 441
 442    _fields_to_parsers: ClassVar[dict] = {
 443        # This is not a commit object
 444        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
 445    }
 446
 447    @classmethod
 448    def request(cls, allspice_client, owner: str, repo: str, branch: str):
 449        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
 450
 451
 452class GitEntry(ReadonlyApiObject):
 453    """
 454    An object representing a file or directory in the Git tree.
 455    """
 456
 457    mode: str
 458    path: str
 459    sha: str
 460    size: int
 461    type: str
 462    url: str
 463
 464    def __init__(self, allspice_client):
 465        super().__init__(allspice_client)
 466
 467    def __eq__(self, other) -> bool:
 468        if not isinstance(other, GitEntry):
 469            return False
 470        return self.sha == other.sha
 471
 472    def __hash__(self) -> int:
 473        return hash(self.sha)
 474
 475
 476class Repository(ApiObject):
 477    allow_fast_forward_only_merge: bool
 478    allow_manual_merge: Any
 479    allow_merge_commits: bool
 480    allow_rebase: bool
 481    allow_rebase_explicit: bool
 482    allow_rebase_update: bool
 483    allow_squash_merge: bool
 484    archived: bool
 485    archived_at: str
 486    autodetect_manual_merge: Any
 487    avatar_url: str
 488    clone_url: str
 489    created_at: str
 490    default_allow_maintainer_edit: bool
 491    default_branch: str
 492    default_delete_branch_after_merge: bool
 493    default_merge_style: str
 494    description: str
 495    empty: bool
 496    enable_prune: Any
 497    external_tracker: Any
 498    external_wiki: Any
 499    fork: bool
 500    forks_count: int
 501    full_name: str
 502    has_actions: bool
 503    has_issues: bool
 504    has_packages: bool
 505    has_projects: bool
 506    has_pull_requests: bool
 507    has_releases: bool
 508    has_wiki: bool
 509    html_url: str
 510    id: int
 511    ignore_whitespace_conflicts: bool
 512    internal: bool
 513    internal_tracker: Dict[str, bool]
 514    language: str
 515    languages_url: str
 516    link: str
 517    mirror: bool
 518    mirror_interval: str
 519    mirror_updated: str
 520    name: str
 521    object_format_name: str
 522    open_issues_count: int
 523    open_pr_counter: int
 524    original_url: str
 525    owner: Union["User", "Organization"]
 526    parent: Any
 527    permissions: Dict[str, bool]
 528    private: bool
 529    projects_mode: str
 530    release_counter: int
 531    repo_transfer: Any
 532    size: int
 533    ssh_url: str
 534    stars_count: int
 535    template: bool
 536    updated_at: datetime
 537    url: str
 538    watchers_count: int
 539    website: str
 540
 541    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 542    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 543    REPO_SEARCH = """/repos/search/"""
 544    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 545    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 546    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 547    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 548    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 549    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 550    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 551    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 552    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 553    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 554    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 555    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 556    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 557    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 558    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 559    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 560    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 561    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 562    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 563    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 564    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 565
 566    class ArchiveFormat(Enum):
 567        """
 568        Archive formats for Repository.get_archive
 569        """
 570
 571        TAR = "tar.gz"
 572        ZIP = "zip"
 573
 574    class CommitStatusSort(Enum):
 575        """
 576        Sort order for Repository.get_commit_status
 577        """
 578
 579        OLDEST = "oldest"
 580        RECENT_UPDATE = "recentupdate"
 581        LEAST_UPDATE = "leastupdate"
 582        LEAST_INDEX = "leastindex"
 583        HIGHEST_INDEX = "highestindex"
 584
 585    def __init__(self, allspice_client):
 586        super().__init__(allspice_client)
 587
 588    def __eq__(self, other):
 589        if not isinstance(other, Repository):
 590            return False
 591        return self.owner == other.owner and self.name == other.name
 592
 593    def __hash__(self):
 594        return hash(self.owner) ^ hash(self.name)
 595
 596    _fields_to_parsers: ClassVar[dict] = {
 597        # dont know how to tell apart user and org as owner except form email being empty.
 598        "owner": lambda allspice_client, r: (
 599            Organization.parse_response(allspice_client, r)
 600            if r["email"] == ""
 601            else User.parse_response(allspice_client, r)
 602        ),
 603        "updated_at": lambda _, t: Util.convert_time(t),
 604    }
 605
 606    @classmethod
 607    def request(
 608        cls,
 609        allspice_client,
 610        owner: str,
 611        name: str,
 612    ) -> Repository:
 613        return cls._request(allspice_client, {"owner": owner, "name": name})
 614
 615    @classmethod
 616    def search(
 617        cls,
 618        allspice_client,
 619        query: Optional[str] = None,
 620        topic: bool = False,
 621        include_description: bool = False,
 622        user: Optional[User] = None,
 623        owner_to_prioritize: Union[User, Organization, None] = None,
 624    ) -> list[Repository]:
 625        """
 626        Search for repositories.
 627
 628        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 629
 630        :param query: The query string to search for
 631        :param topic: If true, the query string will only be matched against the
 632            repository's topic.
 633        :param include_description: If true, the query string will be matched
 634            against the repository's description as well.
 635        :param user: If specified, only repositories that this user owns or
 636            contributes to will be searched.
 637        :param owner_to_prioritize: If specified, repositories owned by the
 638            given entity will be prioritized in the search.
 639        :returns: All repositories matching the query. If there are many
 640            repositories matching this query, this may take some time.
 641        """
 642
 643        params = {}
 644
 645        if query is not None:
 646            params["q"] = query
 647        if topic:
 648            params["topic"] = topic
 649        if include_description:
 650            params["include_description"] = include_description
 651        if user is not None:
 652            params["user"] = user.id
 653        if owner_to_prioritize is not None:
 654            params["owner_to_prioritize"] = owner_to_prioritize.id
 655
 656        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 657
 658        return [Repository.parse_response(allspice_client, response) for response in responses]
 659
 660    _patchable_fields: ClassVar[set[str]] = {
 661        "allow_manual_merge",
 662        "allow_merge_commits",
 663        "allow_rebase",
 664        "allow_rebase_explicit",
 665        "allow_rebase_update",
 666        "allow_squash_merge",
 667        "archived",
 668        "autodetect_manual_merge",
 669        "default_branch",
 670        "default_delete_branch_after_merge",
 671        "default_merge_style",
 672        "description",
 673        "enable_prune",
 674        "external_tracker",
 675        "external_wiki",
 676        "has_issues",
 677        "has_projects",
 678        "has_pull_requests",
 679        "has_wiki",
 680        "ignore_whitespace_conflicts",
 681        "internal_tracker",
 682        "mirror_interval",
 683        "name",
 684        "private",
 685        "template",
 686        "website",
 687    }
 688
 689    def commit(self):
 690        args = {"owner": self.owner.username, "name": self.name}
 691        self._commit(args)
 692
 693    def get_branches(self) -> List["Branch"]:
 694        """Get all the Branches of this Repository."""
 695
 696        results = self.allspice_client.requests_get_paginated(
 697            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 698        )
 699        return [Branch.parse_response(self.allspice_client, result) for result in results]
 700
 701    def get_branch(self, name: str) -> "Branch":
 702        """Get a specific Branch of this Repository."""
 703        result = self.allspice_client.requests_get(
 704            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 705        )
 706        return Branch.parse_response(self.allspice_client, result)
 707
 708    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 709        """Add a branch to the repository"""
 710        # Note: will only work with gitea 1.13 or higher!
 711
 712        ref_name = Util.data_params_for_ref(create_from)
 713        if "ref" not in ref_name:
 714            raise ValueError("create_from must be a Branch, Commit or string")
 715        ref_name = ref_name["ref"]
 716
 717        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 718        result = self.allspice_client.requests_post(
 719            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 720        )
 721        return Branch.parse_response(self.allspice_client, result)
 722
 723    def get_issues(
 724        self,
 725        state: Literal["open", "closed", "all"] = "all",
 726        search_query: Optional[str] = None,
 727        labels: Optional[List[str]] = None,
 728        milestones: Optional[List[Union[Milestone, str]]] = None,
 729        assignee: Optional[Union[User, str]] = None,
 730        since: Optional[datetime] = None,
 731        before: Optional[datetime] = None,
 732    ) -> List["Issue"]:
 733        """
 734        Get all Issues of this Repository (open and closed)
 735
 736        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 737
 738        All params of this method are optional filters. If you don't specify a filter, it
 739        will not be applied.
 740
 741        :param state: The state of the Issues to get. If None, all Issues are returned.
 742        :param search_query: Filter issues by text. This is equivalent to searching for
 743                             `search_query` in the Issues on the web interface.
 744        :param labels: Filter issues by labels.
 745        :param milestones: Filter issues by milestones.
 746        :param assignee: Filter issues by the assigned user.
 747        :param since: Filter issues by the date they were created.
 748        :param before: Filter issues by the date they were created.
 749        :return: A list of Issues.
 750        """
 751
 752        data = {
 753            "state": state,
 754        }
 755        if search_query:
 756            data["q"] = search_query
 757        if labels:
 758            data["labels"] = ",".join(labels)
 759        if milestones:
 760            data["milestone"] = ",".join(
 761                [
 762                    milestone.name if isinstance(milestone, Milestone) else milestone
 763                    for milestone in milestones
 764                ]
 765            )
 766        if assignee:
 767            if isinstance(assignee, User):
 768                data["assignee"] = assignee.username
 769            else:
 770                data["assignee"] = assignee
 771        if since:
 772            data["since"] = Util.format_time(since)
 773        if before:
 774            data["before"] = Util.format_time(before)
 775
 776        results = self.allspice_client.requests_get_paginated(
 777            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 778            params=data,
 779        )
 780
 781        issues = []
 782        for result in results:
 783            issue = Issue.parse_response(self.allspice_client, result)
 784            # See Issue.request
 785            setattr(issue, "_repository", self)
 786            # This is mostly for compatibility with an older implementation
 787            Issue._add_read_property("repo", self, issue)
 788            issues.append(issue)
 789
 790        return issues
 791
 792    def get_design_reviews(
 793        self,
 794        state: Literal["open", "closed", "all"] = "all",
 795        milestone: Optional[Union[Milestone, str]] = None,
 796        labels: Optional[List[str]] = None,
 797    ) -> List["DesignReview"]:
 798        """
 799        Get all Design Reviews of this Repository.
 800
 801        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 802
 803        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 804                      are returned.
 805        :param milestone: The milestone of the Design Reviews to get.
 806        :param labels: A list of label IDs to filter DRs by.
 807        :return: A list of Design Reviews.
 808        """
 809
 810        params = {
 811            "state": state,
 812        }
 813        if milestone:
 814            if isinstance(milestone, Milestone):
 815                params["milestone"] = milestone.name
 816            else:
 817                params["milestone"] = milestone
 818        if labels:
 819            params["labels"] = ",".join(labels)
 820
 821        results = self.allspice_client.requests_get_paginated(
 822            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 823            params=params,
 824        )
 825        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 826
 827    def get_commits(
 828        self,
 829        sha: Optional[str] = None,
 830        path: Optional[str] = None,
 831        stat: bool = True,
 832    ) -> List["Commit"]:
 833        """
 834        Get all the Commits of this Repository.
 835
 836        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 837
 838        :param sha: The SHA of the commit to start listing commits from.
 839        :param path: filepath of a file/dir.
 840        :param stat: Include the number of additions and deletions in the response.
 841                     Disable for speedup.
 842        :return: A list of Commits.
 843        """
 844
 845        data = {}
 846        if sha:
 847            data["sha"] = sha
 848        if path:
 849            data["path"] = path
 850        if not stat:
 851            data["stat"] = False
 852
 853        try:
 854            results = self.allspice_client.requests_get_paginated(
 855                Repository.REPO_COMMITS % (self.owner.username, self.name),
 856                params=data,
 857            )
 858        except ConflictException as err:
 859            logging.warning(err)
 860            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 861            results = []
 862        return [Commit.parse_response(self.allspice_client, result) for result in results]
 863
 864    def get_issues_state(self, state) -> List["Issue"]:
 865        """
 866        DEPRECATED: Use get_issues() instead.
 867
 868        Get issues of state Issue.open or Issue.closed of a repository.
 869        """
 870
 871        assert state in [Issue.OPENED, Issue.CLOSED]
 872        issues = []
 873        data = {"state": state}
 874        results = self.allspice_client.requests_get_paginated(
 875            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 876            params=data,
 877        )
 878        for result in results:
 879            issue = Issue.parse_response(self.allspice_client, result)
 880            # adding data not contained in the issue response
 881            # See Issue.request()
 882            setattr(issue, "_repository", self)
 883            Issue._add_read_property("repo", self, issue)
 884            Issue._add_read_property("owner", self.owner, issue)
 885            issues.append(issue)
 886        return issues
 887
 888    def get_times(self):
 889        results = self.allspice_client.requests_get(
 890            Repository.REPO_TIMES % (self.owner.username, self.name)
 891        )
 892        return results
 893
 894    def get_user_time(self, username) -> float:
 895        if isinstance(username, User):
 896            username = username.username
 897        results = self.allspice_client.requests_get(
 898            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 899        )
 900        time = sum(r["time"] for r in results)
 901        return time
 902
 903    def get_full_name(self) -> str:
 904        return self.owner.username + "/" + self.name
 905
 906    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 907        data = {
 908            "assignees": assignees,
 909            "body": description,
 910            "closed": False,
 911            "title": title,
 912        }
 913        result = self.allspice_client.requests_post(
 914            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 915            data=data,
 916        )
 917
 918        issue = Issue.parse_response(self.allspice_client, result)
 919        setattr(issue, "_repository", self)
 920        Issue._add_read_property("repo", self, issue)
 921        return issue
 922
 923    def create_design_review(
 924        self,
 925        title: str,
 926        head: Union[Branch, str],
 927        base: Union[Branch, str],
 928        assignees: Optional[Set[Union[User, str]]] = None,
 929        body: Optional[str] = None,
 930        due_date: Optional[datetime] = None,
 931        milestone: Optional["Milestone"] = None,
 932    ) -> "DesignReview":
 933        """
 934        Create a new Design Review.
 935
 936        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 937
 938        :param title: Title of the Design Review
 939        :param head: Branch or name of the branch to merge into the base branch
 940        :param base: Branch or name of the branch to merge into
 941        :param assignees: Optional. A list of users to assign this review. List can be of
 942                          User objects or of usernames.
 943        :param body: An Optional Description for the Design Review.
 944        :param due_date: An Optional Due date for the Design Review.
 945        :param milestone: An Optional Milestone for the Design Review
 946        :return: The created Design Review
 947        """
 948
 949        data: dict[str, Any] = {
 950            "title": title,
 951        }
 952
 953        if isinstance(head, Branch):
 954            data["head"] = head.name
 955        else:
 956            data["head"] = head
 957        if isinstance(base, Branch):
 958            data["base"] = base.name
 959        else:
 960            data["base"] = base
 961        if assignees:
 962            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 963        if body:
 964            data["body"] = body
 965        if due_date:
 966            data["due_date"] = Util.format_time(due_date)
 967        if milestone:
 968            data["milestone"] = milestone.id
 969
 970        result = self.allspice_client.requests_post(
 971            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 972            data=data,
 973        )
 974
 975        return DesignReview.parse_response(self.allspice_client, result)
 976
 977    def create_milestone(
 978        self,
 979        title: str,
 980        description: str,
 981        due_date: Optional[str] = None,
 982        state: str = "open",
 983    ) -> "Milestone":
 984        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 985        data = {"title": title, "description": description, "state": state}
 986        if due_date:
 987            data["due_date"] = due_date
 988        result = self.allspice_client.requests_post(url, data=data)
 989        return Milestone.parse_response(self.allspice_client, result)
 990
 991    def create_gitea_hook(self, hook_url: str, events: List[str]):
 992        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 993        data = {
 994            "type": "gitea",
 995            "config": {"content_type": "json", "url": hook_url},
 996            "events": events,
 997            "active": True,
 998        }
 999        return self.allspice_client.requests_post(url, data=data)
1000
1001    def list_hooks(self):
1002        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1003        return self.allspice_client.requests_get(url)
1004
1005    def delete_hook(self, id: str):
1006        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1007        self.allspice_client.requests_delete(url)
1008
1009    def is_collaborator(self, username) -> bool:
1010        if isinstance(username, User):
1011            username = username.username
1012        try:
1013            # returns 204 if its ok, 404 if its not
1014            self.allspice_client.requests_get(
1015                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1016            )
1017            return True
1018        except Exception:
1019            return False
1020
1021    def get_users_with_access(self) -> Sequence[User]:
1022        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1023        response = self.allspice_client.requests_get(url)
1024        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1025        if isinstance(self.owner, User):
1026            return [*collabs, self.owner]
1027        else:
1028            # owner must be org
1029            teams = self.owner.get_teams()
1030            for team in teams:
1031                team_repos = team.get_repos()
1032                if self.name in [n.name for n in team_repos]:
1033                    collabs += team.get_members()
1034            return collabs
1035
1036    def remove_collaborator(self, user_name: str):
1037        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1038        self.allspice_client.requests_delete(url)
1039
1040    def transfer_ownership(
1041        self,
1042        new_owner: Union[User, Organization],
1043        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1044    ):
1045        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1046        data: dict[str, Any] = {"new_owner": new_owner.username}
1047        if isinstance(new_owner, Organization):
1048            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1049            data["team_ids"] = new_team_ids
1050        self.allspice_client.requests_post(url, data=data)
1051        # TODO: make sure this instance is either updated or discarded
1052
1053    def get_git_content(
1054        self,
1055        ref: Optional["Ref"] = None,
1056        commit: "Optional[Commit]" = None,
1057    ) -> List[Content]:
1058        """
1059        Get the metadata for all files in the root directory.
1060
1061        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1062
1063        :param ref: branch or commit to get content from
1064        :param commit: commit to get content from (deprecated)
1065        """
1066        url = f"/repos/{self.owner.username}/{self.name}/contents"
1067        data = Util.data_params_for_ref(ref or commit)
1068
1069        result = [
1070            Content.parse_response(self.allspice_client, f)
1071            for f in self.allspice_client.requests_get(url, data)
1072        ]
1073        return result
1074
1075    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1076        """
1077        Get the repository's tree on a given ref.
1078
1079        By default, this will only return the top-level entries in the tree. If you want
1080        to get the entire tree, set `recursive` to True.
1081
1082        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1083        :param recursive: Whether to get the entire tree or just the top-level entries.
1084        """
1085
1086        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1087        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1088        params = {"recursive": recursive}
1089        results = self.allspice_client.requests_get_paginated(url, params=params)
1090        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1091
1092    def get_file_content(
1093        self,
1094        content: Content,
1095        ref: Optional[Ref] = None,
1096        commit: Optional[Commit] = None,
1097    ) -> Union[str, List["Content"]]:
1098        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1099        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1100        data = Util.data_params_for_ref(ref or commit)
1101
1102        if content.type == Content.FILE:
1103            return self.allspice_client.requests_get(url, data)["content"]
1104        else:
1105            return [
1106                Content.parse_response(self.allspice_client, f)
1107                for f in self.allspice_client.requests_get(url, data)
1108            ]
1109
1110    def get_raw_file(
1111        self,
1112        file_path: str,
1113        ref: Optional[Ref] = None,
1114    ) -> bytes:
1115        """
1116        Get the raw, binary data of a single file.
1117
1118        Note 1: if the file you are requesting is a text file, you might want to
1119        use .decode() on the result to get a string. For example:
1120
1121            content = repo.get_raw_file("file.txt").decode("utf-8")
1122
1123        Note 2: this method will store the entire file in memory. If you want
1124        to download a large file, you might want to use `download_to_file`
1125        instead.
1126
1127        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1128
1129        :param file_path: The path to the file to get.
1130        :param ref: The branch or commit to get the file from.  If not provided,
1131            the default branch is used.
1132        """
1133
1134        url = self.REPO_GET_RAW_FILE.format(
1135            owner=self.owner.username,
1136            repo=self.name,
1137            path=file_path,
1138        )
1139        params = Util.data_params_for_ref(ref)
1140        return self.allspice_client.requests_get_raw(url, params=params)
1141
1142    def download_to_file(
1143        self,
1144        file_path: str,
1145        io: IO,
1146        ref: Optional[Ref] = None,
1147    ) -> None:
1148        """
1149        Download the binary data of a file to a file-like object.
1150
1151        Example:
1152
1153            with open("schematic.DSN", "wb") as f:
1154                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1155
1156        :param file_path: The path to the file in the repository from the root
1157            of the repository.
1158        :param io: The file-like object to write the data to.
1159        """
1160
1161        url = self.allspice_client._AllSpice__get_url(
1162            self.REPO_GET_RAW_FILE.format(
1163                owner=self.owner.username,
1164                repo=self.name,
1165                path=file_path,
1166            )
1167        )
1168        params = Util.data_params_for_ref(ref)
1169        response = self.allspice_client.requests.get(
1170            url,
1171            params=params,
1172            headers=self.allspice_client.headers,
1173            stream=True,
1174        )
1175
1176        for chunk in response.iter_content(chunk_size=4096):
1177            if chunk:
1178                io.write(chunk)
1179
1180    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1181        """
1182        Get the json blob for a cad file if it exists, otherwise enqueue
1183        a new job and return a 503 status.
1184
1185        WARNING: This is still experimental and not recommended for critical
1186        applications. The structure and content of the returned dictionary can
1187        change at any time.
1188
1189        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1190        """
1191
1192        if isinstance(content, Content):
1193            content = content.path
1194
1195        url = self.REPO_GET_ALLSPICE_JSON.format(
1196            owner=self.owner.username,
1197            repo=self.name,
1198            content=content,
1199        )
1200        data = Util.data_params_for_ref(ref)
1201        return self.allspice_client.requests_get(url, data)
1202
1203    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1204        """
1205        Get the svg blob for a cad file if it exists, otherwise enqueue
1206        a new job and return a 503 status.
1207
1208        WARNING: This is still experimental and not yet recommended for
1209        critical applications. The content of the returned svg can change
1210        at any time.
1211
1212        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1213        """
1214
1215        if isinstance(content, Content):
1216            content = content.path
1217
1218        url = self.REPO_GET_ALLSPICE_SVG.format(
1219            owner=self.owner.username,
1220            repo=self.name,
1221            content=content,
1222        )
1223        data = Util.data_params_for_ref(ref)
1224        return self.allspice_client.requests_get_raw(url, data)
1225
1226    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1227        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1228        if not data:
1229            data = {}
1230        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1231        data.update({"content": content})
1232        return self.allspice_client.requests_post(url, data)
1233
1234    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1235        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1236        if not data:
1237            data = {}
1238        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1239        data.update({"sha": file_sha, "content": content})
1240        return self.allspice_client.requests_put(url, data)
1241
1242    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1243        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1244        if not data:
1245            data = {}
1246        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1247        data.update({"sha": file_sha})
1248        return self.allspice_client.requests_delete(url, data)
1249
1250    def get_archive(
1251        self,
1252        ref: Ref = "main",
1253        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1254    ) -> bytes:
1255        """
1256        Download all the files in a specific ref of a repository as a zip or tarball
1257        archive.
1258
1259        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1260
1261        :param ref: branch or commit to get content from, defaults to the "main" branch
1262        :param archive_format: zip or tar, defaults to zip
1263        """
1264
1265        ref_string = Util.data_params_for_ref(ref)["ref"]
1266        url = self.REPO_GET_ARCHIVE.format(
1267            owner=self.owner.username,
1268            repo=self.name,
1269            ref=ref_string,
1270            format=archive_format.value,
1271        )
1272        return self.allspice_client.requests_get_raw(url)
1273
1274    def get_topics(self) -> list[str]:
1275        """
1276        Gets the list of topics on this repository.
1277
1278        See http://localhost:3000/api/swagger#/repository/repoListTopics
1279        """
1280
1281        url = self.REPO_GET_TOPICS.format(
1282            owner=self.owner.username,
1283            repo=self.name,
1284        )
1285        return self.allspice_client.requests_get(url)["topics"]
1286
1287    def add_topic(self, topic: str):
1288        """
1289        Adds a topic to the repository.
1290
1291        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1292
1293        :param topic: The topic to add. Topic names must consist only of
1294            lowercase letters, numnbers and dashes (-), and cannot start with
1295            dashes. Topic names also must be under 35 characters long.
1296        """
1297
1298        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1299        self.allspice_client.requests_put(url)
1300
1301    def create_release(
1302        self,
1303        tag_name: str,
1304        name: Optional[str] = None,
1305        body: Optional[str] = None,
1306        draft: bool = False,
1307    ):
1308        """
1309        Create a release for this repository. The release will be created for
1310        the tag with the given name. If there is no tag with this name, create
1311        the tag first.
1312
1313        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1314        """
1315
1316        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1317        data = {
1318            "tag_name": tag_name,
1319            "draft": draft,
1320        }
1321        if name is not None:
1322            data["name"] = name
1323        if body is not None:
1324            data["body"] = body
1325        response = self.allspice_client.requests_post(url, data)
1326        return Release.parse_response(self.allspice_client, response, self)
1327
1328    def get_releases(
1329        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1330    ) -> List[Release]:
1331        """
1332        Get the list of releases for this repository.
1333
1334        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1335        """
1336
1337        data = {}
1338
1339        if draft is not None:
1340            data["draft"] = draft
1341        if pre_release is not None:
1342            data["pre-release"] = pre_release
1343
1344        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1345        responses = self.allspice_client.requests_get_paginated(url, params=data)
1346
1347        return [
1348            Release.parse_response(self.allspice_client, response, self) for response in responses
1349        ]
1350
1351    def get_latest_release(self) -> Release:
1352        """
1353        Get the latest release for this repository.
1354
1355        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1356        """
1357
1358        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1359        response = self.allspice_client.requests_get(url)
1360        release = Release.parse_response(self.allspice_client, response, self)
1361        return release
1362
1363    def get_release_by_tag(self, tag: str) -> Release:
1364        """
1365        Get a release by its tag.
1366
1367        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1368        """
1369
1370        url = self.REPO_GET_RELEASE_BY_TAG.format(
1371            owner=self.owner.username, repo=self.name, tag=tag
1372        )
1373        response = self.allspice_client.requests_get(url)
1374        release = Release.parse_response(self.allspice_client, response, self)
1375        return release
1376
1377    def get_commit_statuses(
1378        self,
1379        commit: Union[str, Commit],
1380        sort: Optional[CommitStatusSort] = None,
1381        state: Optional[CommitStatusState] = None,
1382    ) -> List[CommitStatus]:
1383        """
1384        Get a list of statuses for a commit.
1385
1386        This is roughly equivalent to the Commit.get_statuses method, but this
1387        method allows you to sort and filter commits and is more convenient if
1388        you have a commit SHA and don't need to get the commit itself.
1389
1390        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1391        """
1392
1393        if isinstance(commit, Commit):
1394            commit = commit.sha
1395
1396        params = {}
1397        if sort is not None:
1398            params["sort"] = sort.value
1399        if state is not None:
1400            params["state"] = state.value
1401
1402        url = self.REPO_GET_COMMIT_STATUS.format(
1403            owner=self.owner.username, repo=self.name, sha=commit
1404        )
1405        response = self.allspice_client.requests_get_paginated(url, params=params)
1406        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1407
1408    def create_commit_status(
1409        self,
1410        commit: Union[str, Commit],
1411        context: Optional[str] = None,
1412        description: Optional[str] = None,
1413        state: Optional[CommitStatusState] = None,
1414        target_url: Optional[str] = None,
1415    ) -> CommitStatus:
1416        """
1417        Create a status on a commit.
1418
1419        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1420        """
1421
1422        if isinstance(commit, Commit):
1423            commit = commit.sha
1424
1425        data = {}
1426        if context is not None:
1427            data["context"] = context
1428        if description is not None:
1429            data["description"] = description
1430        if state is not None:
1431            data["state"] = state.value
1432        if target_url is not None:
1433            data["target_url"] = target_url
1434
1435        url = self.REPO_GET_COMMIT_STATUS.format(
1436            owner=self.owner.username, repo=self.name, sha=commit
1437        )
1438        response = self.allspice_client.requests_post(url, data=data)
1439        return CommitStatus.parse_response(self.allspice_client, response)
1440
1441    def delete(self):
1442        self.allspice_client.requests_delete(
1443            Repository.REPO_DELETE % (self.owner.username, self.name)
1444        )
1445        self.deleted = True
1446
1447
1448class Milestone(ApiObject):
1449    allow_merge_commits: Any
1450    allow_rebase: Any
1451    allow_rebase_explicit: Any
1452    allow_squash_merge: Any
1453    archived: Any
1454    closed_at: Any
1455    closed_issues: int
1456    created_at: str
1457    default_branch: Any
1458    description: str
1459    due_on: Any
1460    has_issues: Any
1461    has_pull_requests: Any
1462    has_wiki: Any
1463    id: int
1464    ignore_whitespace_conflicts: Any
1465    name: Any
1466    open_issues: int
1467    private: Any
1468    state: str
1469    title: str
1470    updated_at: str
1471    website: Any
1472
1473    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1474
1475    def __init__(self, allspice_client):
1476        super().__init__(allspice_client)
1477
1478    def __eq__(self, other):
1479        if not isinstance(other, Milestone):
1480            return False
1481        return self.allspice_client == other.allspice_client and self.id == other.id
1482
1483    def __hash__(self):
1484        return hash(self.allspice_client) ^ hash(self.id)
1485
1486    _fields_to_parsers: ClassVar[dict] = {
1487        "closed_at": lambda _, t: Util.convert_time(t),
1488        "due_on": lambda _, t: Util.convert_time(t),
1489    }
1490
1491    _patchable_fields: ClassVar[set[str]] = {
1492        "allow_merge_commits",
1493        "allow_rebase",
1494        "allow_rebase_explicit",
1495        "allow_squash_merge",
1496        "archived",
1497        "default_branch",
1498        "description",
1499        "has_issues",
1500        "has_pull_requests",
1501        "has_wiki",
1502        "ignore_whitespace_conflicts",
1503        "name",
1504        "private",
1505        "website",
1506    }
1507
1508    @classmethod
1509    def request(cls, allspice_client, owner: str, repo: str, number: str):
1510        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1511
1512
1513class Attachment(ReadonlyApiObject):
1514    """
1515    An asset attached to a comment.
1516
1517    You cannot edit or delete the attachment from this object - see the instance methods
1518    Comment.edit_attachment and delete_attachment for that.
1519    """
1520
1521    browser_download_url: str
1522    created_at: str
1523    download_count: int
1524    id: int
1525    name: str
1526    size: int
1527    uuid: str
1528
1529    def __init__(self, allspice_client):
1530        super().__init__(allspice_client)
1531
1532    def __eq__(self, other):
1533        if not isinstance(other, Attachment):
1534            return False
1535
1536        return self.uuid == other.uuid
1537
1538    def __hash__(self):
1539        return hash(self.uuid)
1540
1541    def download_to_file(self, io: IO):
1542        """
1543        Download the raw, binary data of this Attachment to a file-like object.
1544
1545        Example:
1546
1547            with open("my_file.zip", "wb") as f:
1548                attachment.download_to_file(f)
1549
1550        :param io: The file-like object to write the data to.
1551        """
1552
1553        response = self.allspice_client.requests.get(
1554            self.browser_download_url,
1555            headers=self.allspice_client.headers,
1556            stream=True,
1557        )
1558        # 4kb chunks
1559        for chunk in response.iter_content(chunk_size=4096):
1560            if chunk:
1561                io.write(chunk)
1562
1563
1564class Comment(ApiObject):
1565    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1566    body: str
1567    created_at: datetime
1568    html_url: str
1569    id: int
1570    issue_url: str
1571    original_author: str
1572    original_author_id: int
1573    pull_request_url: str
1574    updated_at: datetime
1575    user: User
1576
1577    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1578    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1579    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1580
1581    def __init__(self, allspice_client):
1582        super().__init__(allspice_client)
1583
1584    def __eq__(self, other):
1585        if not isinstance(other, Comment):
1586            return False
1587        return self.repository == other.repository and self.id == other.id
1588
1589    def __hash__(self):
1590        return hash(self.repository) ^ hash(self.id)
1591
1592    @classmethod
1593    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1594        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1595
1596    _fields_to_parsers: ClassVar[dict] = {
1597        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1598        "created_at": lambda _, t: Util.convert_time(t),
1599        "updated_at": lambda _, t: Util.convert_time(t),
1600    }
1601
1602    _patchable_fields: ClassVar[set[str]] = {"body"}
1603
1604    @property
1605    def parent_url(self) -> str:
1606        """URL of the parent of this comment (the issue or the pull request)"""
1607
1608        if self.issue_url is not None and self.issue_url != "":
1609            return self.issue_url
1610        else:
1611            return self.pull_request_url
1612
1613    @cached_property
1614    def repository(self) -> Repository:
1615        """The repository this comment was posted on."""
1616
1617        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1618        return Repository.request(self.allspice_client, owner_name, repo_name)
1619
1620    def __fields_for_path(self):
1621        return {
1622            "owner": self.repository.owner.username,
1623            "repo": self.repository.name,
1624            "id": self.id,
1625        }
1626
1627    def commit(self):
1628        self._commit(self.__fields_for_path())
1629
1630    def delete(self):
1631        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1632        self.deleted = True
1633
1634    def get_attachments(self) -> List[Attachment]:
1635        """
1636        Get all attachments on this comment. This returns Attachment objects, which
1637        contain a link to download the attachment.
1638
1639        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1640        """
1641
1642        results = self.allspice_client.requests_get(
1643            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1644        )
1645        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1646
1647    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1648        """
1649        Create an attachment on this comment.
1650
1651        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1652
1653        :param file: The file to attach. This should be a file-like object.
1654        :param name: The name of the file. If not provided, the name of the file will be
1655                     used.
1656        :return: The created attachment.
1657        """
1658
1659        args: dict[str, Any] = {
1660            "files": {"attachment": file},
1661        }
1662        if name is not None:
1663            args["params"] = {"name": name}
1664
1665        result = self.allspice_client.requests_post(
1666            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1667            **args,
1668        )
1669        return Attachment.parse_response(self.allspice_client, result)
1670
1671    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1672        """
1673        Edit an attachment.
1674
1675        The list of params that can be edited is available at
1676        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1677
1678        :param attachment: The attachment to be edited
1679        :param data: The data parameter should be a dictionary of the fields to edit.
1680        :return: The edited attachment
1681        """
1682
1683        args = {
1684            **self.__fields_for_path(),
1685            "attachment_id": attachment.id,
1686        }
1687        result = self.allspice_client.requests_patch(
1688            self.ATTACHMENT_PATH.format(**args),
1689            data=data,
1690        )
1691        return Attachment.parse_response(self.allspice_client, result)
1692
1693    def delete_attachment(self, attachment: Attachment):
1694        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1695
1696        args = {
1697            **self.__fields_for_path(),
1698            "attachment_id": attachment.id,
1699        }
1700        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1701        attachment.deleted = True
1702
1703
1704class Commit(ReadonlyApiObject):
1705    author: User
1706    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1707    committer: Dict[str, Union[int, str, bool]]
1708    created: str
1709    files: List[Dict[str, str]]
1710    html_url: str
1711    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1712    parents: List[Union[Dict[str, str], Any]]
1713    sha: str
1714    stats: Dict[str, int]
1715    url: str
1716
1717    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1718    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1719    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1720
1721    # Regex to extract owner and repo names from the url property
1722    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1723
1724    def __init__(self, allspice_client):
1725        super().__init__(allspice_client)
1726
1727    _fields_to_parsers: ClassVar[dict] = {
1728        # NOTE: api may return None for commiters that are no allspice users
1729        "author": lambda allspice_client, u: (
1730            User.parse_response(allspice_client, u) if u else None
1731        )
1732    }
1733
1734    def __eq__(self, other):
1735        if not isinstance(other, Commit):
1736            return False
1737        return self.sha == other.sha
1738
1739    def __hash__(self):
1740        return hash(self.sha)
1741
1742    @classmethod
1743    def parse_response(cls, allspice_client, result) -> "Commit":
1744        commit_cache = result["commit"]
1745        api_object = cls(allspice_client)
1746        cls._initialize(allspice_client, api_object, result)
1747        # inner_commit for legacy reasons
1748        Commit._add_read_property("inner_commit", commit_cache, api_object)
1749        return api_object
1750
1751    def get_status(self) -> CommitCombinedStatus:
1752        """
1753        Get a combined status consisting of all statues on this commit.
1754
1755        Note that the returned object is a CommitCombinedStatus object, which
1756        also contains a list of all statuses on the commit.
1757
1758        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1759        """
1760
1761        result = self.allspice_client.requests_get(
1762            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1763        )
1764        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1765
1766    def get_statuses(self) -> List[CommitStatus]:
1767        """
1768        Get a list of all statuses on this commit.
1769
1770        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1771        """
1772
1773        results = self.allspice_client.requests_get(
1774            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1775        )
1776        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1777
1778    @cached_property
1779    def _fields_for_path(self) -> dict[str, str]:
1780        matches = self.URL_REGEXP.search(self.url)
1781        if not matches:
1782            raise ValueError(f"Invalid commit URL: {self.url}")
1783
1784        return {
1785            "owner": matches.group(1),
1786            "repo": matches.group(2),
1787            "sha": self.sha,
1788        }
1789
1790
1791class CommitStatusState(Enum):
1792    PENDING = "pending"
1793    SUCCESS = "success"
1794    ERROR = "error"
1795    FAILURE = "failure"
1796    WARNING = "warning"
1797
1798    @classmethod
1799    def try_init(cls, value: str) -> CommitStatusState | str:
1800        """
1801        Try converting a string to the enum, and if that fails, return the
1802        string itself.
1803        """
1804
1805        try:
1806            return cls(value)
1807        except ValueError:
1808            return value
1809
1810
1811class CommitStatus(ReadonlyApiObject):
1812    context: str
1813    created_at: str
1814    creator: User
1815    description: str
1816    id: int
1817    status: CommitStatusState
1818    target_url: str
1819    updated_at: str
1820    url: str
1821
1822    def __init__(self, allspice_client):
1823        super().__init__(allspice_client)
1824
1825    _fields_to_parsers: ClassVar[dict] = {
1826        # Gitea/ASH doesn't actually validate that the status is a "valid"
1827        # status, so we can expect empty or unknown strings in the status field.
1828        "status": lambda _, s: CommitStatusState.try_init(s),
1829        "creator": lambda allspice_client, u: (
1830            User.parse_response(allspice_client, u) if u else None
1831        ),
1832    }
1833
1834    def __eq__(self, other):
1835        if not isinstance(other, CommitStatus):
1836            return False
1837        return self.id == other.id
1838
1839    def __hash__(self):
1840        return hash(self.id)
1841
1842
1843class CommitCombinedStatus(ReadonlyApiObject):
1844    commit_url: str
1845    repository: Repository
1846    sha: str
1847    state: CommitStatusState
1848    statuses: List["CommitStatus"]
1849    total_count: int
1850    url: str
1851
1852    def __init__(self, allspice_client):
1853        super().__init__(allspice_client)
1854
1855    _fields_to_parsers: ClassVar[dict] = {
1856        # See CommitStatus
1857        "state": lambda _, s: CommitStatusState.try_init(s),
1858        "statuses": lambda allspice_client, statuses: [
1859            CommitStatus.parse_response(allspice_client, status) for status in statuses
1860        ],
1861        "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r),
1862    }
1863
1864    def __eq__(self, other):
1865        if not isinstance(other, CommitCombinedStatus):
1866            return False
1867        return self.sha == other.sha
1868
1869    def __hash__(self):
1870        return hash(self.sha)
1871
1872    @classmethod
1873    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
1874        api_object = cls(allspice_client)
1875        cls._initialize(allspice_client, api_object, result)
1876        return api_object
1877
1878
1879class Issue(ApiObject):
1880    assets: List[Any]
1881    assignee: Any
1882    assignees: Any
1883    body: str
1884    closed_at: Any
1885    comments: int
1886    created_at: str
1887    due_date: Any
1888    html_url: str
1889    id: int
1890    is_locked: bool
1891    labels: List[Any]
1892    milestone: Optional["Milestone"]
1893    number: int
1894    original_author: str
1895    original_author_id: int
1896    pin_order: int
1897    pull_request: Any
1898    ref: str
1899    repository: Dict[str, Union[int, str]]
1900    state: str
1901    title: str
1902    updated_at: str
1903    url: str
1904    user: User
1905
1906    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1907    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1908    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1909    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1910
1911    OPENED = "open"
1912    CLOSED = "closed"
1913
1914    def __init__(self, allspice_client):
1915        super().__init__(allspice_client)
1916
1917    def __eq__(self, other):
1918        if not isinstance(other, Issue):
1919            return False
1920        return self.repository == other.repository and self.id == other.id
1921
1922    def __hash__(self):
1923        return hash(self.repository) ^ hash(self.id)
1924
1925    _fields_to_parsers: ClassVar[dict] = {
1926        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1927        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1928        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1929        "assignees": lambda allspice_client, us: [
1930            User.parse_response(allspice_client, u) for u in us
1931        ],
1932        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1933    }
1934
1935    _parsers_to_fields: ClassVar[dict] = {
1936        "milestone": lambda m: m.id,
1937    }
1938
1939    _patchable_fields: ClassVar[set[str]] = {
1940        "assignee",
1941        "assignees",
1942        "body",
1943        "due_date",
1944        "milestone",
1945        "state",
1946        "title",
1947    }
1948
1949    def commit(self):
1950        args = {
1951            "owner": self.repository.owner.username,
1952            "repo": self.repository.name,
1953            "index": self.number,
1954        }
1955        self._commit(args)
1956
1957    @classmethod
1958    def request(cls, allspice_client, owner: str, repo: str, number: str):
1959        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1960        # The repository in the response is a RepositoryMeta object, so request
1961        # the full repository object and add it to the issue object.
1962        repository = Repository.request(allspice_client, owner, repo)
1963        setattr(api_object, "_repository", repository)
1964        # For legacy reasons
1965        cls._add_read_property("repo", repository, api_object)
1966        return api_object
1967
1968    @classmethod
1969    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1970        args = {"owner": repo.owner.username, "repo": repo.name}
1971        data = {"title": title, "body": body}
1972        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1973        issue = Issue.parse_response(allspice_client, result)
1974        setattr(issue, "_repository", repo)
1975        cls._add_read_property("repo", repo, issue)
1976        return issue
1977
1978    @property
1979    def owner(self) -> Organization | User:
1980        return self.repository.owner
1981
1982    def get_time_sum(self, user: User) -> int:
1983        results = self.allspice_client.requests_get(
1984            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1985        )
1986        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
1987
1988    def get_times(self) -> Optional[Dict]:
1989        return self.allspice_client.requests_get(
1990            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1991        )
1992
1993    def delete_time(self, time_id: str):
1994        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1995        self.allspice_client.requests_delete(path)
1996
1997    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1998        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
1999        self.allspice_client.requests_post(
2000            path, data={"created": created, "time": int(time), "user_name": user_name}
2001        )
2002
2003    def get_comments(self) -> List[Comment]:
2004        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2005
2006        results = self.allspice_client.requests_get(
2007            self.GET_COMMENTS.format(
2008                owner=self.owner.username, repo=self.repository.name, index=self.number
2009            )
2010        )
2011
2012        return [Comment.parse_response(self.allspice_client, result) for result in results]
2013
2014    def create_comment(self, body: str) -> Comment:
2015        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2016
2017        path = self.GET_COMMENTS.format(
2018            owner=self.owner.username, repo=self.repository.name, index=self.number
2019        )
2020
2021        response = self.allspice_client.requests_post(path, data={"body": body})
2022        return Comment.parse_response(self.allspice_client, response)
2023
2024
2025class DesignReview(ApiObject):
2026    """
2027    A Design Review. See
2028    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2029
2030    Note: The base and head fields are not `Branch` objects - they are plain strings
2031    referring to the branch names. This is because DRs can exist for branches that have
2032    been deleted, which don't have an associated `Branch` object from the API. You can use
2033    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2034    it exists.
2035    """
2036
2037    additions: int
2038    allow_maintainer_edit: bool
2039    allow_maintainer_edits: Any
2040    assignee: User
2041    assignees: List["User"]
2042    base: str
2043    body: str
2044    changed_files: int
2045    closed_at: Any
2046    comments: int
2047    created_at: str
2048    deletions: int
2049    diff_url: str
2050    draft: bool
2051    due_date: Optional[str]
2052    head: str
2053    html_url: str
2054    id: int
2055    is_locked: bool
2056    labels: List[Any]
2057    merge_base: str
2058    merge_commit_sha: Any
2059    mergeable: bool
2060    merged: bool
2061    merged_at: Any
2062    merged_by: Any
2063    milestone: Any
2064    number: int
2065    patch_url: str
2066    pin_order: int
2067    repository: Optional["Repository"]
2068    requested_reviewers: Any
2069    review_comments: int
2070    state: str
2071    title: str
2072    updated_at: str
2073    url: str
2074    user: User
2075
2076    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2077    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2078    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2079
2080    OPEN = "open"
2081    CLOSED = "closed"
2082
2083    class MergeType(Enum):
2084        MERGE = "merge"
2085        REBASE = "rebase"
2086        REBASE_MERGE = "rebase-merge"
2087        SQUASH = "squash"
2088        MANUALLY_MERGED = "manually-merged"
2089
2090    def __init__(self, allspice_client):
2091        super().__init__(allspice_client)
2092
2093    def __eq__(self, other):
2094        if not isinstance(other, DesignReview):
2095            return False
2096        return self.repository == other.repository and self.id == other.id
2097
2098    def __hash__(self):
2099        return hash(self.repository) ^ hash(self.id)
2100
2101    @classmethod
2102    def parse_response(cls, allspice_client, result) -> "DesignReview":
2103        api_object = super().parse_response(allspice_client, result)
2104        cls._add_read_property(
2105            "repository",
2106            Repository.parse_response(allspice_client, result["base"]["repo"]),
2107            api_object,
2108        )
2109
2110        return api_object
2111
2112    @classmethod
2113    def request(cls, allspice_client, owner: str, repo: str, number: str):
2114        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2115        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2116
2117    _fields_to_parsers: ClassVar[dict] = {
2118        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2119        "assignees": lambda allspice_client, us: [
2120            User.parse_response(allspice_client, u) for u in us
2121        ],
2122        "base": lambda _, b: b["ref"],
2123        "head": lambda _, h: h["ref"],
2124        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2125        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2126        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2127    }
2128
2129    _patchable_fields: ClassVar[set[str]] = {
2130        "allow_maintainer_edits",
2131        "assignee",
2132        "assignees",
2133        "base",
2134        "body",
2135        "due_date",
2136        "milestone",
2137        "state",
2138        "title",
2139    }
2140
2141    _parsers_to_fields: ClassVar[dict] = {
2142        "assignee": lambda u: u.username,
2143        "assignees": lambda us: [u.username for u in us],
2144        "base": lambda b: b.name if isinstance(b, Branch) else b,
2145        "milestone": lambda m: m.id,
2146    }
2147
2148    def commit(self):
2149        data = self.get_dirty_fields()
2150        if "due_date" in data and data["due_date"] is None:
2151            data["unset_due_date"] = True
2152
2153        args = {
2154            "owner": self.repository.owner.username,
2155            "repo": self.repository.name,
2156            "index": self.number,
2157        }
2158        self._commit(args, data)
2159
2160    def merge(self, merge_type: MergeType):
2161        """
2162        Merge the pull request. See
2163        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2164
2165        :param merge_type: The type of merge to perform. See the MergeType enum.
2166        """
2167
2168        self.allspice_client.requests_put(
2169            self.MERGE_DESIGN_REVIEW.format(
2170                owner=self.repository.owner.username,
2171                repo=self.repository.name,
2172                index=self.number,
2173            ),
2174            data={"Do": merge_type.value},
2175        )
2176
2177    def get_comments(self) -> List[Comment]:
2178        """
2179        Get the comments on this pull request, but not specifically on a review.
2180
2181        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2182
2183        :return: A list of comments on this pull request.
2184        """
2185
2186        results = self.allspice_client.requests_get(
2187            self.GET_COMMENTS.format(
2188                owner=self.repository.owner.username,
2189                repo=self.repository.name,
2190                index=self.number,
2191            )
2192        )
2193        return [Comment.parse_response(self.allspice_client, result) for result in results]
2194
2195    def create_comment(self, body: str):
2196        """
2197        Create a comment on this pull request. This uses the same endpoint as the
2198        comments on issues, and will not be associated with any reviews.
2199
2200        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2201
2202        :param body: The body of the comment.
2203        :return: The comment that was created.
2204        """
2205
2206        result = self.allspice_client.requests_post(
2207            self.GET_COMMENTS.format(
2208                owner=self.repository.owner.username,
2209                repo=self.repository.name,
2210                index=self.number,
2211            ),
2212            data={"body": body},
2213        )
2214        return Comment.parse_response(self.allspice_client, result)
2215
2216
2217class Team(ApiObject):
2218    can_create_org_repo: bool
2219    description: str
2220    id: int
2221    includes_all_repositories: bool
2222    name: str
2223    organization: Optional["Organization"]
2224    permission: str
2225    units: List[str]
2226    units_map: Dict[str, str]
2227
2228    API_OBJECT = """/teams/{id}"""  # <id>
2229    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2230    TEAM_DELETE = """/teams/%s"""  # <id>
2231    GET_MEMBERS = """/teams/%s/members"""  # <id>
2232    GET_REPOS = """/teams/%s/repos"""  # <id>
2233
2234    def __init__(self, allspice_client):
2235        super().__init__(allspice_client)
2236
2237    def __eq__(self, other):
2238        if not isinstance(other, Team):
2239            return False
2240        return self.organization == other.organization and self.id == other.id
2241
2242    def __hash__(self):
2243        return hash(self.organization) ^ hash(self.id)
2244
2245    _fields_to_parsers: ClassVar[dict] = {
2246        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2247    }
2248
2249    _patchable_fields: ClassVar[set[str]] = {
2250        "can_create_org_repo",
2251        "description",
2252        "includes_all_repositories",
2253        "name",
2254        "permission",
2255        "units",
2256        "units_map",
2257    }
2258
2259    @classmethod
2260    def request(cls, allspice_client, id: int):
2261        return cls._request(allspice_client, {"id": id})
2262
2263    def commit(self):
2264        args = {"id": self.id}
2265        self._commit(args)
2266
2267    def add_user(self, user: User):
2268        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2269        url = f"/teams/{self.id}/members/{user.login}"
2270        self.allspice_client.requests_put(url)
2271
2272    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2273        if isinstance(repo, Repository):
2274            repo_name = repo.name
2275        else:
2276            repo_name = repo
2277        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2278
2279    def get_members(self):
2280        """Get all users assigned to the team."""
2281        results = self.allspice_client.requests_get_paginated(
2282            Team.GET_MEMBERS % self.id,
2283        )
2284        return [User.parse_response(self.allspice_client, result) for result in results]
2285
2286    def get_repos(self):
2287        """Get all repos of this Team."""
2288        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2289        return [Repository.parse_response(self.allspice_client, result) for result in results]
2290
2291    def delete(self):
2292        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2293        self.deleted = True
2294
2295    def remove_team_member(self, user_name: str):
2296        url = f"/teams/{self.id}/members/{user_name}"
2297        self.allspice_client.requests_delete(url)
2298
2299
2300class Release(ApiObject):
2301    """
2302    A release on a repo.
2303    """
2304
2305    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2306    author: User
2307    body: str
2308    created_at: str
2309    draft: bool
2310    html_url: str
2311    id: int
2312    name: str
2313    prerelease: bool
2314    published_at: str
2315    repo: Optional["Repository"]
2316    repository: Optional["Repository"]
2317    tag_name: str
2318    tarball_url: str
2319    target_commitish: str
2320    upload_url: str
2321    url: str
2322    zipball_url: str
2323
2324    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2325    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2326    # Note that we don't strictly need the get_assets route, as the release
2327    # object already contains the assets.
2328
2329    def __init__(self, allspice_client):
2330        super().__init__(allspice_client)
2331
2332    def __eq__(self, other):
2333        if not isinstance(other, Release):
2334            return False
2335        return self.repo == other.repo and self.id == other.id
2336
2337    def __hash__(self):
2338        return hash(self.repo) ^ hash(self.id)
2339
2340    _fields_to_parsers: ClassVar[dict] = {
2341        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2342    }
2343    _patchable_fields: ClassVar[set[str]] = {
2344        "body",
2345        "draft",
2346        "name",
2347        "prerelease",
2348        "tag_name",
2349        "target_commitish",
2350    }
2351
2352    @classmethod
2353    def parse_response(cls, allspice_client, result, repo) -> Release:
2354        release = super().parse_response(allspice_client, result)
2355        Release._add_read_property("repository", repo, release)
2356        # For legacy reasons
2357        Release._add_read_property("repo", repo, release)
2358        setattr(
2359            release,
2360            "_assets",
2361            [
2362                ReleaseAsset.parse_response(allspice_client, asset, release)
2363                for asset in result["assets"]
2364            ],
2365        )
2366        return release
2367
2368    @classmethod
2369    def request(
2370        cls,
2371        allspice_client,
2372        owner: str,
2373        repo: str,
2374        id: Optional[int] = None,
2375    ) -> Release:
2376        args = {"owner": owner, "repo": repo, "id": id}
2377        release_response = cls._get_gitea_api_object(allspice_client, args)
2378        repository = Repository.request(allspice_client, owner, repo)
2379        release = cls.parse_response(allspice_client, release_response, repository)
2380        return release
2381
2382    def commit(self):
2383        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2384        self._commit(args)
2385
2386    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2387        """
2388        Create an asset for this release.
2389
2390        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2391
2392        :param file: The file to upload. This should be a file-like object.
2393        :param name: The name of the file.
2394        :return: The created asset.
2395        """
2396
2397        args: dict[str, Any] = {"files": {"attachment": file}}
2398        if name is not None:
2399            args["params"] = {"name": name}
2400
2401        result = self.allspice_client.requests_post(
2402            self.RELEASE_CREATE_ASSET.format(
2403                owner=self.repo.owner.username,
2404                repo=self.repo.name,
2405                id=self.id,
2406            ),
2407            **args,
2408        )
2409        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2410
2411    def delete(self):
2412        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2413        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2414        self.deleted = True
2415
2416
2417class ReleaseAsset(ApiObject):
2418    browser_download_url: str
2419    created_at: str
2420    download_count: int
2421    id: int
2422    name: str
2423    release: Optional["Release"]
2424    size: int
2425    uuid: str
2426
2427    API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}"
2428
2429    def __init__(self, allspice_client):
2430        super().__init__(allspice_client)
2431
2432    def __eq__(self, other):
2433        if not isinstance(other, ReleaseAsset):
2434            return False
2435        return self.release == other.release and self.id == other.id
2436
2437    def __hash__(self):
2438        return hash(self.release) ^ hash(self.id)
2439
2440    _fields_to_parsers: ClassVar[dict] = {}
2441    _patchable_fields: ClassVar[set[str]] = {
2442        "name",
2443    }
2444
2445    @classmethod
2446    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2447        asset = super().parse_response(allspice_client, result)
2448        ReleaseAsset._add_read_property("release", release, asset)
2449        return asset
2450
2451    @classmethod
2452    def request(
2453        cls,
2454        allspice_client,
2455        owner: str,
2456        repo: str,
2457        release_id: int,
2458        id: int,
2459    ) -> ReleaseAsset:
2460        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
2461        asset_response = cls._get_gitea_api_object(allspice_client, args)
2462        release = Release.request(allspice_client, owner, repo, release_id)
2463        asset = cls.parse_response(allspice_client, asset_response, release)
2464        return asset
2465
2466    def commit(self):
2467        args = {
2468            "owner": self.release.repo.owner,
2469            "repo": self.release.repo.name,
2470            "release_id": self.release.id,
2471            "id": self.id,
2472        }
2473        self._commit(args)
2474
2475    def download(self) -> bytes:
2476        """
2477        Download the raw, binary data of this asset.
2478
2479        Note 1: if the file you are requesting is a text file, you might want to
2480        use .decode() on the result to get a string. For example:
2481
2482            asset.download().decode("utf-8")
2483
2484        Note 2: this method will store the entire file in memory. If you are
2485        downloading a large file, you might want to use download_to_file instead.
2486        """
2487
2488        return self.allspice_client.requests.get(
2489            self.browser_download_url,
2490            headers=self.allspice_client.headers,
2491        ).content
2492
2493    def download_to_file(self, io: IO):
2494        """
2495        Download the raw, binary data of this asset to a file-like object.
2496
2497        Example:
2498
2499            with open("my_file.zip", "wb") as f:
2500                asset.download_to_file(f)
2501
2502        :param io: The file-like object to write the data to.
2503        """
2504
2505        response = self.allspice_client.requests.get(
2506            self.browser_download_url,
2507            headers=self.allspice_client.headers,
2508            stream=True,
2509        )
2510        # 4kb chunks
2511        for chunk in response.iter_content(chunk_size=4096):
2512            if chunk:
2513                io.write(chunk)
2514
2515    def delete(self):
2516        args = {
2517            "owner": self.release.repo.owner.name,
2518            "repo": self.release.repo.name,
2519            "release_id": self.release.id,
2520            "id": self.id,
2521        }
2522        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2523        self.deleted = True
2524
2525
2526class Content(ReadonlyApiObject):
2527    content: Any
2528    download_url: str
2529    encoding: Any
2530    git_url: str
2531    html_url: str
2532    last_commit_sha: str
2533    name: str
2534    path: str
2535    sha: str
2536    size: int
2537    submodule_git_url: Any
2538    target: Any
2539    type: str
2540    url: str
2541
2542    FILE = "file"
2543
2544    def __init__(self, allspice_client):
2545        super().__init__(allspice_client)
2546
2547    def __eq__(self, other):
2548        if not isinstance(other, Content):
2549            return False
2550
2551        return self.sha == other.sha and self.name == other.name
2552
2553    def __hash__(self):
2554        return hash(self.sha) ^ hash(self.name)
2555
2556
2557Ref = Union[Branch, Commit, str]
2558
2559
2560class Util:
2561    @staticmethod
2562    def convert_time(time: str) -> datetime:
2563        """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
2564        try:
2565            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
2566        except ValueError:
2567            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
2568
2569    @staticmethod
2570    def format_time(time: datetime) -> str:
2571        """
2572        Format a datetime object to Gitea's time format.
2573
2574        :param time: The time to format
2575        :return: Formatted time
2576        """
2577
2578        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
2579
2580    @staticmethod
2581    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
2582        """
2583        Given a "ref", returns a dict with the ref parameter for the API call.
2584
2585        If the ref is None, returns an empty dict. You can pass this to the API
2586        directly.
2587        """
2588
2589        if isinstance(ref, Branch):
2590            return {"ref": ref.name}
2591        elif isinstance(ref, Commit):
2592            return {"ref": ref.sha}
2593        elif ref:
2594            return {"ref": ref}
2595        else:
2596            return {}
class Organization(allspice.baseapiobject.ApiObject):
 33class Organization(ApiObject):
 34    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""
 35
 36    active: Optional[bool]
 37    avatar_url: str
 38    created: Optional[str]
 39    description: str
 40    email: str
 41    followers_count: Optional[int]
 42    following_count: Optional[int]
 43    full_name: str
 44    html_url: Optional[str]
 45    id: int
 46    is_admin: Optional[bool]
 47    language: Optional[str]
 48    last_login: Optional[str]
 49    location: str
 50    login: Optional[str]
 51    login_name: Optional[str]
 52    name: Optional[str]
 53    prohibit_login: Optional[bool]
 54    repo_admin_change_team_access: Optional[bool]
 55    restricted: Optional[bool]
 56    source_id: Optional[int]
 57    starred_repos_count: Optional[int]
 58    username: str
 59    visibility: str
 60    website: str
 61
 62    API_OBJECT = """/orgs/{name}"""  # <org>
 63    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
 64    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
 65    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
 66    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
 67    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
 68    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>
 69
 70    def __init__(self, allspice_client):
 71        super().__init__(allspice_client)
 72
 73    def __eq__(self, other):
 74        if not isinstance(other, Organization):
 75            return False
 76        return self.allspice_client == other.allspice_client and self.name == other.name
 77
 78    def __hash__(self):
 79        return hash(self.allspice_client) ^ hash(self.name)
 80
 81    @classmethod
 82    def request(cls, allspice_client, name: str) -> Self:
 83        return cls._request(allspice_client, {"name": name})
 84
 85    @classmethod
 86    def parse_response(cls, allspice_client, result) -> "Organization":
 87        api_object = super().parse_response(allspice_client, result)
 88        # add "name" field to make this behave similar to users for gitea < 1.18
 89        # also necessary for repository-owner when org is repo owner
 90        if not hasattr(api_object, "name"):
 91            Organization._add_read_property("name", result["username"], api_object)
 92        return api_object
 93
 94    _patchable_fields: ClassVar[set[str]] = {
 95        "description",
 96        "full_name",
 97        "location",
 98        "visibility",
 99        "website",
100    }
101
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
105
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)
144
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
150
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
157
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
165
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
172
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )
204
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
208
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
220
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
224
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True
231
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results

see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll

Organization(allspice_client)
70    def __init__(self, allspice_client):
71        super().__init__(allspice_client)
active: Optional[bool]
avatar_url: str
created: Optional[str]
description: str
email: str
followers_count: Optional[int]
following_count: Optional[int]
full_name: str
html_url: Optional[str]
id: int
is_admin: Optional[bool]
language: Optional[str]
last_login: Optional[str]
location: str
login: Optional[str]
login_name: Optional[str]
name: Optional[str]
prohibit_login: Optional[bool]
repo_admin_change_team_access: Optional[bool]
restricted: Optional[bool]
source_id: Optional[int]
starred_repos_count: Optional[int]
username: str
visibility: str
website: str
API_OBJECT = '/orgs/{name}'
ORG_REPOS_REQUEST = '/orgs/%s/repos'
ORG_TEAMS_REQUEST = '/orgs/%s/teams'
ORG_TEAMS_CREATE = '/orgs/%s/teams'
ORG_GET_MEMBERS = '/orgs/%s/members'
ORG_IS_MEMBER = '/orgs/%s/members/%s'
ORG_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> Self:
81    @classmethod
82    def request(cls, allspice_client, name: str) -> Self:
83        return cls._request(allspice_client, {"name": name})
@classmethod
def parse_response(cls, allspice_client, result) -> Organization:
85    @classmethod
86    def parse_response(cls, allspice_client, result) -> "Organization":
87        api_object = super().parse_response(allspice_client, result)
88        # add "name" field to make this behave similar to users for gitea < 1.18
89        # also necessary for repository-owner when org is repo owner
90        if not hasattr(api_object, "name"):
91            Organization._add_read_property("name", result["username"], api_object)
92        return api_object
def commit(self):
102    def commit(self):
103        args = {"name": self.name}
104        self._commit(args)
def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
106    def create_repo(
107        self,
108        repoName: str,
109        description: str = "",
110        private: bool = False,
111        autoInit=True,
112        gitignores: Optional[str] = None,
113        license: Optional[str] = None,
114        readme: str = "Default",
115        issue_labels: Optional[str] = None,
116        default_branch="master",
117    ):
118        """Create an organization Repository
119
120        Throws:
121            AlreadyExistsException: If the Repository exists already.
122            Exception: If something else went wrong.
123        """
124        result = self.allspice_client.requests_post(
125            f"/orgs/{self.name}/repos",
126            data={
127                "name": repoName,
128                "description": description,
129                "private": private,
130                "auto_init": autoInit,
131                "gitignores": gitignores,
132                "license": license,
133                "issue_labels": issue_labels,
134                "readme": readme,
135                "default_branch": default_branch,
136            },
137        )
138        if "id" in result:
139            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
140        else:
141            self.allspice_client.logger.error(result["message"])
142            raise Exception("Repository not created... (gitea: %s)" % result["message"])
143        return Repository.parse_response(self.allspice_client, result)

Create an organization Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
145    def get_repositories(self) -> List["Repository"]:
146        results = self.allspice_client.requests_get_paginated(
147            Organization.ORG_REPOS_REQUEST % self.username
148        )
149        return [Repository.parse_response(self.allspice_client, result) for result in results]
def get_repository(self, name) -> Repository:
151    def get_repository(self, name) -> "Repository":
152        repos = self.get_repositories()
153        for repo in repos:
154            if repo.name == name:
155                return repo
156        raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List[Team]:
158    def get_teams(self) -> List["Team"]:
159        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
160        teams = [Team.parse_response(self.allspice_client, result) for result in results]
161        # organisation seems to be missing using this request, so we add org manually
162        for t in teams:
163            setattr(t, "_organization", self)
164        return teams
def get_team(self, name) -> Team:
166    def get_team(self, name) -> "Team":
167        teams = self.get_teams()
168        for team in teams:
169            if team.name == name:
170                return team
171        raise NotFoundException("Team not existent in organization.")
def create_team( self, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}) -> Team:
173    def create_team(
174        self,
175        name: str,
176        description: str = "",
177        permission: str = "read",
178        can_create_org_repo: bool = False,
179        includes_all_repositories: bool = False,
180        units=(
181            "repo.code",
182            "repo.issues",
183            "repo.ext_issues",
184            "repo.wiki",
185            "repo.pulls",
186            "repo.releases",
187            "repo.ext_wiki",
188        ),
189        units_map={},
190    ) -> "Team":
191        """Alias for AllSpice#create_team"""
192        # TODO: Move AllSpice#create_team to Organization#create_team and
193        #       deprecate AllSpice#create_team.
194        return self.allspice_client.create_team(
195            org=self,
196            name=name,
197            description=description,
198            permission=permission,
199            can_create_org_repo=can_create_org_repo,
200            includes_all_repositories=includes_all_repositories,
201            units=units,
202            units_map=units_map,
203        )

Alias for AllSpice#create_team

def get_members(self) -> List[User]:
205    def get_members(self) -> List["User"]:
206        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
207        return [User.parse_response(self.allspice_client, result) for result in results]
def is_member(self, username) -> bool:
209    def is_member(self, username) -> bool:
210        if isinstance(username, User):
211            username = username.username
212        try:
213            # returns 204 if its ok, 404 if its not
214            self.allspice_client.requests_get(
215                Organization.ORG_IS_MEMBER % (self.username, username)
216            )
217            return True
218        except Exception:
219            return False
def remove_member(self, user: User):
221    def remove_member(self, user: "User"):
222        path = f"/orgs/{self.username}/members/{user.username}"
223        self.allspice_client.requests_delete(path)
def delete(self):
225    def delete(self):
226        """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
227        for repo in self.get_repositories():
228            repo.delete()
229        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
230        self.deleted = True

Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
232    def get_heatmap(self) -> List[Tuple[datetime, int]]:
233        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
234        results = [
235            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
236            for result in results
237        ]
238        return results
class User(allspice.baseapiobject.ApiObject):
241class User(ApiObject):
242    active: bool
243    admin: Any
244    allow_create_organization: Any
245    allow_git_hook: Any
246    allow_import_local: Any
247    avatar_url: str
248    created: str
249    description: str
250    email: str
251    emails: List[Any]
252    followers_count: int
253    following_count: int
254    full_name: str
255    html_url: str
256    id: int
257    is_admin: bool
258    language: str
259    last_login: str
260    location: str
261    login: str
262    login_name: str
263    max_repo_creation: Any
264    must_change_password: Any
265    password: Any
266    prohibit_login: bool
267    restricted: bool
268    source_id: int
269    starred_repos_count: int
270    username: str
271    visibility: str
272    website: str
273
274    API_OBJECT = """/users/{name}"""  # <org>
275    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
276    USER_PATCH = """/admin/users/%s"""  # <username>
277    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
278    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
279    USER_HEATMAP = """/users/%s/heatmap"""  # <username>
280
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
284
285    def __eq__(self, other):
286        if not isinstance(other, User):
287            return False
288        return self.allspice_client == other.allspice_client and self.id == other.id
289
290    def __hash__(self):
291        return hash(self.allspice_client) ^ hash(self.id)
292
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
297
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
302
303    _patchable_fields: ClassVar[set[str]] = {
304        "active",
305        "admin",
306        "allow_create_organization",
307        "allow_git_hook",
308        "allow_import_local",
309        "email",
310        "full_name",
311        "location",
312        "login_name",
313        "max_repo_creation",
314        "must_change_password",
315        "password",
316        "prohibit_login",
317        "website",
318    }
319
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}
335
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)
374
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]
380
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]
386
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
391
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]
396
397    def __request_emails(self):
398        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
399        # report if the adress changed by this
400        for mail in result:
401            self._emails.append(mail["email"])
402            if mail["primary"]:
403                self._email = mail["email"]
404
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True
409
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results
User(allspice_client)
281    def __init__(self, allspice_client):
282        super().__init__(allspice_client)
283        self._emails = []
active: bool
admin: Any
allow_create_organization: Any
allow_git_hook: Any
allow_import_local: Any
avatar_url: str
created: str
description: str
email: str
emails
293    @property
294    def emails(self):
295        self.__request_emails()
296        return self._emails
followers_count: int
following_count: int
full_name: str
html_url: str
id: int
is_admin: bool
language: str
last_login: str
location: str
login: str
login_name: str
max_repo_creation: Any
must_change_password: Any
password: Any
prohibit_login: bool
restricted: bool
source_id: int
starred_repos_count: int
username: str
visibility: str
website: str
API_OBJECT = '/users/{name}'
USER_MAIL = '/user/emails?sudo=%s'
USER_PATCH = '/admin/users/%s'
ADMIN_DELETE_USER = '/admin/users/%s'
ADMIN_EDIT_USER = '/admin/users/{username}'
USER_HEATMAP = '/users/%s/heatmap'
@classmethod
def request(cls, allspice_client, name: str) -> User:
298    @classmethod
299    def request(cls, allspice_client, name: str) -> "User":
300        api_object = cls._request(allspice_client, {"name": name})
301        return api_object
def commit(self, login_name: str, source_id: int = 0):
320    def commit(self, login_name: str, source_id: int = 0):
321        """
322        Unfortunately it is necessary to require the login name
323        as well as the login source (that is not supplied when getting a user) for
324        changing a user.
325        Usually source_id is 0 and the login_name is equal to the username.
326        """
327        values = self.get_dirty_fields()
328        values.update(
329            # api-doc says that the "source_id" is necessary; works without though
330            {"login_name": login_name, "source_id": source_id}
331        )
332        args = {"username": self.username}
333        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
334        self._dirty_fields = {}

Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.

def create_repo( self, repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
336    def create_repo(
337        self,
338        repoName: str,
339        description: str = "",
340        private: bool = False,
341        autoInit=True,
342        gitignores: Optional[str] = None,
343        license: Optional[str] = None,
344        readme: str = "Default",
345        issue_labels: Optional[str] = None,
346        default_branch="master",
347    ):
348        """Create a user Repository
349
350        Throws:
351            AlreadyExistsException: If the Repository exists already.
352            Exception: If something else went wrong.
353        """
354        result = self.allspice_client.requests_post(
355            "/user/repos",
356            data={
357                "name": repoName,
358                "description": description,
359                "private": private,
360                "auto_init": autoInit,
361                "gitignores": gitignores,
362                "license": license,
363                "issue_labels": issue_labels,
364                "readme": readme,
365                "default_branch": default_branch,
366            },
367        )
368        if "id" in result:
369            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
370        else:
371            self.allspice_client.logger.error(result["message"])
372            raise Exception("Repository not created... (gitea: %s)" % result["message"])
373        return Repository.parse_response(self.allspice_client, result)

Create a user Repository

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

def get_repositories(self) -> List[Repository]:
375    def get_repositories(self) -> List["Repository"]:
376        """Get all Repositories owned by this User."""
377        url = f"/users/{self.username}/repos"
378        results = self.allspice_client.requests_get_paginated(url)
379        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories owned by this User.

def get_orgs(self) -> List[Organization]:
381    def get_orgs(self) -> List[Organization]:
382        """Get all Organizations this user is a member of."""
383        url = f"/users/{self.username}/orgs"
384        results = self.allspice_client.requests_get_paginated(url)
385        return [Organization.parse_response(self.allspice_client, result) for result in results]

Get all Organizations this user is a member of.

def get_teams(self) -> List[Team]:
387    def get_teams(self) -> List["Team"]:
388        url = "/user/teams"
389        results = self.allspice_client.requests_get_paginated(url, sudo=self)
390        return [Team.parse_response(self.allspice_client, result) for result in results]
def get_accessible_repos(self) -> List[Repository]:
392    def get_accessible_repos(self) -> List["Repository"]:
393        """Get all Repositories accessible by the logged in User."""
394        results = self.allspice_client.requests_get("/user/repos", sudo=self)
395        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all Repositories accessible by the logged in User.

def delete(self):
405    def delete(self):
406        """Deletes this User. Also deletes all Repositories he owns."""
407        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
408        self.deleted = True

Deletes this User. Also deletes all Repositories he owns.

def get_heatmap(self) -> List[Tuple[datetime.datetime, int]]:
410    def get_heatmap(self) -> List[Tuple[datetime, int]]:
411        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
412        results = [
413            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
414            for result in results
415        ]
416        return results
class Branch(allspice.baseapiobject.ReadonlyApiObject):
419class Branch(ReadonlyApiObject):
420    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
421    effective_branch_protection_name: str
422    enable_status_check: bool
423    name: str
424    protected: bool
425    required_approvals: int
426    status_check_contexts: List[Any]
427    user_can_merge: bool
428    user_can_push: bool
429
430    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""
431
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
434
435    def __eq__(self, other):
436        if not isinstance(other, Branch):
437            return False
438        return self.commit == other.commit and self.name == other.name
439
440    def __hash__(self):
441        return hash(self.commit["id"]) ^ hash(self.name)
442
443    _fields_to_parsers: ClassVar[dict] = {
444        # This is not a commit object
445        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
446    }
447
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Branch(allspice_client)
432    def __init__(self, allspice_client):
433        super().__init__(allspice_client)
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]], NoneType]]
effective_branch_protection_name: str
enable_status_check: bool
name: str
protected: bool
required_approvals: int
status_check_contexts: List[Any]
user_can_merge: bool
user_can_push: bool
API_OBJECT = '/repos/{owner}/{repo}/branches/{branch}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, branch: str):
448    @classmethod
449    def request(cls, allspice_client, owner: str, repo: str, branch: str):
450        return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
class GitEntry(allspice.baseapiobject.ReadonlyApiObject):
453class GitEntry(ReadonlyApiObject):
454    """
455    An object representing a file or directory in the Git tree.
456    """
457
458    mode: str
459    path: str
460    sha: str
461    size: int
462    type: str
463    url: str
464
465    def __init__(self, allspice_client):
466        super().__init__(allspice_client)
467
468    def __eq__(self, other) -> bool:
469        if not isinstance(other, GitEntry):
470            return False
471        return self.sha == other.sha
472
473    def __hash__(self) -> int:
474        return hash(self.sha)

An object representing a file or directory in the Git tree.

GitEntry(allspice_client)
465    def __init__(self, allspice_client):
466        super().__init__(allspice_client)
mode: str
path: str
sha: str
size: int
type: str
url: str
class Repository(allspice.baseapiobject.ApiObject):
 477class Repository(ApiObject):
 478    allow_fast_forward_only_merge: bool
 479    allow_manual_merge: Any
 480    allow_merge_commits: bool
 481    allow_rebase: bool
 482    allow_rebase_explicit: bool
 483    allow_rebase_update: bool
 484    allow_squash_merge: bool
 485    archived: bool
 486    archived_at: str
 487    autodetect_manual_merge: Any
 488    avatar_url: str
 489    clone_url: str
 490    created_at: str
 491    default_allow_maintainer_edit: bool
 492    default_branch: str
 493    default_delete_branch_after_merge: bool
 494    default_merge_style: str
 495    description: str
 496    empty: bool
 497    enable_prune: Any
 498    external_tracker: Any
 499    external_wiki: Any
 500    fork: bool
 501    forks_count: int
 502    full_name: str
 503    has_actions: bool
 504    has_issues: bool
 505    has_packages: bool
 506    has_projects: bool
 507    has_pull_requests: bool
 508    has_releases: bool
 509    has_wiki: bool
 510    html_url: str
 511    id: int
 512    ignore_whitespace_conflicts: bool
 513    internal: bool
 514    internal_tracker: Dict[str, bool]
 515    language: str
 516    languages_url: str
 517    link: str
 518    mirror: bool
 519    mirror_interval: str
 520    mirror_updated: str
 521    name: str
 522    object_format_name: str
 523    open_issues_count: int
 524    open_pr_counter: int
 525    original_url: str
 526    owner: Union["User", "Organization"]
 527    parent: Any
 528    permissions: Dict[str, bool]
 529    private: bool
 530    projects_mode: str
 531    release_counter: int
 532    repo_transfer: Any
 533    size: int
 534    ssh_url: str
 535    stars_count: int
 536    template: bool
 537    updated_at: datetime
 538    url: str
 539    watchers_count: int
 540    website: str
 541
 542    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
 543    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
 544    REPO_SEARCH = """/repos/search/"""
 545    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
 546    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
 547    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
 548    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
 549    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
 550    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
 551    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
 552    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
 553    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
 554    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
 555    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
 556    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
 557    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
 558    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
 559    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
 560    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
 561    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
 562    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
 563    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
 564    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
 565    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"
 566
 567    class ArchiveFormat(Enum):
 568        """
 569        Archive formats for Repository.get_archive
 570        """
 571
 572        TAR = "tar.gz"
 573        ZIP = "zip"
 574
 575    class CommitStatusSort(Enum):
 576        """
 577        Sort order for Repository.get_commit_status
 578        """
 579
 580        OLDEST = "oldest"
 581        RECENT_UPDATE = "recentupdate"
 582        LEAST_UPDATE = "leastupdate"
 583        LEAST_INDEX = "leastindex"
 584        HIGHEST_INDEX = "highestindex"
 585
 586    def __init__(self, allspice_client):
 587        super().__init__(allspice_client)
 588
 589    def __eq__(self, other):
 590        if not isinstance(other, Repository):
 591            return False
 592        return self.owner == other.owner and self.name == other.name
 593
 594    def __hash__(self):
 595        return hash(self.owner) ^ hash(self.name)
 596
 597    _fields_to_parsers: ClassVar[dict] = {
 598        # dont know how to tell apart user and org as owner except form email being empty.
 599        "owner": lambda allspice_client, r: (
 600            Organization.parse_response(allspice_client, r)
 601            if r["email"] == ""
 602            else User.parse_response(allspice_client, r)
 603        ),
 604        "updated_at": lambda _, t: Util.convert_time(t),
 605    }
 606
 607    @classmethod
 608    def request(
 609        cls,
 610        allspice_client,
 611        owner: str,
 612        name: str,
 613    ) -> Repository:
 614        return cls._request(allspice_client, {"owner": owner, "name": name})
 615
 616    @classmethod
 617    def search(
 618        cls,
 619        allspice_client,
 620        query: Optional[str] = None,
 621        topic: bool = False,
 622        include_description: bool = False,
 623        user: Optional[User] = None,
 624        owner_to_prioritize: Union[User, Organization, None] = None,
 625    ) -> list[Repository]:
 626        """
 627        Search for repositories.
 628
 629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
 630
 631        :param query: The query string to search for
 632        :param topic: If true, the query string will only be matched against the
 633            repository's topic.
 634        :param include_description: If true, the query string will be matched
 635            against the repository's description as well.
 636        :param user: If specified, only repositories that this user owns or
 637            contributes to will be searched.
 638        :param owner_to_prioritize: If specified, repositories owned by the
 639            given entity will be prioritized in the search.
 640        :returns: All repositories matching the query. If there are many
 641            repositories matching this query, this may take some time.
 642        """
 643
 644        params = {}
 645
 646        if query is not None:
 647            params["q"] = query
 648        if topic:
 649            params["topic"] = topic
 650        if include_description:
 651            params["include_description"] = include_description
 652        if user is not None:
 653            params["user"] = user.id
 654        if owner_to_prioritize is not None:
 655            params["owner_to_prioritize"] = owner_to_prioritize.id
 656
 657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
 658
 659        return [Repository.parse_response(allspice_client, response) for response in responses]
 660
 661    _patchable_fields: ClassVar[set[str]] = {
 662        "allow_manual_merge",
 663        "allow_merge_commits",
 664        "allow_rebase",
 665        "allow_rebase_explicit",
 666        "allow_rebase_update",
 667        "allow_squash_merge",
 668        "archived",
 669        "autodetect_manual_merge",
 670        "default_branch",
 671        "default_delete_branch_after_merge",
 672        "default_merge_style",
 673        "description",
 674        "enable_prune",
 675        "external_tracker",
 676        "external_wiki",
 677        "has_issues",
 678        "has_projects",
 679        "has_pull_requests",
 680        "has_wiki",
 681        "ignore_whitespace_conflicts",
 682        "internal_tracker",
 683        "mirror_interval",
 684        "name",
 685        "private",
 686        "template",
 687        "website",
 688    }
 689
 690    def commit(self):
 691        args = {"owner": self.owner.username, "name": self.name}
 692        self._commit(args)
 693
 694    def get_branches(self) -> List["Branch"]:
 695        """Get all the Branches of this Repository."""
 696
 697        results = self.allspice_client.requests_get_paginated(
 698            Repository.REPO_BRANCHES % (self.owner.username, self.name)
 699        )
 700        return [Branch.parse_response(self.allspice_client, result) for result in results]
 701
 702    def get_branch(self, name: str) -> "Branch":
 703        """Get a specific Branch of this Repository."""
 704        result = self.allspice_client.requests_get(
 705            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
 706        )
 707        return Branch.parse_response(self.allspice_client, result)
 708
 709    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
 710        """Add a branch to the repository"""
 711        # Note: will only work with gitea 1.13 or higher!
 712
 713        ref_name = Util.data_params_for_ref(create_from)
 714        if "ref" not in ref_name:
 715            raise ValueError("create_from must be a Branch, Commit or string")
 716        ref_name = ref_name["ref"]
 717
 718        data = {"new_branch_name": newname, "old_ref_name": ref_name}
 719        result = self.allspice_client.requests_post(
 720            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
 721        )
 722        return Branch.parse_response(self.allspice_client, result)
 723
 724    def get_issues(
 725        self,
 726        state: Literal["open", "closed", "all"] = "all",
 727        search_query: Optional[str] = None,
 728        labels: Optional[List[str]] = None,
 729        milestones: Optional[List[Union[Milestone, str]]] = None,
 730        assignee: Optional[Union[User, str]] = None,
 731        since: Optional[datetime] = None,
 732        before: Optional[datetime] = None,
 733    ) -> List["Issue"]:
 734        """
 735        Get all Issues of this Repository (open and closed)
 736
 737        https://hub.allspice.io/api/swagger#/repository/repoListIssues
 738
 739        All params of this method are optional filters. If you don't specify a filter, it
 740        will not be applied.
 741
 742        :param state: The state of the Issues to get. If None, all Issues are returned.
 743        :param search_query: Filter issues by text. This is equivalent to searching for
 744                             `search_query` in the Issues on the web interface.
 745        :param labels: Filter issues by labels.
 746        :param milestones: Filter issues by milestones.
 747        :param assignee: Filter issues by the assigned user.
 748        :param since: Filter issues by the date they were created.
 749        :param before: Filter issues by the date they were created.
 750        :return: A list of Issues.
 751        """
 752
 753        data = {
 754            "state": state,
 755        }
 756        if search_query:
 757            data["q"] = search_query
 758        if labels:
 759            data["labels"] = ",".join(labels)
 760        if milestones:
 761            data["milestone"] = ",".join(
 762                [
 763                    milestone.name if isinstance(milestone, Milestone) else milestone
 764                    for milestone in milestones
 765                ]
 766            )
 767        if assignee:
 768            if isinstance(assignee, User):
 769                data["assignee"] = assignee.username
 770            else:
 771                data["assignee"] = assignee
 772        if since:
 773            data["since"] = Util.format_time(since)
 774        if before:
 775            data["before"] = Util.format_time(before)
 776
 777        results = self.allspice_client.requests_get_paginated(
 778            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 779            params=data,
 780        )
 781
 782        issues = []
 783        for result in results:
 784            issue = Issue.parse_response(self.allspice_client, result)
 785            # See Issue.request
 786            setattr(issue, "_repository", self)
 787            # This is mostly for compatibility with an older implementation
 788            Issue._add_read_property("repo", self, issue)
 789            issues.append(issue)
 790
 791        return issues
 792
 793    def get_design_reviews(
 794        self,
 795        state: Literal["open", "closed", "all"] = "all",
 796        milestone: Optional[Union[Milestone, str]] = None,
 797        labels: Optional[List[str]] = None,
 798    ) -> List["DesignReview"]:
 799        """
 800        Get all Design Reviews of this Repository.
 801
 802        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
 803
 804        :param state: The state of the Design Reviews to get. If None, all Design Reviews
 805                      are returned.
 806        :param milestone: The milestone of the Design Reviews to get.
 807        :param labels: A list of label IDs to filter DRs by.
 808        :return: A list of Design Reviews.
 809        """
 810
 811        params = {
 812            "state": state,
 813        }
 814        if milestone:
 815            if isinstance(milestone, Milestone):
 816                params["milestone"] = milestone.name
 817            else:
 818                params["milestone"] = milestone
 819        if labels:
 820            params["labels"] = ",".join(labels)
 821
 822        results = self.allspice_client.requests_get_paginated(
 823            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 824            params=params,
 825        )
 826        return [DesignReview.parse_response(self.allspice_client, result) for result in results]
 827
 828    def get_commits(
 829        self,
 830        sha: Optional[str] = None,
 831        path: Optional[str] = None,
 832        stat: bool = True,
 833    ) -> List["Commit"]:
 834        """
 835        Get all the Commits of this Repository.
 836
 837        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
 838
 839        :param sha: The SHA of the commit to start listing commits from.
 840        :param path: filepath of a file/dir.
 841        :param stat: Include the number of additions and deletions in the response.
 842                     Disable for speedup.
 843        :return: A list of Commits.
 844        """
 845
 846        data = {}
 847        if sha:
 848            data["sha"] = sha
 849        if path:
 850            data["path"] = path
 851        if not stat:
 852            data["stat"] = False
 853
 854        try:
 855            results = self.allspice_client.requests_get_paginated(
 856                Repository.REPO_COMMITS % (self.owner.username, self.name),
 857                params=data,
 858            )
 859        except ConflictException as err:
 860            logging.warning(err)
 861            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
 862            results = []
 863        return [Commit.parse_response(self.allspice_client, result) for result in results]
 864
 865    def get_issues_state(self, state) -> List["Issue"]:
 866        """
 867        DEPRECATED: Use get_issues() instead.
 868
 869        Get issues of state Issue.open or Issue.closed of a repository.
 870        """
 871
 872        assert state in [Issue.OPENED, Issue.CLOSED]
 873        issues = []
 874        data = {"state": state}
 875        results = self.allspice_client.requests_get_paginated(
 876            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 877            params=data,
 878        )
 879        for result in results:
 880            issue = Issue.parse_response(self.allspice_client, result)
 881            # adding data not contained in the issue response
 882            # See Issue.request()
 883            setattr(issue, "_repository", self)
 884            Issue._add_read_property("repo", self, issue)
 885            Issue._add_read_property("owner", self.owner, issue)
 886            issues.append(issue)
 887        return issues
 888
 889    def get_times(self):
 890        results = self.allspice_client.requests_get(
 891            Repository.REPO_TIMES % (self.owner.username, self.name)
 892        )
 893        return results
 894
 895    def get_user_time(self, username) -> float:
 896        if isinstance(username, User):
 897            username = username.username
 898        results = self.allspice_client.requests_get(
 899            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
 900        )
 901        time = sum(r["time"] for r in results)
 902        return time
 903
 904    def get_full_name(self) -> str:
 905        return self.owner.username + "/" + self.name
 906
 907    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
 908        data = {
 909            "assignees": assignees,
 910            "body": description,
 911            "closed": False,
 912            "title": title,
 913        }
 914        result = self.allspice_client.requests_post(
 915            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
 916            data=data,
 917        )
 918
 919        issue = Issue.parse_response(self.allspice_client, result)
 920        setattr(issue, "_repository", self)
 921        Issue._add_read_property("repo", self, issue)
 922        return issue
 923
 924    def create_design_review(
 925        self,
 926        title: str,
 927        head: Union[Branch, str],
 928        base: Union[Branch, str],
 929        assignees: Optional[Set[Union[User, str]]] = None,
 930        body: Optional[str] = None,
 931        due_date: Optional[datetime] = None,
 932        milestone: Optional["Milestone"] = None,
 933    ) -> "DesignReview":
 934        """
 935        Create a new Design Review.
 936
 937        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
 938
 939        :param title: Title of the Design Review
 940        :param head: Branch or name of the branch to merge into the base branch
 941        :param base: Branch or name of the branch to merge into
 942        :param assignees: Optional. A list of users to assign this review. List can be of
 943                          User objects or of usernames.
 944        :param body: An Optional Description for the Design Review.
 945        :param due_date: An Optional Due date for the Design Review.
 946        :param milestone: An Optional Milestone for the Design Review
 947        :return: The created Design Review
 948        """
 949
 950        data: dict[str, Any] = {
 951            "title": title,
 952        }
 953
 954        if isinstance(head, Branch):
 955            data["head"] = head.name
 956        else:
 957            data["head"] = head
 958        if isinstance(base, Branch):
 959            data["base"] = base.name
 960        else:
 961            data["base"] = base
 962        if assignees:
 963            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
 964        if body:
 965            data["body"] = body
 966        if due_date:
 967            data["due_date"] = Util.format_time(due_date)
 968        if milestone:
 969            data["milestone"] = milestone.id
 970
 971        result = self.allspice_client.requests_post(
 972            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
 973            data=data,
 974        )
 975
 976        return DesignReview.parse_response(self.allspice_client, result)
 977
 978    def create_milestone(
 979        self,
 980        title: str,
 981        description: str,
 982        due_date: Optional[str] = None,
 983        state: str = "open",
 984    ) -> "Milestone":
 985        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
 986        data = {"title": title, "description": description, "state": state}
 987        if due_date:
 988            data["due_date"] = due_date
 989        result = self.allspice_client.requests_post(url, data=data)
 990        return Milestone.parse_response(self.allspice_client, result)
 991
 992    def create_gitea_hook(self, hook_url: str, events: List[str]):
 993        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 994        data = {
 995            "type": "gitea",
 996            "config": {"content_type": "json", "url": hook_url},
 997            "events": events,
 998            "active": True,
 999        }
1000        return self.allspice_client.requests_post(url, data=data)
1001
1002    def list_hooks(self):
1003        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1004        return self.allspice_client.requests_get(url)
1005
1006    def delete_hook(self, id: str):
1007        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1008        self.allspice_client.requests_delete(url)
1009
1010    def is_collaborator(self, username) -> bool:
1011        if isinstance(username, User):
1012            username = username.username
1013        try:
1014            # returns 204 if its ok, 404 if its not
1015            self.allspice_client.requests_get(
1016                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1017            )
1018            return True
1019        except Exception:
1020            return False
1021
1022    def get_users_with_access(self) -> Sequence[User]:
1023        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1024        response = self.allspice_client.requests_get(url)
1025        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1026        if isinstance(self.owner, User):
1027            return [*collabs, self.owner]
1028        else:
1029            # owner must be org
1030            teams = self.owner.get_teams()
1031            for team in teams:
1032                team_repos = team.get_repos()
1033                if self.name in [n.name for n in team_repos]:
1034                    collabs += team.get_members()
1035            return collabs
1036
1037    def remove_collaborator(self, user_name: str):
1038        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1039        self.allspice_client.requests_delete(url)
1040
1041    def transfer_ownership(
1042        self,
1043        new_owner: Union[User, Organization],
1044        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1045    ):
1046        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1047        data: dict[str, Any] = {"new_owner": new_owner.username}
1048        if isinstance(new_owner, Organization):
1049            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1050            data["team_ids"] = new_team_ids
1051        self.allspice_client.requests_post(url, data=data)
1052        # TODO: make sure this instance is either updated or discarded
1053
1054    def get_git_content(
1055        self,
1056        ref: Optional["Ref"] = None,
1057        commit: "Optional[Commit]" = None,
1058    ) -> List[Content]:
1059        """
1060        Get the metadata for all files in the root directory.
1061
1062        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1063
1064        :param ref: branch or commit to get content from
1065        :param commit: commit to get content from (deprecated)
1066        """
1067        url = f"/repos/{self.owner.username}/{self.name}/contents"
1068        data = Util.data_params_for_ref(ref or commit)
1069
1070        result = [
1071            Content.parse_response(self.allspice_client, f)
1072            for f in self.allspice_client.requests_get(url, data)
1073        ]
1074        return result
1075
1076    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1077        """
1078        Get the repository's tree on a given ref.
1079
1080        By default, this will only return the top-level entries in the tree. If you want
1081        to get the entire tree, set `recursive` to True.
1082
1083        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1084        :param recursive: Whether to get the entire tree or just the top-level entries.
1085        """
1086
1087        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1088        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1089        params = {"recursive": recursive}
1090        results = self.allspice_client.requests_get_paginated(url, params=params)
1091        return [GitEntry.parse_response(self.allspice_client, result) for result in results]
1092
1093    def get_file_content(
1094        self,
1095        content: Content,
1096        ref: Optional[Ref] = None,
1097        commit: Optional[Commit] = None,
1098    ) -> Union[str, List["Content"]]:
1099        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1100        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1101        data = Util.data_params_for_ref(ref or commit)
1102
1103        if content.type == Content.FILE:
1104            return self.allspice_client.requests_get(url, data)["content"]
1105        else:
1106            return [
1107                Content.parse_response(self.allspice_client, f)
1108                for f in self.allspice_client.requests_get(url, data)
1109            ]
1110
1111    def get_raw_file(
1112        self,
1113        file_path: str,
1114        ref: Optional[Ref] = None,
1115    ) -> bytes:
1116        """
1117        Get the raw, binary data of a single file.
1118
1119        Note 1: if the file you are requesting is a text file, you might want to
1120        use .decode() on the result to get a string. For example:
1121
1122            content = repo.get_raw_file("file.txt").decode("utf-8")
1123
1124        Note 2: this method will store the entire file in memory. If you want
1125        to download a large file, you might want to use `download_to_file`
1126        instead.
1127
1128        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1129
1130        :param file_path: The path to the file to get.
1131        :param ref: The branch or commit to get the file from.  If not provided,
1132            the default branch is used.
1133        """
1134
1135        url = self.REPO_GET_RAW_FILE.format(
1136            owner=self.owner.username,
1137            repo=self.name,
1138            path=file_path,
1139        )
1140        params = Util.data_params_for_ref(ref)
1141        return self.allspice_client.requests_get_raw(url, params=params)
1142
1143    def download_to_file(
1144        self,
1145        file_path: str,
1146        io: IO,
1147        ref: Optional[Ref] = None,
1148    ) -> None:
1149        """
1150        Download the binary data of a file to a file-like object.
1151
1152        Example:
1153
1154            with open("schematic.DSN", "wb") as f:
1155                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1156
1157        :param file_path: The path to the file in the repository from the root
1158            of the repository.
1159        :param io: The file-like object to write the data to.
1160        """
1161
1162        url = self.allspice_client._AllSpice__get_url(
1163            self.REPO_GET_RAW_FILE.format(
1164                owner=self.owner.username,
1165                repo=self.name,
1166                path=file_path,
1167            )
1168        )
1169        params = Util.data_params_for_ref(ref)
1170        response = self.allspice_client.requests.get(
1171            url,
1172            params=params,
1173            headers=self.allspice_client.headers,
1174            stream=True,
1175        )
1176
1177        for chunk in response.iter_content(chunk_size=4096):
1178            if chunk:
1179                io.write(chunk)
1180
1181    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1182        """
1183        Get the json blob for a cad file if it exists, otherwise enqueue
1184        a new job and return a 503 status.
1185
1186        WARNING: This is still experimental and not recommended for critical
1187        applications. The structure and content of the returned dictionary can
1188        change at any time.
1189
1190        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1191        """
1192
1193        if isinstance(content, Content):
1194            content = content.path
1195
1196        url = self.REPO_GET_ALLSPICE_JSON.format(
1197            owner=self.owner.username,
1198            repo=self.name,
1199            content=content,
1200        )
1201        data = Util.data_params_for_ref(ref)
1202        return self.allspice_client.requests_get(url, data)
1203
1204    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1205        """
1206        Get the svg blob for a cad file if it exists, otherwise enqueue
1207        a new job and return a 503 status.
1208
1209        WARNING: This is still experimental and not yet recommended for
1210        critical applications. The content of the returned svg can change
1211        at any time.
1212
1213        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1214        """
1215
1216        if isinstance(content, Content):
1217            content = content.path
1218
1219        url = self.REPO_GET_ALLSPICE_SVG.format(
1220            owner=self.owner.username,
1221            repo=self.name,
1222            content=content,
1223        )
1224        data = Util.data_params_for_ref(ref)
1225        return self.allspice_client.requests_get_raw(url, data)
1226
1227    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1228        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1229        if not data:
1230            data = {}
1231        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1232        data.update({"content": content})
1233        return self.allspice_client.requests_post(url, data)
1234
1235    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1236        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1237        if not data:
1238            data = {}
1239        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1240        data.update({"sha": file_sha, "content": content})
1241        return self.allspice_client.requests_put(url, data)
1242
1243    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1244        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1245        if not data:
1246            data = {}
1247        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1248        data.update({"sha": file_sha})
1249        return self.allspice_client.requests_delete(url, data)
1250
1251    def get_archive(
1252        self,
1253        ref: Ref = "main",
1254        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1255    ) -> bytes:
1256        """
1257        Download all the files in a specific ref of a repository as a zip or tarball
1258        archive.
1259
1260        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1261
1262        :param ref: branch or commit to get content from, defaults to the "main" branch
1263        :param archive_format: zip or tar, defaults to zip
1264        """
1265
1266        ref_string = Util.data_params_for_ref(ref)["ref"]
1267        url = self.REPO_GET_ARCHIVE.format(
1268            owner=self.owner.username,
1269            repo=self.name,
1270            ref=ref_string,
1271            format=archive_format.value,
1272        )
1273        return self.allspice_client.requests_get_raw(url)
1274
1275    def get_topics(self) -> list[str]:
1276        """
1277        Gets the list of topics on this repository.
1278
1279        See http://localhost:3000/api/swagger#/repository/repoListTopics
1280        """
1281
1282        url = self.REPO_GET_TOPICS.format(
1283            owner=self.owner.username,
1284            repo=self.name,
1285        )
1286        return self.allspice_client.requests_get(url)["topics"]
1287
1288    def add_topic(self, topic: str):
1289        """
1290        Adds a topic to the repository.
1291
1292        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1293
1294        :param topic: The topic to add. Topic names must consist only of
1295            lowercase letters, numnbers and dashes (-), and cannot start with
1296            dashes. Topic names also must be under 35 characters long.
1297        """
1298
1299        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1300        self.allspice_client.requests_put(url)
1301
1302    def create_release(
1303        self,
1304        tag_name: str,
1305        name: Optional[str] = None,
1306        body: Optional[str] = None,
1307        draft: bool = False,
1308    ):
1309        """
1310        Create a release for this repository. The release will be created for
1311        the tag with the given name. If there is no tag with this name, create
1312        the tag first.
1313
1314        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1315        """
1316
1317        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1318        data = {
1319            "tag_name": tag_name,
1320            "draft": draft,
1321        }
1322        if name is not None:
1323            data["name"] = name
1324        if body is not None:
1325            data["body"] = body
1326        response = self.allspice_client.requests_post(url, data)
1327        return Release.parse_response(self.allspice_client, response, self)
1328
1329    def get_releases(
1330        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1331    ) -> List[Release]:
1332        """
1333        Get the list of releases for this repository.
1334
1335        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1336        """
1337
1338        data = {}
1339
1340        if draft is not None:
1341            data["draft"] = draft
1342        if pre_release is not None:
1343            data["pre-release"] = pre_release
1344
1345        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1346        responses = self.allspice_client.requests_get_paginated(url, params=data)
1347
1348        return [
1349            Release.parse_response(self.allspice_client, response, self) for response in responses
1350        ]
1351
1352    def get_latest_release(self) -> Release:
1353        """
1354        Get the latest release for this repository.
1355
1356        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1357        """
1358
1359        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1360        response = self.allspice_client.requests_get(url)
1361        release = Release.parse_response(self.allspice_client, response, self)
1362        return release
1363
1364    def get_release_by_tag(self, tag: str) -> Release:
1365        """
1366        Get a release by its tag.
1367
1368        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1369        """
1370
1371        url = self.REPO_GET_RELEASE_BY_TAG.format(
1372            owner=self.owner.username, repo=self.name, tag=tag
1373        )
1374        response = self.allspice_client.requests_get(url)
1375        release = Release.parse_response(self.allspice_client, response, self)
1376        return release
1377
1378    def get_commit_statuses(
1379        self,
1380        commit: Union[str, Commit],
1381        sort: Optional[CommitStatusSort] = None,
1382        state: Optional[CommitStatusState] = None,
1383    ) -> List[CommitStatus]:
1384        """
1385        Get a list of statuses for a commit.
1386
1387        This is roughly equivalent to the Commit.get_statuses method, but this
1388        method allows you to sort and filter commits and is more convenient if
1389        you have a commit SHA and don't need to get the commit itself.
1390
1391        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1392        """
1393
1394        if isinstance(commit, Commit):
1395            commit = commit.sha
1396
1397        params = {}
1398        if sort is not None:
1399            params["sort"] = sort.value
1400        if state is not None:
1401            params["state"] = state.value
1402
1403        url = self.REPO_GET_COMMIT_STATUS.format(
1404            owner=self.owner.username, repo=self.name, sha=commit
1405        )
1406        response = self.allspice_client.requests_get_paginated(url, params=params)
1407        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
1408
1409    def create_commit_status(
1410        self,
1411        commit: Union[str, Commit],
1412        context: Optional[str] = None,
1413        description: Optional[str] = None,
1414        state: Optional[CommitStatusState] = None,
1415        target_url: Optional[str] = None,
1416    ) -> CommitStatus:
1417        """
1418        Create a status on a commit.
1419
1420        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1421        """
1422
1423        if isinstance(commit, Commit):
1424            commit = commit.sha
1425
1426        data = {}
1427        if context is not None:
1428            data["context"] = context
1429        if description is not None:
1430            data["description"] = description
1431        if state is not None:
1432            data["state"] = state.value
1433        if target_url is not None:
1434            data["target_url"] = target_url
1435
1436        url = self.REPO_GET_COMMIT_STATUS.format(
1437            owner=self.owner.username, repo=self.name, sha=commit
1438        )
1439        response = self.allspice_client.requests_post(url, data=data)
1440        return CommitStatus.parse_response(self.allspice_client, response)
1441
1442    def delete(self):
1443        self.allspice_client.requests_delete(
1444            Repository.REPO_DELETE % (self.owner.username, self.name)
1445        )
1446        self.deleted = True
Repository(allspice_client)
586    def __init__(self, allspice_client):
587        super().__init__(allspice_client)
allow_fast_forward_only_merge: bool
allow_manual_merge: Any
allow_merge_commits: bool
allow_rebase: bool
allow_rebase_explicit: bool
allow_rebase_update: bool
allow_squash_merge: bool
archived: bool
archived_at: str
autodetect_manual_merge: Any
avatar_url: str
clone_url: str
created_at: str
default_allow_maintainer_edit: bool
default_branch: str
default_delete_branch_after_merge: bool
default_merge_style: str
description: str
empty: bool
enable_prune: Any
external_tracker: Any
external_wiki: Any
fork: bool
forks_count: int
full_name: str
has_actions: bool
has_issues: bool
has_packages: bool
has_projects: bool
has_pull_requests: bool
has_releases: bool
has_wiki: bool
html_url: str
id: int
ignore_whitespace_conflicts: bool
internal: bool
internal_tracker: Dict[str, bool]
language: str
languages_url: str
mirror: bool
mirror_interval: str
mirror_updated: str
name: str
object_format_name: str
open_issues_count: int
open_pr_counter: int
original_url: str
owner: Union[User, Organization]
parent: Any
permissions: Dict[str, bool]
private: bool
projects_mode: str
release_counter: int
repo_transfer: Any
size: int
ssh_url: str
stars_count: int
template: bool
updated_at: datetime.datetime
url: str
watchers_count: int
website: str
API_OBJECT = '/repos/{owner}/{name}'
REPO_IS_COLLABORATOR = '/repos/%s/%s/collaborators/%s'
REPO_BRANCHES = '/repos/%s/%s/branches'
REPO_BRANCH = '/repos/{owner}/{repo}/branches/{branch}'
REPO_ISSUES = '/repos/{owner}/{repo}/issues'
REPO_DESIGN_REVIEWS = '/repos/{owner}/{repo}/pulls'
REPO_DELETE = '/repos/%s/%s'
REPO_TIMES = '/repos/%s/%s/times'
REPO_USER_TIME = '/repos/%s/%s/times/%s'
REPO_COMMITS = '/repos/%s/%s/commits'
REPO_TRANSFER = '/repos/{owner}/{repo}/transfer'
REPO_MILESTONES = '/repos/{owner}/{repo}/milestones'
REPO_GET_ARCHIVE = '/repos/{owner}/{repo}/archive/{ref}.{format}'
REPO_GET_ALLSPICE_JSON = '/repos/{owner}/{repo}/allspice_generated/json/{content}'
REPO_GET_ALLSPICE_SVG = '/repos/{owner}/{repo}/allspice_generated/svg/{content}'
REPO_GET_TOPICS = '/repos/{owner}/{repo}/topics'
REPO_ADD_TOPIC = '/repos/{owner}/{repo}/topics/{topic}'
REPO_GET_RELEASES = '/repos/{owner}/{repo}/releases'
REPO_GET_LATEST_RELEASE = '/repos/{owner}/{repo}/releases/latest'
REPO_GET_RELEASE_BY_TAG = '/repos/{owner}/{repo}/releases/tags/{tag}'
REPO_GET_COMMIT_STATUS = '/repos/{owner}/{repo}/statuses/{sha}'
REPO_GET_RAW_FILE = '/repos/{owner}/{repo}/raw/{path}'
REPO_GET_TREE = '/repos/{owner}/{repo}/git/trees/{ref}'
@classmethod
def request( cls, allspice_client, owner: str, name: str) -> Repository:
607    @classmethod
608    def request(
609        cls,
610        allspice_client,
611        owner: str,
612        name: str,
613    ) -> Repository:
614        return cls._request(allspice_client, {"owner": owner, "name": name})
@classmethod
def search( cls, allspice_client, query: Optional[str] = None, topic: bool = False, include_description: bool = False, user: Optional[User] = None, owner_to_prioritize: Union[User, Organization, NoneType] = None) -> list[Repository]:
616    @classmethod
617    def search(
618        cls,
619        allspice_client,
620        query: Optional[str] = None,
621        topic: bool = False,
622        include_description: bool = False,
623        user: Optional[User] = None,
624        owner_to_prioritize: Union[User, Organization, None] = None,
625    ) -> list[Repository]:
626        """
627        Search for repositories.
628
629        See https://hub.allspice.io/api/swagger#/repository/repoSearch
630
631        :param query: The query string to search for
632        :param topic: If true, the query string will only be matched against the
633            repository's topic.
634        :param include_description: If true, the query string will be matched
635            against the repository's description as well.
636        :param user: If specified, only repositories that this user owns or
637            contributes to will be searched.
638        :param owner_to_prioritize: If specified, repositories owned by the
639            given entity will be prioritized in the search.
640        :returns: All repositories matching the query. If there are many
641            repositories matching this query, this may take some time.
642        """
643
644        params = {}
645
646        if query is not None:
647            params["q"] = query
648        if topic:
649            params["topic"] = topic
650        if include_description:
651            params["include_description"] = include_description
652        if user is not None:
653            params["user"] = user.id
654        if owner_to_prioritize is not None:
655            params["owner_to_prioritize"] = owner_to_prioritize.id
656
657        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)
658
659        return [Repository.parse_response(allspice_client, response) for response in responses]

Search for repositories.

See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch

Parameters
  • query: The query string to search for
  • topic: If true, the query string will only be matched against the repository's topic.
  • include_description: If true, the query string will be matched against the repository's description as well.
  • user: If specified, only repositories that this user owns or contributes to will be searched.
  • owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
def commit(self):
690    def commit(self):
691        args = {"owner": self.owner.username, "name": self.name}
692        self._commit(args)
def get_branches(self) -> List[Branch]:
694    def get_branches(self) -> List["Branch"]:
695        """Get all the Branches of this Repository."""
696
697        results = self.allspice_client.requests_get_paginated(
698            Repository.REPO_BRANCHES % (self.owner.username, self.name)
699        )
700        return [Branch.parse_response(self.allspice_client, result) for result in results]

Get all the Branches of this Repository.

def get_branch(self, name: str) -> Branch:
702    def get_branch(self, name: str) -> "Branch":
703        """Get a specific Branch of this Repository."""
704        result = self.allspice_client.requests_get(
705            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
706        )
707        return Branch.parse_response(self.allspice_client, result)

Get a specific Branch of this Repository.

def add_branch( self, create_from: Union[Branch, Commit, str], newname: str) -> Branch:
709    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
710        """Add a branch to the repository"""
711        # Note: will only work with gitea 1.13 or higher!
712
713        ref_name = Util.data_params_for_ref(create_from)
714        if "ref" not in ref_name:
715            raise ValueError("create_from must be a Branch, Commit or string")
716        ref_name = ref_name["ref"]
717
718        data = {"new_branch_name": newname, "old_ref_name": ref_name}
719        result = self.allspice_client.requests_post(
720            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
721        )
722        return Branch.parse_response(self.allspice_client, result)

Add a branch to the repository

def get_issues( self, state: Literal['open', 'closed', 'all'] = 'all', search_query: Optional[str] = None, labels: Optional[List[str]] = None, milestones: Optional[List[Union[Milestone, str]]] = None, assignee: Union[User, str, NoneType] = None, since: Optional[datetime.datetime] = None, before: Optional[datetime.datetime] = None) -> List[Issue]:
724    def get_issues(
725        self,
726        state: Literal["open", "closed", "all"] = "all",
727        search_query: Optional[str] = None,
728        labels: Optional[List[str]] = None,
729        milestones: Optional[List[Union[Milestone, str]]] = None,
730        assignee: Optional[Union[User, str]] = None,
731        since: Optional[datetime] = None,
732        before: Optional[datetime] = None,
733    ) -> List["Issue"]:
734        """
735        Get all Issues of this Repository (open and closed)
736
737        https://hub.allspice.io/api/swagger#/repository/repoListIssues
738
739        All params of this method are optional filters. If you don't specify a filter, it
740        will not be applied.
741
742        :param state: The state of the Issues to get. If None, all Issues are returned.
743        :param search_query: Filter issues by text. This is equivalent to searching for
744                             `search_query` in the Issues on the web interface.
745        :param labels: Filter issues by labels.
746        :param milestones: Filter issues by milestones.
747        :param assignee: Filter issues by the assigned user.
748        :param since: Filter issues by the date they were created.
749        :param before: Filter issues by the date they were created.
750        :return: A list of Issues.
751        """
752
753        data = {
754            "state": state,
755        }
756        if search_query:
757            data["q"] = search_query
758        if labels:
759            data["labels"] = ",".join(labels)
760        if milestones:
761            data["milestone"] = ",".join(
762                [
763                    milestone.name if isinstance(milestone, Milestone) else milestone
764                    for milestone in milestones
765                ]
766            )
767        if assignee:
768            if isinstance(assignee, User):
769                data["assignee"] = assignee.username
770            else:
771                data["assignee"] = assignee
772        if since:
773            data["since"] = Util.format_time(since)
774        if before:
775            data["before"] = Util.format_time(before)
776
777        results = self.allspice_client.requests_get_paginated(
778            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
779            params=data,
780        )
781
782        issues = []
783        for result in results:
784            issue = Issue.parse_response(self.allspice_client, result)
785            # See Issue.request
786            setattr(issue, "_repository", self)
787            # This is mostly for compatibility with an older implementation
788            Issue._add_read_property("repo", self, issue)
789            issues.append(issue)
790
791        return issues

Get all Issues of this Repository (open and closed)

allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues

All params of this method are optional filters. If you don't specify a filter, it will not be applied.

Parameters
  • state: The state of the Issues to get. If None, all Issues are returned.
  • search_query: Filter issues by text. This is equivalent to searching for search_query in the Issues on the web interface.
  • labels: Filter issues by labels.
  • milestones: Filter issues by milestones.
  • assignee: Filter issues by the assigned user.
  • since: Filter issues by the date they were created.
  • before: Filter issues by the date they were created.
Returns

A list of Issues.

def get_design_reviews( self, state: Literal['open', 'closed', 'all'] = 'all', milestone: Union[Milestone, str, NoneType] = None, labels: Optional[List[str]] = None) -> List[DesignReview]:
793    def get_design_reviews(
794        self,
795        state: Literal["open", "closed", "all"] = "all",
796        milestone: Optional[Union[Milestone, str]] = None,
797        labels: Optional[List[str]] = None,
798    ) -> List["DesignReview"]:
799        """
800        Get all Design Reviews of this Repository.
801
802        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
803
804        :param state: The state of the Design Reviews to get. If None, all Design Reviews
805                      are returned.
806        :param milestone: The milestone of the Design Reviews to get.
807        :param labels: A list of label IDs to filter DRs by.
808        :return: A list of Design Reviews.
809        """
810
811        params = {
812            "state": state,
813        }
814        if milestone:
815            if isinstance(milestone, Milestone):
816                params["milestone"] = milestone.name
817            else:
818                params["milestone"] = milestone
819        if labels:
820            params["labels"] = ",".join(labels)
821
822        results = self.allspice_client.requests_get_paginated(
823            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
824            params=params,
825        )
826        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

Get all Design Reviews of this Repository.

allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests

Parameters
  • state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
  • milestone: The milestone of the Design Reviews to get.
  • labels: A list of label IDs to filter DRs by.
Returns

A list of Design Reviews.

def get_commits( self, sha: Optional[str] = None, path: Optional[str] = None, stat: bool = True) -> List[Commit]:
828    def get_commits(
829        self,
830        sha: Optional[str] = None,
831        path: Optional[str] = None,
832        stat: bool = True,
833    ) -> List["Commit"]:
834        """
835        Get all the Commits of this Repository.
836
837        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
838
839        :param sha: The SHA of the commit to start listing commits from.
840        :param path: filepath of a file/dir.
841        :param stat: Include the number of additions and deletions in the response.
842                     Disable for speedup.
843        :return: A list of Commits.
844        """
845
846        data = {}
847        if sha:
848            data["sha"] = sha
849        if path:
850            data["path"] = path
851        if not stat:
852            data["stat"] = False
853
854        try:
855            results = self.allspice_client.requests_get_paginated(
856                Repository.REPO_COMMITS % (self.owner.username, self.name),
857                params=data,
858            )
859        except ConflictException as err:
860            logging.warning(err)
861            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
862            results = []
863        return [Commit.parse_response(self.allspice_client, result) for result in results]

Get all the Commits of this Repository.

allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits

Parameters
  • sha: The SHA of the commit to start listing commits from.
  • path: filepath of a file/dir.
  • stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns

A list of Commits.

def get_issues_state(self, state) -> List[Issue]:
865    def get_issues_state(self, state) -> List["Issue"]:
866        """
867        DEPRECATED: Use get_issues() instead.
868
869        Get issues of state Issue.open or Issue.closed of a repository.
870        """
871
872        assert state in [Issue.OPENED, Issue.CLOSED]
873        issues = []
874        data = {"state": state}
875        results = self.allspice_client.requests_get_paginated(
876            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
877            params=data,
878        )
879        for result in results:
880            issue = Issue.parse_response(self.allspice_client, result)
881            # adding data not contained in the issue response
882            # See Issue.request()
883            setattr(issue, "_repository", self)
884            Issue._add_read_property("repo", self, issue)
885            Issue._add_read_property("owner", self.owner, issue)
886            issues.append(issue)
887        return issues

DEPRECATED: Use get_issues() instead.

Get issues of state Issue.open or Issue.closed of a repository.

def get_times(self):
889    def get_times(self):
890        results = self.allspice_client.requests_get(
891            Repository.REPO_TIMES % (self.owner.username, self.name)
892        )
893        return results
def get_user_time(self, username) -> float:
895    def get_user_time(self, username) -> float:
896        if isinstance(username, User):
897            username = username.username
898        results = self.allspice_client.requests_get(
899            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
900        )
901        time = sum(r["time"] for r in results)
902        return time
def get_full_name(self) -> str:
904    def get_full_name(self) -> str:
905        return self.owner.username + "/" + self.name
def create_issue( self, title, assignees=frozenset(), description='') -> allspice.baseapiobject.ApiObject:
907    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
908        data = {
909            "assignees": assignees,
910            "body": description,
911            "closed": False,
912            "title": title,
913        }
914        result = self.allspice_client.requests_post(
915            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
916            data=data,
917        )
918
919        issue = Issue.parse_response(self.allspice_client, result)
920        setattr(issue, "_repository", self)
921        Issue._add_read_property("repo", self, issue)
922        return issue
def create_design_review( self, title: str, head: Union[Branch, str], base: Union[Branch, str], assignees: Optional[Set[Union[User, str]]] = None, body: Optional[str] = None, due_date: Optional[datetime.datetime] = None, milestone: Optional[Milestone] = None) -> DesignReview:
924    def create_design_review(
925        self,
926        title: str,
927        head: Union[Branch, str],
928        base: Union[Branch, str],
929        assignees: Optional[Set[Union[User, str]]] = None,
930        body: Optional[str] = None,
931        due_date: Optional[datetime] = None,
932        milestone: Optional["Milestone"] = None,
933    ) -> "DesignReview":
934        """
935        Create a new Design Review.
936
937        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
938
939        :param title: Title of the Design Review
940        :param head: Branch or name of the branch to merge into the base branch
941        :param base: Branch or name of the branch to merge into
942        :param assignees: Optional. A list of users to assign this review. List can be of
943                          User objects or of usernames.
944        :param body: An Optional Description for the Design Review.
945        :param due_date: An Optional Due date for the Design Review.
946        :param milestone: An Optional Milestone for the Design Review
947        :return: The created Design Review
948        """
949
950        data: dict[str, Any] = {
951            "title": title,
952        }
953
954        if isinstance(head, Branch):
955            data["head"] = head.name
956        else:
957            data["head"] = head
958        if isinstance(base, Branch):
959            data["base"] = base.name
960        else:
961            data["base"] = base
962        if assignees:
963            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
964        if body:
965            data["body"] = body
966        if due_date:
967            data["due_date"] = Util.format_time(due_date)
968        if milestone:
969            data["milestone"] = milestone.id
970
971        result = self.allspice_client.requests_post(
972            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
973            data=data,
974        )
975
976        return DesignReview.parse_response(self.allspice_client, result)

Create a new Design Review.

See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest

Parameters
  • title: Title of the Design Review
  • head: Branch or name of the branch to merge into the base branch
  • base: Branch or name of the branch to merge into
  • assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
  • body: An Optional Description for the Design Review.
  • due_date: An Optional Due date for the Design Review.
  • milestone: An Optional Milestone for the Design Review
Returns

The created Design Review

def create_milestone( self, title: str, description: str, due_date: Optional[str] = None, state: str = 'open') -> Milestone:
978    def create_milestone(
979        self,
980        title: str,
981        description: str,
982        due_date: Optional[str] = None,
983        state: str = "open",
984    ) -> "Milestone":
985        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
986        data = {"title": title, "description": description, "state": state}
987        if due_date:
988            data["due_date"] = due_date
989        result = self.allspice_client.requests_post(url, data=data)
990        return Milestone.parse_response(self.allspice_client, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
 992    def create_gitea_hook(self, hook_url: str, events: List[str]):
 993        url = f"/repos/{self.owner.username}/{self.name}/hooks"
 994        data = {
 995            "type": "gitea",
 996            "config": {"content_type": "json", "url": hook_url},
 997            "events": events,
 998            "active": True,
 999        }
1000        return self.allspice_client.requests_post(url, data=data)
def list_hooks(self):
1002    def list_hooks(self):
1003        url = f"/repos/{self.owner.username}/{self.name}/hooks"
1004        return self.allspice_client.requests_get(url)
def delete_hook(self, id: str):
1006    def delete_hook(self, id: str):
1007        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
1008        self.allspice_client.requests_delete(url)
def is_collaborator(self, username) -> bool:
1010    def is_collaborator(self, username) -> bool:
1011        if isinstance(username, User):
1012            username = username.username
1013        try:
1014            # returns 204 if its ok, 404 if its not
1015            self.allspice_client.requests_get(
1016                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
1017            )
1018            return True
1019        except Exception:
1020            return False
def get_users_with_access(self) -> Sequence[User]:
1022    def get_users_with_access(self) -> Sequence[User]:
1023        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
1024        response = self.allspice_client.requests_get(url)
1025        collabs = [User.parse_response(self.allspice_client, user) for user in response]
1026        if isinstance(self.owner, User):
1027            return [*collabs, self.owner]
1028        else:
1029            # owner must be org
1030            teams = self.owner.get_teams()
1031            for team in teams:
1032                team_repos = team.get_repos()
1033                if self.name in [n.name for n in team_repos]:
1034                    collabs += team.get_members()
1035            return collabs
def remove_collaborator(self, user_name: str):
1037    def remove_collaborator(self, user_name: str):
1038        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
1039        self.allspice_client.requests_delete(url)
def transfer_ownership( self, new_owner: Union[User, Organization], new_teams: Union[Set[Team], FrozenSet[Team]] = frozenset()):
1041    def transfer_ownership(
1042        self,
1043        new_owner: Union[User, Organization],
1044        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
1045    ):
1046        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
1047        data: dict[str, Any] = {"new_owner": new_owner.username}
1048        if isinstance(new_owner, Organization):
1049            new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
1050            data["team_ids"] = new_team_ids
1051        self.allspice_client.requests_post(url, data=data)
1052        # TODO: make sure this instance is either updated or discarded
def get_git_content( self, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> List[Content]:
1054    def get_git_content(
1055        self,
1056        ref: Optional["Ref"] = None,
1057        commit: "Optional[Commit]" = None,
1058    ) -> List[Content]:
1059        """
1060        Get the metadata for all files in the root directory.
1061
1062        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
1063
1064        :param ref: branch or commit to get content from
1065        :param commit: commit to get content from (deprecated)
1066        """
1067        url = f"/repos/{self.owner.username}/{self.name}/contents"
1068        data = Util.data_params_for_ref(ref or commit)
1069
1070        result = [
1071            Content.parse_response(self.allspice_client, f)
1072            for f in self.allspice_client.requests_get(url, data)
1073        ]
1074        return result

Get the metadata for all files in the root directory.

allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList

Parameters
  • ref: branch or commit to get content from
  • commit: commit to get content from (deprecated)
def get_tree( self, ref: Union[Branch, Commit, str, NoneType] = None, recursive: bool = False) -> List[GitEntry]:
1076    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
1077        """
1078        Get the repository's tree on a given ref.
1079
1080        By default, this will only return the top-level entries in the tree. If you want
1081        to get the entire tree, set `recursive` to True.
1082
1083        :param ref: The ref to get the tree from. If not provided, the default branch is used.
1084        :param recursive: Whether to get the entire tree or just the top-level entries.
1085        """
1086
1087        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
1088        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
1089        params = {"recursive": recursive}
1090        results = self.allspice_client.requests_get_paginated(url, params=params)
1091        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

Get the repository's tree on a given ref.

By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set recursive to True.

Parameters
  • ref: The ref to get the tree from. If not provided, the default branch is used.
  • recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content( self, content: Content, ref: Union[Branch, Commit, str, NoneType] = None, commit: Optional[Commit] = None) -> Union[str, List[Content]]:
1093    def get_file_content(
1094        self,
1095        content: Content,
1096        ref: Optional[Ref] = None,
1097        commit: Optional[Commit] = None,
1098    ) -> Union[str, List["Content"]]:
1099        """https://hub.allspice.io/api/swagger#/repository/repoGetContents"""
1100        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
1101        data = Util.data_params_for_ref(ref or commit)
1102
1103        if content.type == Content.FILE:
1104            return self.allspice_client.requests_get(url, data)["content"]
1105        else:
1106            return [
1107                Content.parse_response(self.allspice_client, f)
1108                for f in self.allspice_client.requests_get(url, data)
1109            ]

allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents

def get_raw_file( self, file_path: str, ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1111    def get_raw_file(
1112        self,
1113        file_path: str,
1114        ref: Optional[Ref] = None,
1115    ) -> bytes:
1116        """
1117        Get the raw, binary data of a single file.
1118
1119        Note 1: if the file you are requesting is a text file, you might want to
1120        use .decode() on the result to get a string. For example:
1121
1122            content = repo.get_raw_file("file.txt").decode("utf-8")
1123
1124        Note 2: this method will store the entire file in memory. If you want
1125        to download a large file, you might want to use `download_to_file`
1126        instead.
1127
1128        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
1129
1130        :param file_path: The path to the file to get.
1131        :param ref: The branch or commit to get the file from.  If not provided,
1132            the default branch is used.
1133        """
1134
1135        url = self.REPO_GET_RAW_FILE.format(
1136            owner=self.owner.username,
1137            repo=self.name,
1138            path=file_path,
1139        )
1140        params = Util.data_params_for_ref(ref)
1141        return self.allspice_client.requests_get_raw(url, params=params)

Get the raw, binary data of a single file.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

content = repo.get_raw_file("file.txt").decode("utf-8")

Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use download_to_file instead.

See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile

Parameters
  • file_path: The path to the file to get.
  • ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file( self, file_path: str, io: <class 'IO'>, ref: Union[Branch, Commit, str, NoneType] = None) -> None:
1143    def download_to_file(
1144        self,
1145        file_path: str,
1146        io: IO,
1147        ref: Optional[Ref] = None,
1148    ) -> None:
1149        """
1150        Download the binary data of a file to a file-like object.
1151
1152        Example:
1153
1154            with open("schematic.DSN", "wb") as f:
1155                Repository.download_to_file("Schematics/my_schematic.DSN", f)
1156
1157        :param file_path: The path to the file in the repository from the root
1158            of the repository.
1159        :param io: The file-like object to write the data to.
1160        """
1161
1162        url = self.allspice_client._AllSpice__get_url(
1163            self.REPO_GET_RAW_FILE.format(
1164                owner=self.owner.username,
1165                repo=self.name,
1166                path=file_path,
1167            )
1168        )
1169        params = Util.data_params_for_ref(ref)
1170        response = self.allspice_client.requests.get(
1171            url,
1172            params=params,
1173            headers=self.allspice_client.headers,
1174            stream=True,
1175        )
1176
1177        for chunk in response.iter_content(chunk_size=4096):
1178            if chunk:
1179                io.write(chunk)

Download the binary data of a file to a file-like object.

Example:

with open("schematic.DSN", "wb") as f:
    Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
  • file_path: The path to the file in the repository from the root of the repository.
  • io: The file-like object to write the data to.
def get_generated_json( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> dict:
1181    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
1182        """
1183        Get the json blob for a cad file if it exists, otherwise enqueue
1184        a new job and return a 503 status.
1185
1186        WARNING: This is still experimental and not recommended for critical
1187        applications. The structure and content of the returned dictionary can
1188        change at any time.
1189
1190        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1191        """
1192
1193        if isinstance(content, Content):
1194            content = content.path
1195
1196        url = self.REPO_GET_ALLSPICE_JSON.format(
1197            owner=self.owner.username,
1198            repo=self.name,
1199            content=content,
1200        )
1201        data = Util.data_params_for_ref(ref)
1202        return self.allspice_client.requests_get(url, data)

Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

def get_generated_svg( self, content: Union[Content, str], ref: Union[Branch, Commit, str, NoneType] = None) -> bytes:
1204    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
1205        """
1206        Get the svg blob for a cad file if it exists, otherwise enqueue
1207        a new job and return a 503 status.
1208
1209        WARNING: This is still experimental and not yet recommended for
1210        critical applications. The content of the returned svg can change
1211        at any time.
1212
1213        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1214        """
1215
1216        if isinstance(content, Content):
1217            content = content.path
1218
1219        url = self.REPO_GET_ALLSPICE_SVG.format(
1220            owner=self.owner.username,
1221            repo=self.name,
1222            content=content,
1223        )
1224        data = Util.data_params_for_ref(ref)
1225        return self.allspice_client.requests_get_raw(url, data)

Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.

WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.

See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1227    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
1228        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1229        if not data:
1230            data = {}
1231        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1232        data.update({"content": content})
1233        return self.allspice_client.requests_post(url, data)

allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile

def change_file( self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1235    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
1236        """https://hub.allspice.io/api/swagger#/repository/repoCreateFile"""
1237        if not data:
1238            data = {}
1239        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1240        data.update({"sha": file_sha, "content": content})
1241        return self.allspice_client.requests_put(url, data)

allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile

def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1243    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
1244        """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile"""
1245        if not data:
1246            data = {}
1247        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
1248        data.update({"sha": file_sha})
1249        return self.allspice_client.requests_delete(url, data)

allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile

def get_archive( self, ref: Union[Branch, Commit, str] = 'main', archive_format: Repository.ArchiveFormat = <ArchiveFormat.ZIP: 'zip'>) -> bytes:
1251    def get_archive(
1252        self,
1253        ref: Ref = "main",
1254        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
1255    ) -> bytes:
1256        """
1257        Download all the files in a specific ref of a repository as a zip or tarball
1258        archive.
1259
1260        https://hub.allspice.io/api/swagger#/repository/repoGetArchive
1261
1262        :param ref: branch or commit to get content from, defaults to the "main" branch
1263        :param archive_format: zip or tar, defaults to zip
1264        """
1265
1266        ref_string = Util.data_params_for_ref(ref)["ref"]
1267        url = self.REPO_GET_ARCHIVE.format(
1268            owner=self.owner.username,
1269            repo=self.name,
1270            ref=ref_string,
1271            format=archive_format.value,
1272        )
1273        return self.allspice_client.requests_get_raw(url)

Download all the files in a specific ref of a repository as a zip or tarball archive.

allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive

Parameters
  • ref: branch or commit to get content from, defaults to the "main" branch
  • archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
1275    def get_topics(self) -> list[str]:
1276        """
1277        Gets the list of topics on this repository.
1278
1279        See http://localhost:3000/api/swagger#/repository/repoListTopics
1280        """
1281
1282        url = self.REPO_GET_TOPICS.format(
1283            owner=self.owner.username,
1284            repo=self.name,
1285        )
1286        return self.allspice_client.requests_get(url)["topics"]

Gets the list of topics on this repository.

See http://localhost:3000/api/swagger#/repository/repoListTopics

def add_topic(self, topic: str):
1288    def add_topic(self, topic: str):
1289        """
1290        Adds a topic to the repository.
1291
1292        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
1293
1294        :param topic: The topic to add. Topic names must consist only of
1295            lowercase letters, numnbers and dashes (-), and cannot start with
1296            dashes. Topic names also must be under 35 characters long.
1297        """
1298
1299        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
1300        self.allspice_client.requests_put(url)

Adds a topic to the repository.

See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic

Parameters
  • topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release( self, tag_name: str, name: Optional[str] = None, body: Optional[str] = None, draft: bool = False):
1302    def create_release(
1303        self,
1304        tag_name: str,
1305        name: Optional[str] = None,
1306        body: Optional[str] = None,
1307        draft: bool = False,
1308    ):
1309        """
1310        Create a release for this repository. The release will be created for
1311        the tag with the given name. If there is no tag with this name, create
1312        the tag first.
1313
1314        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
1315        """
1316
1317        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1318        data = {
1319            "tag_name": tag_name,
1320            "draft": draft,
1321        }
1322        if name is not None:
1323            data["name"] = name
1324        if body is not None:
1325            data["body"] = body
1326        response = self.allspice_client.requests_post(url, data)
1327        return Release.parse_response(self.allspice_client, response, self)

Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.

See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease

def get_releases( self, draft: Optional[bool] = None, pre_release: Optional[bool] = None) -> List[Release]:
1329    def get_releases(
1330        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
1331    ) -> List[Release]:
1332        """
1333        Get the list of releases for this repository.
1334
1335        See https://hub.allspice.io/api/swagger#/repository/repoListReleases
1336        """
1337
1338        data = {}
1339
1340        if draft is not None:
1341            data["draft"] = draft
1342        if pre_release is not None:
1343            data["pre-release"] = pre_release
1344
1345        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
1346        responses = self.allspice_client.requests_get_paginated(url, params=data)
1347
1348        return [
1349            Release.parse_response(self.allspice_client, response, self) for response in responses
1350        ]

Get the list of releases for this repository.

See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases

def get_latest_release(self) -> Release:
1352    def get_latest_release(self) -> Release:
1353        """
1354        Get the latest release for this repository.
1355
1356        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
1357        """
1358
1359        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
1360        response = self.allspice_client.requests_get(url)
1361        release = Release.parse_response(self.allspice_client, response, self)
1362        return release

Get the latest release for this repository.

See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease

def get_release_by_tag(self, tag: str) -> Release:
1364    def get_release_by_tag(self, tag: str) -> Release:
1365        """
1366        Get a release by its tag.
1367
1368        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1369        """
1370
1371        url = self.REPO_GET_RELEASE_BY_TAG.format(
1372            owner=self.owner.username, repo=self.name, tag=tag
1373        )
1374        response = self.allspice_client.requests_get(url)
1375        release = Release.parse_response(self.allspice_client, response, self)
1376        return release

Get a release by its tag.

See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag

def get_commit_statuses( self, commit: Union[str, Commit], sort: Optional[Repository.CommitStatusSort] = None, state: Optional[CommitStatusState] = None) -> List[CommitStatus]:
1378    def get_commit_statuses(
1379        self,
1380        commit: Union[str, Commit],
1381        sort: Optional[CommitStatusSort] = None,
1382        state: Optional[CommitStatusState] = None,
1383    ) -> List[CommitStatus]:
1384        """
1385        Get a list of statuses for a commit.
1386
1387        This is roughly equivalent to the Commit.get_statuses method, but this
1388        method allows you to sort and filter commits and is more convenient if
1389        you have a commit SHA and don't need to get the commit itself.
1390
1391        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
1392        """
1393
1394        if isinstance(commit, Commit):
1395            commit = commit.sha
1396
1397        params = {}
1398        if sort is not None:
1399            params["sort"] = sort.value
1400        if state is not None:
1401            params["state"] = state.value
1402
1403        url = self.REPO_GET_COMMIT_STATUS.format(
1404            owner=self.owner.username, repo=self.name, sha=commit
1405        )
1406        response = self.allspice_client.requests_get_paginated(url, params=params)
1407        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]

Get a list of statuses for a commit.

This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.

See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses

def create_commit_status( self, commit: Union[str, Commit], context: Optional[str] = None, description: Optional[str] = None, state: Optional[CommitStatusState] = None, target_url: Optional[str] = None) -> CommitStatus:
1409    def create_commit_status(
1410        self,
1411        commit: Union[str, Commit],
1412        context: Optional[str] = None,
1413        description: Optional[str] = None,
1414        state: Optional[CommitStatusState] = None,
1415        target_url: Optional[str] = None,
1416    ) -> CommitStatus:
1417        """
1418        Create a status on a commit.
1419
1420        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
1421        """
1422
1423        if isinstance(commit, Commit):
1424            commit = commit.sha
1425
1426        data = {}
1427        if context is not None:
1428            data["context"] = context
1429        if description is not None:
1430            data["description"] = description
1431        if state is not None:
1432            data["state"] = state.value
1433        if target_url is not None:
1434            data["target_url"] = target_url
1435
1436        url = self.REPO_GET_COMMIT_STATUS.format(
1437            owner=self.owner.username, repo=self.name, sha=commit
1438        )
1439        response = self.allspice_client.requests_post(url, data=data)
1440        return CommitStatus.parse_response(self.allspice_client, response)

Create a status on a commit.

See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus

def delete(self):
1442    def delete(self):
1443        self.allspice_client.requests_delete(
1444            Repository.REPO_DELETE % (self.owner.username, self.name)
1445        )
1446        self.deleted = True
class Repository.ArchiveFormat(enum.Enum):
567    class ArchiveFormat(Enum):
568        """
569        Archive formats for Repository.get_archive
570        """
571
572        TAR = "tar.gz"
573        ZIP = "zip"

Archive formats for Repository.get_archive

TAR = <ArchiveFormat.TAR: 'tar.gz'>
ZIP = <ArchiveFormat.ZIP: 'zip'>
class Repository.CommitStatusSort(enum.Enum):
575    class CommitStatusSort(Enum):
576        """
577        Sort order for Repository.get_commit_status
578        """
579
580        OLDEST = "oldest"
581        RECENT_UPDATE = "recentupdate"
582        LEAST_UPDATE = "leastupdate"
583        LEAST_INDEX = "leastindex"
584        HIGHEST_INDEX = "highestindex"

Sort order for Repository.get_commit_status

OLDEST = <CommitStatusSort.OLDEST: 'oldest'>
RECENT_UPDATE = <CommitStatusSort.RECENT_UPDATE: 'recentupdate'>
LEAST_UPDATE = <CommitStatusSort.LEAST_UPDATE: 'leastupdate'>
LEAST_INDEX = <CommitStatusSort.LEAST_INDEX: 'leastindex'>
HIGHEST_INDEX = <CommitStatusSort.HIGHEST_INDEX: 'highestindex'>
class Milestone(allspice.baseapiobject.ApiObject):
1449class Milestone(ApiObject):
1450    allow_merge_commits: Any
1451    allow_rebase: Any
1452    allow_rebase_explicit: Any
1453    allow_squash_merge: Any
1454    archived: Any
1455    closed_at: Any
1456    closed_issues: int
1457    created_at: str
1458    default_branch: Any
1459    description: str
1460    due_on: Any
1461    has_issues: Any
1462    has_pull_requests: Any
1463    has_wiki: Any
1464    id: int
1465    ignore_whitespace_conflicts: Any
1466    name: Any
1467    open_issues: int
1468    private: Any
1469    state: str
1470    title: str
1471    updated_at: str
1472    website: Any
1473
1474    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>
1475
1476    def __init__(self, allspice_client):
1477        super().__init__(allspice_client)
1478
1479    def __eq__(self, other):
1480        if not isinstance(other, Milestone):
1481            return False
1482        return self.allspice_client == other.allspice_client and self.id == other.id
1483
1484    def __hash__(self):
1485        return hash(self.allspice_client) ^ hash(self.id)
1486
1487    _fields_to_parsers: ClassVar[dict] = {
1488        "closed_at": lambda _, t: Util.convert_time(t),
1489        "due_on": lambda _, t: Util.convert_time(t),
1490    }
1491
1492    _patchable_fields: ClassVar[set[str]] = {
1493        "allow_merge_commits",
1494        "allow_rebase",
1495        "allow_rebase_explicit",
1496        "allow_squash_merge",
1497        "archived",
1498        "default_branch",
1499        "description",
1500        "has_issues",
1501        "has_pull_requests",
1502        "has_wiki",
1503        "ignore_whitespace_conflicts",
1504        "name",
1505        "private",
1506        "website",
1507    }
1508
1509    @classmethod
1510    def request(cls, allspice_client, owner: str, repo: str, number: str):
1511        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Milestone(allspice_client)
1476    def __init__(self, allspice_client):
1477        super().__init__(allspice_client)
allow_merge_commits: Any
allow_rebase: Any
allow_rebase_explicit: Any
allow_squash_merge: Any
archived: Any
closed_at: Any
closed_issues: int
created_at: str
default_branch: Any
description: str
due_on: Any
has_issues: Any
has_pull_requests: Any
has_wiki: Any
id: int
ignore_whitespace_conflicts: Any
name: Any
open_issues: int
private: Any
state: str
title: str
updated_at: str
website: Any
API_OBJECT = '/repos/{owner}/{repo}/milestones/{number}'
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1509    @classmethod
1510    def request(cls, allspice_client, owner: str, repo: str, number: str):
1511        return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
class Attachment(allspice.baseapiobject.ReadonlyApiObject):
1514class Attachment(ReadonlyApiObject):
1515    """
1516    An asset attached to a comment.
1517
1518    You cannot edit or delete the attachment from this object - see the instance methods
1519    Comment.edit_attachment and delete_attachment for that.
1520    """
1521
1522    browser_download_url: str
1523    created_at: str
1524    download_count: int
1525    id: int
1526    name: str
1527    size: int
1528    uuid: str
1529
1530    def __init__(self, allspice_client):
1531        super().__init__(allspice_client)
1532
1533    def __eq__(self, other):
1534        if not isinstance(other, Attachment):
1535            return False
1536
1537        return self.uuid == other.uuid
1538
1539    def __hash__(self):
1540        return hash(self.uuid)
1541
1542    def download_to_file(self, io: IO):
1543        """
1544        Download the raw, binary data of this Attachment to a file-like object.
1545
1546        Example:
1547
1548            with open("my_file.zip", "wb") as f:
1549                attachment.download_to_file(f)
1550
1551        :param io: The file-like object to write the data to.
1552        """
1553
1554        response = self.allspice_client.requests.get(
1555            self.browser_download_url,
1556            headers=self.allspice_client.headers,
1557            stream=True,
1558        )
1559        # 4kb chunks
1560        for chunk in response.iter_content(chunk_size=4096):
1561            if chunk:
1562                io.write(chunk)

An asset attached to a comment.

You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.

Attachment(allspice_client)
1530    def __init__(self, allspice_client):
1531        super().__init__(allspice_client)
browser_download_url: str
created_at: str
download_count: int
id: int
name: str
size: int
uuid: str
def download_to_file(self, io: <class 'IO'>):
1542    def download_to_file(self, io: IO):
1543        """
1544        Download the raw, binary data of this Attachment to a file-like object.
1545
1546        Example:
1547
1548            with open("my_file.zip", "wb") as f:
1549                attachment.download_to_file(f)
1550
1551        :param io: The file-like object to write the data to.
1552        """
1553
1554        response = self.allspice_client.requests.get(
1555            self.browser_download_url,
1556            headers=self.allspice_client.headers,
1557            stream=True,
1558        )
1559        # 4kb chunks
1560        for chunk in response.iter_content(chunk_size=4096):
1561            if chunk:
1562                io.write(chunk)

Download the raw, binary data of this Attachment to a file-like object.

Example:

with open("my_file.zip", "wb") as f:
    attachment.download_to_file(f)
Parameters
  • io: The file-like object to write the data to.
class Comment(allspice.baseapiobject.ApiObject):
1565class Comment(ApiObject):
1566    assets: List[Union[Any, Dict[str, Union[int, str]]]]
1567    body: str
1568    created_at: datetime
1569    html_url: str
1570    id: int
1571    issue_url: str
1572    original_author: str
1573    original_author_id: int
1574    pull_request_url: str
1575    updated_at: datetime
1576    user: User
1577
1578    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
1579    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
1580    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""
1581
1582    def __init__(self, allspice_client):
1583        super().__init__(allspice_client)
1584
1585    def __eq__(self, other):
1586        if not isinstance(other, Comment):
1587            return False
1588        return self.repository == other.repository and self.id == other.id
1589
1590    def __hash__(self):
1591        return hash(self.repository) ^ hash(self.id)
1592
1593    @classmethod
1594    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1595        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
1596
1597    _fields_to_parsers: ClassVar[dict] = {
1598        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
1599        "created_at": lambda _, t: Util.convert_time(t),
1600        "updated_at": lambda _, t: Util.convert_time(t),
1601    }
1602
1603    _patchable_fields: ClassVar[set[str]] = {"body"}
1604
1605    @property
1606    def parent_url(self) -> str:
1607        """URL of the parent of this comment (the issue or the pull request)"""
1608
1609        if self.issue_url is not None and self.issue_url != "":
1610            return self.issue_url
1611        else:
1612            return self.pull_request_url
1613
1614    @cached_property
1615    def repository(self) -> Repository:
1616        """The repository this comment was posted on."""
1617
1618        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1619        return Repository.request(self.allspice_client, owner_name, repo_name)
1620
1621    def __fields_for_path(self):
1622        return {
1623            "owner": self.repository.owner.username,
1624            "repo": self.repository.name,
1625            "id": self.id,
1626        }
1627
1628    def commit(self):
1629        self._commit(self.__fields_for_path())
1630
1631    def delete(self):
1632        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1633        self.deleted = True
1634
1635    def get_attachments(self) -> List[Attachment]:
1636        """
1637        Get all attachments on this comment. This returns Attachment objects, which
1638        contain a link to download the attachment.
1639
1640        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641        """
1642
1643        results = self.allspice_client.requests_get(
1644            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1645        )
1646        return [Attachment.parse_response(self.allspice_client, result) for result in results]
1647
1648    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1649        """
1650        Create an attachment on this comment.
1651
1652        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1653
1654        :param file: The file to attach. This should be a file-like object.
1655        :param name: The name of the file. If not provided, the name of the file will be
1656                     used.
1657        :return: The created attachment.
1658        """
1659
1660        args: dict[str, Any] = {
1661            "files": {"attachment": file},
1662        }
1663        if name is not None:
1664            args["params"] = {"name": name}
1665
1666        result = self.allspice_client.requests_post(
1667            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1668            **args,
1669        )
1670        return Attachment.parse_response(self.allspice_client, result)
1671
1672    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1673        """
1674        Edit an attachment.
1675
1676        The list of params that can be edited is available at
1677        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1678
1679        :param attachment: The attachment to be edited
1680        :param data: The data parameter should be a dictionary of the fields to edit.
1681        :return: The edited attachment
1682        """
1683
1684        args = {
1685            **self.__fields_for_path(),
1686            "attachment_id": attachment.id,
1687        }
1688        result = self.allspice_client.requests_patch(
1689            self.ATTACHMENT_PATH.format(**args),
1690            data=data,
1691        )
1692        return Attachment.parse_response(self.allspice_client, result)
1693
1694    def delete_attachment(self, attachment: Attachment):
1695        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1696
1697        args = {
1698            **self.__fields_for_path(),
1699            "attachment_id": attachment.id,
1700        }
1701        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1702        attachment.deleted = True
Comment(allspice_client)
1582    def __init__(self, allspice_client):
1583        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]]]]
body: str
created_at: datetime.datetime
html_url: str
id: int
issue_url: str
original_author: str
original_author_id: int
pull_request_url: str
updated_at: datetime.datetime
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/comments/{id}'
GET_ATTACHMENTS_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets'
ATTACHMENT_PATH = '/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}'
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: str) -> Comment:
1593    @classmethod
1594    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
1595        return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id})
parent_url: str
1605    @property
1606    def parent_url(self) -> str:
1607        """URL of the parent of this comment (the issue or the pull request)"""
1608
1609        if self.issue_url is not None and self.issue_url != "":
1610            return self.issue_url
1611        else:
1612            return self.pull_request_url

URL of the parent of this comment (the issue or the pull request)

repository: Repository
1614    @cached_property
1615    def repository(self) -> Repository:
1616        """The repository this comment was posted on."""
1617
1618        owner_name, repo_name = self.parent_url.split("/")[-4:-2]
1619        return Repository.request(self.allspice_client, owner_name, repo_name)

The repository this comment was posted on.

def commit(self):
1628    def commit(self):
1629        self._commit(self.__fields_for_path())
def delete(self):
1631    def delete(self):
1632        self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path()))
1633        self.deleted = True
def get_attachments(self) -> List[Attachment]:
1635    def get_attachments(self) -> List[Attachment]:
1636        """
1637        Get all attachments on this comment. This returns Attachment objects, which
1638        contain a link to download the attachment.
1639
1640        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641        """
1642
1643        results = self.allspice_client.requests_get(
1644            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
1645        )
1646        return [Attachment.parse_response(self.allspice_client, result) for result in results]

Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.

allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments

def create_attachment( self, file: <class 'IO'>, name: Optional[str] = None) -> Attachment:
1648    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
1649        """
1650        Create an attachment on this comment.
1651
1652        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
1653
1654        :param file: The file to attach. This should be a file-like object.
1655        :param name: The name of the file. If not provided, the name of the file will be
1656                     used.
1657        :return: The created attachment.
1658        """
1659
1660        args: dict[str, Any] = {
1661            "files": {"attachment": file},
1662        }
1663        if name is not None:
1664            args["params"] = {"name": name}
1665
1666        result = self.allspice_client.requests_post(
1667            self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()),
1668            **args,
1669        )
1670        return Attachment.parse_response(self.allspice_client, result)

Create an attachment on this comment.

allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

Parameters
  • file: The file to attach. This should be a file-like object.
  • name: The name of the file. If not provided, the name of the file will be used.
Returns

The created attachment.

def edit_attachment( self, attachment: Attachment, data: dict) -> Attachment:
1672    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
1673        """
1674        Edit an attachment.
1675
1676        The list of params that can be edited is available at
1677        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
1678
1679        :param attachment: The attachment to be edited
1680        :param data: The data parameter should be a dictionary of the fields to edit.
1681        :return: The edited attachment
1682        """
1683
1684        args = {
1685            **self.__fields_for_path(),
1686            "attachment_id": attachment.id,
1687        }
1688        result = self.allspice_client.requests_patch(
1689            self.ATTACHMENT_PATH.format(**args),
1690            data=data,
1691        )
1692        return Attachment.parse_response(self.allspice_client, result)

Edit an attachment.

The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

Parameters
  • attachment: The attachment to be edited
  • data: The data parameter should be a dictionary of the fields to edit.
Returns

The edited attachment

def delete_attachment(self, attachment: Attachment):
1694    def delete_attachment(self, attachment: Attachment):
1695        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""
1696
1697        args = {
1698            **self.__fields_for_path(),
1699            "attachment_id": attachment.id,
1700        }
1701        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args))
1702        attachment.deleted = True

allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

class Commit(allspice.baseapiobject.ReadonlyApiObject):
1705class Commit(ReadonlyApiObject):
1706    author: User
1707    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1708    committer: Dict[str, Union[int, str, bool]]
1709    created: str
1710    files: List[Dict[str, str]]
1711    html_url: str
1712    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
1713    parents: List[Union[Dict[str, str], Any]]
1714    sha: str
1715    stats: Dict[str, int]
1716    url: str
1717
1718    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
1719    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
1720    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""
1721
1722    # Regex to extract owner and repo names from the url property
1723    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")
1724
1725    def __init__(self, allspice_client):
1726        super().__init__(allspice_client)
1727
1728    _fields_to_parsers: ClassVar[dict] = {
1729        # NOTE: api may return None for commiters that are no allspice users
1730        "author": lambda allspice_client, u: (
1731            User.parse_response(allspice_client, u) if u else None
1732        )
1733    }
1734
1735    def __eq__(self, other):
1736        if not isinstance(other, Commit):
1737            return False
1738        return self.sha == other.sha
1739
1740    def __hash__(self):
1741        return hash(self.sha)
1742
1743    @classmethod
1744    def parse_response(cls, allspice_client, result) -> "Commit":
1745        commit_cache = result["commit"]
1746        api_object = cls(allspice_client)
1747        cls._initialize(allspice_client, api_object, result)
1748        # inner_commit for legacy reasons
1749        Commit._add_read_property("inner_commit", commit_cache, api_object)
1750        return api_object
1751
1752    def get_status(self) -> CommitCombinedStatus:
1753        """
1754        Get a combined status consisting of all statues on this commit.
1755
1756        Note that the returned object is a CommitCombinedStatus object, which
1757        also contains a list of all statuses on the commit.
1758
1759        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760        """
1761
1762        result = self.allspice_client.requests_get(
1763            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1764        )
1765        return CommitCombinedStatus.parse_response(self.allspice_client, result)
1766
1767    def get_statuses(self) -> List[CommitStatus]:
1768        """
1769        Get a list of all statuses on this commit.
1770
1771        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1772        """
1773
1774        results = self.allspice_client.requests_get(
1775            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1776        )
1777        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
1778
1779    @cached_property
1780    def _fields_for_path(self) -> dict[str, str]:
1781        matches = self.URL_REGEXP.search(self.url)
1782        if not matches:
1783            raise ValueError(f"Invalid commit URL: {self.url}")
1784
1785        return {
1786            "owner": matches.group(1),
1787            "repo": matches.group(2),
1788            "sha": self.sha,
1789        }
Commit(allspice_client)
1725    def __init__(self, allspice_client):
1726        super().__init__(allspice_client)
author: User
commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
committer: Dict[str, Union[int, str, bool]]
created: str
files: List[Dict[str, str]]
html_url: str
inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Union[bool, str, NoneType]]]]
parents: List[Union[Dict[str, str], Any]]
sha: str
stats: Dict[str, int]
url: str
API_OBJECT = '/repos/{owner}/{repo}/commits/{sha}'
COMMIT_GET_STATUS = '/repos/{owner}/{repo}/commits/{sha}/status'
COMMIT_GET_STATUSES = '/repos/{owner}/{repo}/commits/{sha}/statuses'
URL_REGEXP = re.compile('/repos/([^/]+)/([^/]+)/git/commits')
@classmethod
def parse_response(cls, allspice_client, result) -> Commit:
1743    @classmethod
1744    def parse_response(cls, allspice_client, result) -> "Commit":
1745        commit_cache = result["commit"]
1746        api_object = cls(allspice_client)
1747        cls._initialize(allspice_client, api_object, result)
1748        # inner_commit for legacy reasons
1749        Commit._add_read_property("inner_commit", commit_cache, api_object)
1750        return api_object
def get_status(self) -> CommitCombinedStatus:
1752    def get_status(self) -> CommitCombinedStatus:
1753        """
1754        Get a combined status consisting of all statues on this commit.
1755
1756        Note that the returned object is a CommitCombinedStatus object, which
1757        also contains a list of all statuses on the commit.
1758
1759        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760        """
1761
1762        result = self.allspice_client.requests_get(
1763            self.COMMIT_GET_STATUS.format(**self._fields_for_path)
1764        )
1765        return CommitCombinedStatus.parse_response(self.allspice_client, result)

Get a combined status consisting of all statues on this commit.

Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.

allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus

def get_statuses(self) -> List[CommitStatus]:
1767    def get_statuses(self) -> List[CommitStatus]:
1768        """
1769        Get a list of all statuses on this commit.
1770
1771        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
1772        """
1773
1774        results = self.allspice_client.requests_get(
1775            self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
1776        )
1777        return [CommitStatus.parse_response(self.allspice_client, result) for result in results]

Get a list of all statuses on this commit.

allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses

class CommitStatusState(enum.Enum):
1792class CommitStatusState(Enum):
1793    PENDING = "pending"
1794    SUCCESS = "success"
1795    ERROR = "error"
1796    FAILURE = "failure"
1797    WARNING = "warning"
1798
1799    @classmethod
1800    def try_init(cls, value: str) -> CommitStatusState | str:
1801        """
1802        Try converting a string to the enum, and if that fails, return the
1803        string itself.
1804        """
1805
1806        try:
1807            return cls(value)
1808        except ValueError:
1809            return value
PENDING = <CommitStatusState.PENDING: 'pending'>
SUCCESS = <CommitStatusState.SUCCESS: 'success'>
ERROR = <CommitStatusState.ERROR: 'error'>
FAILURE = <CommitStatusState.FAILURE: 'failure'>
WARNING = <CommitStatusState.WARNING: 'warning'>
@classmethod
def try_init(cls, value: str) -> CommitStatusState | str:
1799    @classmethod
1800    def try_init(cls, value: str) -> CommitStatusState | str:
1801        """
1802        Try converting a string to the enum, and if that fails, return the
1803        string itself.
1804        """
1805
1806        try:
1807            return cls(value)
1808        except ValueError:
1809            return value

Try converting a string to the enum, and if that fails, return the string itself.

class CommitStatus(allspice.baseapiobject.ReadonlyApiObject):
1812class CommitStatus(ReadonlyApiObject):
1813    context: str
1814    created_at: str
1815    creator: User
1816    description: str
1817    id: int
1818    status: CommitStatusState
1819    target_url: str
1820    updated_at: str
1821    url: str
1822
1823    def __init__(self, allspice_client):
1824        super().__init__(allspice_client)
1825
1826    _fields_to_parsers: ClassVar[dict] = {
1827        # Gitea/ASH doesn't actually validate that the status is a "valid"
1828        # status, so we can expect empty or unknown strings in the status field.
1829        "status": lambda _, s: CommitStatusState.try_init(s),
1830        "creator": lambda allspice_client, u: (
1831            User.parse_response(allspice_client, u) if u else None
1832        ),
1833    }
1834
1835    def __eq__(self, other):
1836        if not isinstance(other, CommitStatus):
1837            return False
1838        return self.id == other.id
1839
1840    def __hash__(self):
1841        return hash(self.id)
CommitStatus(allspice_client)
1823    def __init__(self, allspice_client):
1824        super().__init__(allspice_client)
context: str
created_at: str
creator: User
description: str
id: int
target_url: str
updated_at: str
url: str
class CommitCombinedStatus(allspice.baseapiobject.ReadonlyApiObject):
1844class CommitCombinedStatus(ReadonlyApiObject):
1845    commit_url: str
1846    repository: Repository
1847    sha: str
1848    state: CommitStatusState
1849    statuses: List["CommitStatus"]
1850    total_count: int
1851    url: str
1852
1853    def __init__(self, allspice_client):
1854        super().__init__(allspice_client)
1855
1856    _fields_to_parsers: ClassVar[dict] = {
1857        # See CommitStatus
1858        "state": lambda _, s: CommitStatusState.try_init(s),
1859        "statuses": lambda allspice_client, statuses: [
1860            CommitStatus.parse_response(allspice_client, status) for status in statuses
1861        ],
1862        "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r),
1863    }
1864
1865    def __eq__(self, other):
1866        if not isinstance(other, CommitCombinedStatus):
1867            return False
1868        return self.sha == other.sha
1869
1870    def __hash__(self):
1871        return hash(self.sha)
1872
1873    @classmethod
1874    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
1875        api_object = cls(allspice_client)
1876        cls._initialize(allspice_client, api_object, result)
1877        return api_object
CommitCombinedStatus(allspice_client)
1853    def __init__(self, allspice_client):
1854        super().__init__(allspice_client)
commit_url: str
repository: Repository
sha: str
statuses: List[CommitStatus]
total_count: int
url: str
@classmethod
def parse_response(cls, allspice_client, result) -> CommitCombinedStatus:
1873    @classmethod
1874    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
1875        api_object = cls(allspice_client)
1876        cls._initialize(allspice_client, api_object, result)
1877        return api_object
class Issue(allspice.baseapiobject.ApiObject):
1880class Issue(ApiObject):
1881    assets: List[Any]
1882    assignee: Any
1883    assignees: Any
1884    body: str
1885    closed_at: Any
1886    comments: int
1887    created_at: str
1888    due_date: Any
1889    html_url: str
1890    id: int
1891    is_locked: bool
1892    labels: List[Any]
1893    milestone: Optional["Milestone"]
1894    number: int
1895    original_author: str
1896    original_author_id: int
1897    pin_order: int
1898    pull_request: Any
1899    ref: str
1900    repository: Dict[str, Union[int, str]]
1901    state: str
1902    title: str
1903    updated_at: str
1904    url: str
1905    user: User
1906
1907    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
1908    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
1909    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
1910    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
1911
1912    OPENED = "open"
1913    CLOSED = "closed"
1914
1915    def __init__(self, allspice_client):
1916        super().__init__(allspice_client)
1917
1918    def __eq__(self, other):
1919        if not isinstance(other, Issue):
1920            return False
1921        return self.repository == other.repository and self.id == other.id
1922
1923    def __hash__(self):
1924        return hash(self.repository) ^ hash(self.id)
1925
1926    _fields_to_parsers: ClassVar[dict] = {
1927        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
1928        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
1929        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
1930        "assignees": lambda allspice_client, us: [
1931            User.parse_response(allspice_client, u) for u in us
1932        ],
1933        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
1934    }
1935
1936    _parsers_to_fields: ClassVar[dict] = {
1937        "milestone": lambda m: m.id,
1938    }
1939
1940    _patchable_fields: ClassVar[set[str]] = {
1941        "assignee",
1942        "assignees",
1943        "body",
1944        "due_date",
1945        "milestone",
1946        "state",
1947        "title",
1948    }
1949
1950    def commit(self):
1951        args = {
1952            "owner": self.repository.owner.username,
1953            "repo": self.repository.name,
1954            "index": self.number,
1955        }
1956        self._commit(args)
1957
1958    @classmethod
1959    def request(cls, allspice_client, owner: str, repo: str, number: str):
1960        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1961        # The repository in the response is a RepositoryMeta object, so request
1962        # the full repository object and add it to the issue object.
1963        repository = Repository.request(allspice_client, owner, repo)
1964        setattr(api_object, "_repository", repository)
1965        # For legacy reasons
1966        cls._add_read_property("repo", repository, api_object)
1967        return api_object
1968
1969    @classmethod
1970    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1971        args = {"owner": repo.owner.username, "repo": repo.name}
1972        data = {"title": title, "body": body}
1973        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1974        issue = Issue.parse_response(allspice_client, result)
1975        setattr(issue, "_repository", repo)
1976        cls._add_read_property("repo", repo, issue)
1977        return issue
1978
1979    @property
1980    def owner(self) -> Organization | User:
1981        return self.repository.owner
1982
1983    def get_time_sum(self, user: User) -> int:
1984        results = self.allspice_client.requests_get(
1985            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1986        )
1987        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
1988
1989    def get_times(self) -> Optional[Dict]:
1990        return self.allspice_client.requests_get(
1991            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1992        )
1993
1994    def delete_time(self, time_id: str):
1995        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1996        self.allspice_client.requests_delete(path)
1997
1998    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1999        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2000        self.allspice_client.requests_post(
2001            path, data={"created": created, "time": int(time), "user_name": user_name}
2002        )
2003
2004    def get_comments(self) -> List[Comment]:
2005        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2006
2007        results = self.allspice_client.requests_get(
2008            self.GET_COMMENTS.format(
2009                owner=self.owner.username, repo=self.repository.name, index=self.number
2010            )
2011        )
2012
2013        return [Comment.parse_response(self.allspice_client, result) for result in results]
2014
2015    def create_comment(self, body: str) -> Comment:
2016        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2017
2018        path = self.GET_COMMENTS.format(
2019            owner=self.owner.username, repo=self.repository.name, index=self.number
2020        )
2021
2022        response = self.allspice_client.requests_post(path, data={"body": body})
2023        return Comment.parse_response(self.allspice_client, response)
Issue(allspice_client)
1915    def __init__(self, allspice_client):
1916        super().__init__(allspice_client)
assets: List[Any]
assignee: Any
assignees: Any
body: str
closed_at: Any
comments: int
created_at: str
due_date: Any
html_url: str
id: int
is_locked: bool
labels: List[Any]
milestone: Optional[Milestone]
number: int
original_author: str
original_author_id: int
pin_order: int
pull_request: Any
ref: str
repository: Dict[str, Union[int, str]]
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/issues/{index}'
GET_TIME = '/repos/%s/%s/issues/%s/times'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
CREATE_ISSUE = '/repos/{owner}/{repo}/issues'
OPENED = 'open'
CLOSED = 'closed'
def commit(self):
1950    def commit(self):
1951        args = {
1952            "owner": self.repository.owner.username,
1953            "repo": self.repository.name,
1954            "index": self.number,
1955        }
1956        self._commit(args)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
1958    @classmethod
1959    def request(cls, allspice_client, owner: str, repo: str, number: str):
1960        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
1961        # The repository in the response is a RepositoryMeta object, so request
1962        # the full repository object and add it to the issue object.
1963        repository = Repository.request(allspice_client, owner, repo)
1964        setattr(api_object, "_repository", repository)
1965        # For legacy reasons
1966        cls._add_read_property("repo", repository, api_object)
1967        return api_object
@classmethod
def create_issue( cls, allspice_client, repo: Repository, title: str, body: str = ''):
1969    @classmethod
1970    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
1971        args = {"owner": repo.owner.username, "repo": repo.name}
1972        data = {"title": title, "body": body}
1973        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
1974        issue = Issue.parse_response(allspice_client, result)
1975        setattr(issue, "_repository", repo)
1976        cls._add_read_property("repo", repo, issue)
1977        return issue
owner: Organization | User
1979    @property
1980    def owner(self) -> Organization | User:
1981        return self.repository.owner
def get_time_sum(self, user: User) -> int:
1983    def get_time_sum(self, user: User) -> int:
1984        results = self.allspice_client.requests_get(
1985            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1986        )
1987        return sum(result["time"] for result in results if result and result["user_id"] == user.id)
def get_times(self) -> Optional[Dict]:
1989    def get_times(self) -> Optional[Dict]:
1990        return self.allspice_client.requests_get(
1991            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
1992        )
def delete_time(self, time_id: str):
1994    def delete_time(self, time_id: str):
1995        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
1996        self.allspice_client.requests_delete(path)
def add_time( self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1998    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
1999        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
2000        self.allspice_client.requests_post(
2001            path, data={"created": created, "time": int(time), "user_name": user_name}
2002        )
def get_comments(self) -> List[Comment]:
2004    def get_comments(self) -> List[Comment]:
2005        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""
2006
2007        results = self.allspice_client.requests_get(
2008            self.GET_COMMENTS.format(
2009                owner=self.owner.username, repo=self.repository.name, index=self.number
2010            )
2011        )
2012
2013        return [Comment.parse_response(self.allspice_client, result) for result in results]

allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments

def create_comment(self, body: str) -> Comment:
2015    def create_comment(self, body: str) -> Comment:
2016        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""
2017
2018        path = self.GET_COMMENTS.format(
2019            owner=self.owner.username, repo=self.repository.name, index=self.number
2020        )
2021
2022        response = self.allspice_client.requests_post(path, data={"body": body})
2023        return Comment.parse_response(self.allspice_client, response)

allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment

class DesignReview(allspice.baseapiobject.ApiObject):
2026class DesignReview(ApiObject):
2027    """
2028    A Design Review. See
2029    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
2030
2031    Note: The base and head fields are not `Branch` objects - they are plain strings
2032    referring to the branch names. This is because DRs can exist for branches that have
2033    been deleted, which don't have an associated `Branch` object from the API. You can use
2034    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
2035    it exists.
2036    """
2037
2038    additions: int
2039    allow_maintainer_edit: bool
2040    allow_maintainer_edits: Any
2041    assignee: User
2042    assignees: List["User"]
2043    base: str
2044    body: str
2045    changed_files: int
2046    closed_at: Any
2047    comments: int
2048    created_at: str
2049    deletions: int
2050    diff_url: str
2051    draft: bool
2052    due_date: Optional[str]
2053    head: str
2054    html_url: str
2055    id: int
2056    is_locked: bool
2057    labels: List[Any]
2058    merge_base: str
2059    merge_commit_sha: Any
2060    mergeable: bool
2061    merged: bool
2062    merged_at: Any
2063    merged_by: Any
2064    milestone: Any
2065    number: int
2066    patch_url: str
2067    pin_order: int
2068    repository: Optional["Repository"]
2069    requested_reviewers: Any
2070    review_comments: int
2071    state: str
2072    title: str
2073    updated_at: str
2074    url: str
2075    user: User
2076
2077    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
2078    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
2079    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"
2080
2081    OPEN = "open"
2082    CLOSED = "closed"
2083
2084    class MergeType(Enum):
2085        MERGE = "merge"
2086        REBASE = "rebase"
2087        REBASE_MERGE = "rebase-merge"
2088        SQUASH = "squash"
2089        MANUALLY_MERGED = "manually-merged"
2090
2091    def __init__(self, allspice_client):
2092        super().__init__(allspice_client)
2093
2094    def __eq__(self, other):
2095        if not isinstance(other, DesignReview):
2096            return False
2097        return self.repository == other.repository and self.id == other.id
2098
2099    def __hash__(self):
2100        return hash(self.repository) ^ hash(self.id)
2101
2102    @classmethod
2103    def parse_response(cls, allspice_client, result) -> "DesignReview":
2104        api_object = super().parse_response(allspice_client, result)
2105        cls._add_read_property(
2106            "repository",
2107            Repository.parse_response(allspice_client, result["base"]["repo"]),
2108            api_object,
2109        )
2110
2111        return api_object
2112
2113    @classmethod
2114    def request(cls, allspice_client, owner: str, repo: str, number: str):
2115        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2116        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
2117
2118    _fields_to_parsers: ClassVar[dict] = {
2119        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
2120        "assignees": lambda allspice_client, us: [
2121            User.parse_response(allspice_client, u) for u in us
2122        ],
2123        "base": lambda _, b: b["ref"],
2124        "head": lambda _, h: h["ref"],
2125        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
2126        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
2127        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
2128    }
2129
2130    _patchable_fields: ClassVar[set[str]] = {
2131        "allow_maintainer_edits",
2132        "assignee",
2133        "assignees",
2134        "base",
2135        "body",
2136        "due_date",
2137        "milestone",
2138        "state",
2139        "title",
2140    }
2141
2142    _parsers_to_fields: ClassVar[dict] = {
2143        "assignee": lambda u: u.username,
2144        "assignees": lambda us: [u.username for u in us],
2145        "base": lambda b: b.name if isinstance(b, Branch) else b,
2146        "milestone": lambda m: m.id,
2147    }
2148
2149    def commit(self):
2150        data = self.get_dirty_fields()
2151        if "due_date" in data and data["due_date"] is None:
2152            data["unset_due_date"] = True
2153
2154        args = {
2155            "owner": self.repository.owner.username,
2156            "repo": self.repository.name,
2157            "index": self.number,
2158        }
2159        self._commit(args, data)
2160
2161    def merge(self, merge_type: MergeType):
2162        """
2163        Merge the pull request. See
2164        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2165
2166        :param merge_type: The type of merge to perform. See the MergeType enum.
2167        """
2168
2169        self.allspice_client.requests_put(
2170            self.MERGE_DESIGN_REVIEW.format(
2171                owner=self.repository.owner.username,
2172                repo=self.repository.name,
2173                index=self.number,
2174            ),
2175            data={"Do": merge_type.value},
2176        )
2177
2178    def get_comments(self) -> List[Comment]:
2179        """
2180        Get the comments on this pull request, but not specifically on a review.
2181
2182        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2183
2184        :return: A list of comments on this pull request.
2185        """
2186
2187        results = self.allspice_client.requests_get(
2188            self.GET_COMMENTS.format(
2189                owner=self.repository.owner.username,
2190                repo=self.repository.name,
2191                index=self.number,
2192            )
2193        )
2194        return [Comment.parse_response(self.allspice_client, result) for result in results]
2195
2196    def create_comment(self, body: str):
2197        """
2198        Create a comment on this pull request. This uses the same endpoint as the
2199        comments on issues, and will not be associated with any reviews.
2200
2201        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2202
2203        :param body: The body of the comment.
2204        :return: The comment that was created.
2205        """
2206
2207        result = self.allspice_client.requests_post(
2208            self.GET_COMMENTS.format(
2209                owner=self.repository.owner.username,
2210                repo=self.repository.name,
2211                index=self.number,
2212            ),
2213            data={"body": body},
2214        )
2215        return Comment.parse_response(self.allspice_client, result)

A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.

Note: The base and head fields are not Branch objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated Branch object from the API. You can use the Repository.get_branch method to get a Branch object for a branch if you know it exists.

DesignReview(allspice_client)
2091    def __init__(self, allspice_client):
2092        super().__init__(allspice_client)
additions: int
allow_maintainer_edit: bool
allow_maintainer_edits: Any
assignee: User
assignees: List[User]
base: str
body: str
changed_files: int
closed_at: Any
comments: int
created_at: str
deletions: int
diff_url: str
draft: bool
due_date: Optional[str]
head: str
html_url: str
id: int
is_locked: bool
labels: List[Any]
merge_base: str
merge_commit_sha: Any
mergeable: bool
merged: bool
merged_at: Any
merged_by: Any
milestone: Any
number: int
patch_url: str
pin_order: int
repository: Optional[Repository]
requested_reviewers: Any
review_comments: int
state: str
title: str
updated_at: str
url: str
user: User
API_OBJECT = '/repos/{owner}/{repo}/pulls/{index}'
MERGE_DESIGN_REVIEW = '/repos/{owner}/{repo}/pulls/{index}/merge'
GET_COMMENTS = '/repos/{owner}/{repo}/issues/{index}/comments'
OPEN = 'open'
CLOSED = 'closed'
@classmethod
def parse_response(cls, allspice_client, result) -> DesignReview:
2102    @classmethod
2103    def parse_response(cls, allspice_client, result) -> "DesignReview":
2104        api_object = super().parse_response(allspice_client, result)
2105        cls._add_read_property(
2106            "repository",
2107            Repository.parse_response(allspice_client, result["base"]["repo"]),
2108            api_object,
2109        )
2110
2111        return api_object
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
2113    @classmethod
2114    def request(cls, allspice_client, owner: str, repo: str, number: str):
2115        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
2116        return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})

See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest

def commit(self):
2149    def commit(self):
2150        data = self.get_dirty_fields()
2151        if "due_date" in data and data["due_date"] is None:
2152            data["unset_due_date"] = True
2153
2154        args = {
2155            "owner": self.repository.owner.username,
2156            "repo": self.repository.name,
2157            "index": self.number,
2158        }
2159        self._commit(args, data)
def merge(self, merge_type: DesignReview.MergeType):
2161    def merge(self, merge_type: MergeType):
2162        """
2163        Merge the pull request. See
2164        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
2165
2166        :param merge_type: The type of merge to perform. See the MergeType enum.
2167        """
2168
2169        self.allspice_client.requests_put(
2170            self.MERGE_DESIGN_REVIEW.format(
2171                owner=self.repository.owner.username,
2172                repo=self.repository.name,
2173                index=self.number,
2174            ),
2175            data={"Do": merge_type.value},
2176        )

Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest

Parameters
  • merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
2178    def get_comments(self) -> List[Comment]:
2179        """
2180        Get the comments on this pull request, but not specifically on a review.
2181
2182        https://hub.allspice.io/api/swagger#/issue/issueGetComments
2183
2184        :return: A list of comments on this pull request.
2185        """
2186
2187        results = self.allspice_client.requests_get(
2188            self.GET_COMMENTS.format(
2189                owner=self.repository.owner.username,
2190                repo=self.repository.name,
2191                index=self.number,
2192            )
2193        )
2194        return [Comment.parse_response(self.allspice_client, result) for result in results]

Get the comments on this pull request, but not specifically on a review.

allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments

Returns

A list of comments on this pull request.

def create_comment(self, body: str):
2196    def create_comment(self, body: str):
2197        """
2198        Create a comment on this pull request. This uses the same endpoint as the
2199        comments on issues, and will not be associated with any reviews.
2200
2201        https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2202
2203        :param body: The body of the comment.
2204        :return: The comment that was created.
2205        """
2206
2207        result = self.allspice_client.requests_post(
2208            self.GET_COMMENTS.format(
2209                owner=self.repository.owner.username,
2210                repo=self.repository.name,
2211                index=self.number,
2212            ),
2213            data={"body": body},
2214        )
2215        return Comment.parse_response(self.allspice_client, result)

Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.

allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment

Parameters
  • body: The body of the comment.
Returns

The comment that was created.

class DesignReview.MergeType(enum.Enum):
2084    class MergeType(Enum):
2085        MERGE = "merge"
2086        REBASE = "rebase"
2087        REBASE_MERGE = "rebase-merge"
2088        SQUASH = "squash"
2089        MANUALLY_MERGED = "manually-merged"
MERGE = <MergeType.MERGE: 'merge'>
REBASE = <MergeType.REBASE: 'rebase'>
REBASE_MERGE = <MergeType.REBASE_MERGE: 'rebase-merge'>
SQUASH = <MergeType.SQUASH: 'squash'>
MANUALLY_MERGED = <MergeType.MANUALLY_MERGED: 'manually-merged'>
class Team(allspice.baseapiobject.ApiObject):
2218class Team(ApiObject):
2219    can_create_org_repo: bool
2220    description: str
2221    id: int
2222    includes_all_repositories: bool
2223    name: str
2224    organization: Optional["Organization"]
2225    permission: str
2226    units: List[str]
2227    units_map: Dict[str, str]
2228
2229    API_OBJECT = """/teams/{id}"""  # <id>
2230    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
2231    TEAM_DELETE = """/teams/%s"""  # <id>
2232    GET_MEMBERS = """/teams/%s/members"""  # <id>
2233    GET_REPOS = """/teams/%s/repos"""  # <id>
2234
2235    def __init__(self, allspice_client):
2236        super().__init__(allspice_client)
2237
2238    def __eq__(self, other):
2239        if not isinstance(other, Team):
2240            return False
2241        return self.organization == other.organization and self.id == other.id
2242
2243    def __hash__(self):
2244        return hash(self.organization) ^ hash(self.id)
2245
2246    _fields_to_parsers: ClassVar[dict] = {
2247        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
2248    }
2249
2250    _patchable_fields: ClassVar[set[str]] = {
2251        "can_create_org_repo",
2252        "description",
2253        "includes_all_repositories",
2254        "name",
2255        "permission",
2256        "units",
2257        "units_map",
2258    }
2259
2260    @classmethod
2261    def request(cls, allspice_client, id: int):
2262        return cls._request(allspice_client, {"id": id})
2263
2264    def commit(self):
2265        args = {"id": self.id}
2266        self._commit(args)
2267
2268    def add_user(self, user: User):
2269        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2270        url = f"/teams/{self.id}/members/{user.login}"
2271        self.allspice_client.requests_put(url)
2272
2273    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2274        if isinstance(repo, Repository):
2275            repo_name = repo.name
2276        else:
2277            repo_name = repo
2278        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
2279
2280    def get_members(self):
2281        """Get all users assigned to the team."""
2282        results = self.allspice_client.requests_get_paginated(
2283            Team.GET_MEMBERS % self.id,
2284        )
2285        return [User.parse_response(self.allspice_client, result) for result in results]
2286
2287    def get_repos(self):
2288        """Get all repos of this Team."""
2289        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2290        return [Repository.parse_response(self.allspice_client, result) for result in results]
2291
2292    def delete(self):
2293        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2294        self.deleted = True
2295
2296    def remove_team_member(self, user_name: str):
2297        url = f"/teams/{self.id}/members/{user_name}"
2298        self.allspice_client.requests_delete(url)
Team(allspice_client)
2235    def __init__(self, allspice_client):
2236        super().__init__(allspice_client)
can_create_org_repo: bool
description: str
id: int
includes_all_repositories: bool
name: str
organization: Optional[Organization]
permission: str
units: List[str]
units_map: Dict[str, str]
API_OBJECT = '/teams/{id}'
ADD_REPO = '/teams/%s/repos/%s/%s'
TEAM_DELETE = '/teams/%s'
GET_MEMBERS = '/teams/%s/members'
GET_REPOS = '/teams/%s/repos'
@classmethod
def request(cls, allspice_client, id: int):
2260    @classmethod
2261    def request(cls, allspice_client, id: int):
2262        return cls._request(allspice_client, {"id": id})
def commit(self):
2264    def commit(self):
2265        args = {"id": self.id}
2266        self._commit(args)
def add_user(self, user: User):
2268    def add_user(self, user: User):
2269        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
2270        url = f"/teams/{self.id}/members/{user.login}"
2271        self.allspice_client.requests_put(url)

allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember

def add_repo( self, org: Organization, repo: Union[Repository, str]):
2273    def add_repo(self, org: Organization, repo: Union[Repository, str]):
2274        if isinstance(repo, Repository):
2275            repo_name = repo.name
2276        else:
2277            repo_name = repo
2278        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))
def get_members(self):
2280    def get_members(self):
2281        """Get all users assigned to the team."""
2282        results = self.allspice_client.requests_get_paginated(
2283            Team.GET_MEMBERS % self.id,
2284        )
2285        return [User.parse_response(self.allspice_client, result) for result in results]

Get all users assigned to the team.

def get_repos(self):
2287    def get_repos(self):
2288        """Get all repos of this Team."""
2289        results = self.allspice_client.requests_get(Team.GET_REPOS % self.id)
2290        return [Repository.parse_response(self.allspice_client, result) for result in results]

Get all repos of this Team.

def delete(self):
2292    def delete(self):
2293        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
2294        self.deleted = True
def remove_team_member(self, user_name: str):
2296    def remove_team_member(self, user_name: str):
2297        url = f"/teams/{self.id}/members/{user_name}"
2298        self.allspice_client.requests_delete(url)
class Release(allspice.baseapiobject.ApiObject):
2301class Release(ApiObject):
2302    """
2303    A release on a repo.
2304    """
2305
2306    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
2307    author: User
2308    body: str
2309    created_at: str
2310    draft: bool
2311    html_url: str
2312    id: int
2313    name: str
2314    prerelease: bool
2315    published_at: str
2316    repo: Optional["Repository"]
2317    repository: Optional["Repository"]
2318    tag_name: str
2319    tarball_url: str
2320    target_commitish: str
2321    upload_url: str
2322    url: str
2323    zipball_url: str
2324
2325    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
2326    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
2327    # Note that we don't strictly need the get_assets route, as the release
2328    # object already contains the assets.
2329
2330    def __init__(self, allspice_client):
2331        super().__init__(allspice_client)
2332
2333    def __eq__(self, other):
2334        if not isinstance(other, Release):
2335            return False
2336        return self.repo == other.repo and self.id == other.id
2337
2338    def __hash__(self):
2339        return hash(self.repo) ^ hash(self.id)
2340
2341    _fields_to_parsers: ClassVar[dict] = {
2342        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
2343    }
2344    _patchable_fields: ClassVar[set[str]] = {
2345        "body",
2346        "draft",
2347        "name",
2348        "prerelease",
2349        "tag_name",
2350        "target_commitish",
2351    }
2352
2353    @classmethod
2354    def parse_response(cls, allspice_client, result, repo) -> Release:
2355        release = super().parse_response(allspice_client, result)
2356        Release._add_read_property("repository", repo, release)
2357        # For legacy reasons
2358        Release._add_read_property("repo", repo, release)
2359        setattr(
2360            release,
2361            "_assets",
2362            [
2363                ReleaseAsset.parse_response(allspice_client, asset, release)
2364                for asset in result["assets"]
2365            ],
2366        )
2367        return release
2368
2369    @classmethod
2370    def request(
2371        cls,
2372        allspice_client,
2373        owner: str,
2374        repo: str,
2375        id: Optional[int] = None,
2376    ) -> Release:
2377        args = {"owner": owner, "repo": repo, "id": id}
2378        release_response = cls._get_gitea_api_object(allspice_client, args)
2379        repository = Repository.request(allspice_client, owner, repo)
2380        release = cls.parse_response(allspice_client, release_response, repository)
2381        return release
2382
2383    def commit(self):
2384        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2385        self._commit(args)
2386
2387    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2388        """
2389        Create an asset for this release.
2390
2391        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2392
2393        :param file: The file to upload. This should be a file-like object.
2394        :param name: The name of the file.
2395        :return: The created asset.
2396        """
2397
2398        args: dict[str, Any] = {"files": {"attachment": file}}
2399        if name is not None:
2400            args["params"] = {"name": name}
2401
2402        result = self.allspice_client.requests_post(
2403            self.RELEASE_CREATE_ASSET.format(
2404                owner=self.repo.owner.username,
2405                repo=self.repo.name,
2406                id=self.id,
2407            ),
2408            **args,
2409        )
2410        return ReleaseAsset.parse_response(self.allspice_client, result, self)
2411
2412    def delete(self):
2413        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2414        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2415        self.deleted = True

A release on a repo.

Release(allspice_client)
2330    def __init__(self, allspice_client):
2331        super().__init__(allspice_client)
assets: List[Union[Any, Dict[str, Union[int, str]], ReleaseAsset]]
author: User
body: str
created_at: str
draft: bool
html_url: str
id: int
name: str
prerelease: bool
published_at: str
repo: Optional[Repository]
repository: Optional[Repository]
tag_name: str
tarball_url: str
target_commitish: str
upload_url: str
url: str
zipball_url: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{id}'
RELEASE_CREATE_ASSET = '/repos/{owner}/{repo}/releases/{id}/assets'
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
2353    @classmethod
2354    def parse_response(cls, allspice_client, result, repo) -> Release:
2355        release = super().parse_response(allspice_client, result)
2356        Release._add_read_property("repository", repo, release)
2357        # For legacy reasons
2358        Release._add_read_property("repo", repo, release)
2359        setattr(
2360            release,
2361            "_assets",
2362            [
2363                ReleaseAsset.parse_response(allspice_client, asset, release)
2364                for asset in result["assets"]
2365            ],
2366        )
2367        return release
@classmethod
def request( cls, allspice_client, owner: str, repo: str, id: Optional[int] = None) -> Release:
2369    @classmethod
2370    def request(
2371        cls,
2372        allspice_client,
2373        owner: str,
2374        repo: str,
2375        id: Optional[int] = None,
2376    ) -> Release:
2377        args = {"owner": owner, "repo": repo, "id": id}
2378        release_response = cls._get_gitea_api_object(allspice_client, args)
2379        repository = Repository.request(allspice_client, owner, repo)
2380        release = cls.parse_response(allspice_client, release_response, repository)
2381        return release
def commit(self):
2383    def commit(self):
2384        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2385        self._commit(args)
def create_asset( self, file: <class 'IO'>, name: Optional[str] = None) -> ReleaseAsset:
2387    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
2388        """
2389        Create an asset for this release.
2390
2391        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
2392
2393        :param file: The file to upload. This should be a file-like object.
2394        :param name: The name of the file.
2395        :return: The created asset.
2396        """
2397
2398        args: dict[str, Any] = {"files": {"attachment": file}}
2399        if name is not None:
2400            args["params"] = {"name": name}
2401
2402        result = self.allspice_client.requests_post(
2403            self.RELEASE_CREATE_ASSET.format(
2404                owner=self.repo.owner.username,
2405                repo=self.repo.name,
2406                id=self.id,
2407            ),
2408            **args,
2409        )
2410        return ReleaseAsset.parse_response(self.allspice_client, result, self)

Create an asset for this release.

allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

Parameters
  • file: The file to upload. This should be a file-like object.
  • name: The name of the file.
Returns

The created asset.

def delete(self):
2412    def delete(self):
2413        args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id}
2414        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2415        self.deleted = True
class ReleaseAsset(allspice.baseapiobject.ApiObject):
2418class ReleaseAsset(ApiObject):
2419    browser_download_url: str
2420    created_at: str
2421    download_count: int
2422    id: int
2423    name: str
2424    release: Optional["Release"]
2425    size: int
2426    uuid: str
2427
2428    API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}"
2429
2430    def __init__(self, allspice_client):
2431        super().__init__(allspice_client)
2432
2433    def __eq__(self, other):
2434        if not isinstance(other, ReleaseAsset):
2435            return False
2436        return self.release == other.release and self.id == other.id
2437
2438    def __hash__(self):
2439        return hash(self.release) ^ hash(self.id)
2440
2441    _fields_to_parsers: ClassVar[dict] = {}
2442    _patchable_fields: ClassVar[set[str]] = {
2443        "name",
2444    }
2445
2446    @classmethod
2447    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2448        asset = super().parse_response(allspice_client, result)
2449        ReleaseAsset._add_read_property("release", release, asset)
2450        return asset
2451
2452    @classmethod
2453    def request(
2454        cls,
2455        allspice_client,
2456        owner: str,
2457        repo: str,
2458        release_id: int,
2459        id: int,
2460    ) -> ReleaseAsset:
2461        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
2462        asset_response = cls._get_gitea_api_object(allspice_client, args)
2463        release = Release.request(allspice_client, owner, repo, release_id)
2464        asset = cls.parse_response(allspice_client, asset_response, release)
2465        return asset
2466
2467    def commit(self):
2468        args = {
2469            "owner": self.release.repo.owner,
2470            "repo": self.release.repo.name,
2471            "release_id": self.release.id,
2472            "id": self.id,
2473        }
2474        self._commit(args)
2475
2476    def download(self) -> bytes:
2477        """
2478        Download the raw, binary data of this asset.
2479
2480        Note 1: if the file you are requesting is a text file, you might want to
2481        use .decode() on the result to get a string. For example:
2482
2483            asset.download().decode("utf-8")
2484
2485        Note 2: this method will store the entire file in memory. If you are
2486        downloading a large file, you might want to use download_to_file instead.
2487        """
2488
2489        return self.allspice_client.requests.get(
2490            self.browser_download_url,
2491            headers=self.allspice_client.headers,
2492        ).content
2493
2494    def download_to_file(self, io: IO):
2495        """
2496        Download the raw, binary data of this asset to a file-like object.
2497
2498        Example:
2499
2500            with open("my_file.zip", "wb") as f:
2501                asset.download_to_file(f)
2502
2503        :param io: The file-like object to write the data to.
2504        """
2505
2506        response = self.allspice_client.requests.get(
2507            self.browser_download_url,
2508            headers=self.allspice_client.headers,
2509            stream=True,
2510        )
2511        # 4kb chunks
2512        for chunk in response.iter_content(chunk_size=4096):
2513            if chunk:
2514                io.write(chunk)
2515
2516    def delete(self):
2517        args = {
2518            "owner": self.release.repo.owner.name,
2519            "repo": self.release.repo.name,
2520            "release_id": self.release.id,
2521            "id": self.id,
2522        }
2523        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2524        self.deleted = True
ReleaseAsset(allspice_client)
2430    def __init__(self, allspice_client):
2431        super().__init__(allspice_client)
browser_download_url: str
created_at: str
download_count: int
id: int
name: str
release: Optional[Release]
size: int
uuid: str
API_OBJECT = '/repos/{owner}/{repo}/releases/{release_id}/assets/{id}'
@classmethod
def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2446    @classmethod
2447    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
2448        asset = super().parse_response(allspice_client, result)
2449        ReleaseAsset._add_read_property("release", release, asset)
2450        return asset
@classmethod
def request( cls, allspice_client, owner: str, repo: str, release_id: int, id: int) -> ReleaseAsset:
2452    @classmethod
2453    def request(
2454        cls,
2455        allspice_client,
2456        owner: str,
2457        repo: str,
2458        release_id: int,
2459        id: int,
2460    ) -> ReleaseAsset:
2461        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
2462        asset_response = cls._get_gitea_api_object(allspice_client, args)
2463        release = Release.request(allspice_client, owner, repo, release_id)
2464        asset = cls.parse_response(allspice_client, asset_response, release)
2465        return asset
def commit(self):
2467    def commit(self):
2468        args = {
2469            "owner": self.release.repo.owner,
2470            "repo": self.release.repo.name,
2471            "release_id": self.release.id,
2472            "id": self.id,
2473        }
2474        self._commit(args)
def download(self) -> bytes:
2476    def download(self) -> bytes:
2477        """
2478        Download the raw, binary data of this asset.
2479
2480        Note 1: if the file you are requesting is a text file, you might want to
2481        use .decode() on the result to get a string. For example:
2482
2483            asset.download().decode("utf-8")
2484
2485        Note 2: this method will store the entire file in memory. If you are
2486        downloading a large file, you might want to use download_to_file instead.
2487        """
2488
2489        return self.allspice_client.requests.get(
2490            self.browser_download_url,
2491            headers=self.allspice_client.headers,
2492        ).content

Download the raw, binary data of this asset.

Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:

asset.download().decode("utf-8")

Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.

def download_to_file(self, io: <class 'IO'>):
2494    def download_to_file(self, io: IO):
2495        """
2496        Download the raw, binary data of this asset to a file-like object.
2497
2498        Example:
2499
2500            with open("my_file.zip", "wb") as f:
2501                asset.download_to_file(f)
2502
2503        :param io: The file-like object to write the data to.
2504        """
2505
2506        response = self.allspice_client.requests.get(
2507            self.browser_download_url,
2508            headers=self.allspice_client.headers,
2509            stream=True,
2510        )
2511        # 4kb chunks
2512        for chunk in response.iter_content(chunk_size=4096):
2513            if chunk:
2514                io.write(chunk)

Download the raw, binary data of this asset to a file-like object.

Example:

with open("my_file.zip", "wb") as f:
    asset.download_to_file(f)
Parameters
  • io: The file-like object to write the data to.
def delete(self):
2516    def delete(self):
2517        args = {
2518            "owner": self.release.repo.owner.name,
2519            "repo": self.release.repo.name,
2520            "release_id": self.release.id,
2521            "id": self.id,
2522        }
2523        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
2524        self.deleted = True
class Content(allspice.baseapiobject.ReadonlyApiObject):
2527class Content(ReadonlyApiObject):
2528    content: Any
2529    download_url: str
2530    encoding: Any
2531    git_url: str
2532    html_url: str
2533    last_commit_sha: str
2534    name: str
2535    path: str
2536    sha: str
2537    size: int
2538    submodule_git_url: Any
2539    target: Any
2540    type: str
2541    url: str
2542
2543    FILE = "file"
2544
2545    def __init__(self, allspice_client):
2546        super().__init__(allspice_client)
2547
2548    def __eq__(self, other):
2549        if not isinstance(other, Content):
2550            return False
2551
2552        return self.sha == other.sha and self.name == other.name
2553
2554    def __hash__(self):
2555        return hash(self.sha) ^ hash(self.name)
Content(allspice_client)
2545    def __init__(self, allspice_client):
2546        super().__init__(allspice_client)
content: Any
download_url: str
encoding: Any
git_url: str
html_url: str
last_commit_sha: str
name: str
path: str
sha: str
size: int
submodule_git_url: Any
target: Any
type: str
url: str
FILE = 'file'
Ref = typing.Union[Branch, Commit, str]
class Util:
2561class Util:
2562    @staticmethod
2563    def convert_time(time: str) -> datetime:
2564        """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
2565        try:
2566            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
2567        except ValueError:
2568            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
2569
2570    @staticmethod
2571    def format_time(time: datetime) -> str:
2572        """
2573        Format a datetime object to Gitea's time format.
2574
2575        :param time: The time to format
2576        :return: Formatted time
2577        """
2578
2579        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
2580
2581    @staticmethod
2582    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
2583        """
2584        Given a "ref", returns a dict with the ref parameter for the API call.
2585
2586        If the ref is None, returns an empty dict. You can pass this to the API
2587        directly.
2588        """
2589
2590        if isinstance(ref, Branch):
2591            return {"ref": ref.name}
2592        elif isinstance(ref, Commit):
2593            return {"ref": ref.sha}
2594        elif ref:
2595            return {"ref": ref}
2596        else:
2597            return {}
@staticmethod
def convert_time(time: str) -> datetime.datetime:
2562    @staticmethod
2563    def convert_time(time: str) -> datetime:
2564        """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
2565        try:
2566            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
2567        except ValueError:
2568            return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")

Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)

@staticmethod
def format_time(time: datetime.datetime) -> str:
2570    @staticmethod
2571    def format_time(time: datetime) -> str:
2572        """
2573        Format a datetime object to Gitea's time format.
2574
2575        :param time: The time to format
2576        :return: Formatted time
2577        """
2578
2579        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"

Format a datetime object to Gitea's time format.

Parameters
  • time: The time to format
Returns

Formatted time

@staticmethod
def data_params_for_ref( ref: Union[Branch, Commit, str, NoneType]) -> Dict:
2581    @staticmethod
2582    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
2583        """
2584        Given a "ref", returns a dict with the ref parameter for the API call.
2585
2586        If the ref is None, returns an empty dict. You can pass this to the API
2587        directly.
2588        """
2589
2590        if isinstance(ref, Branch):
2591            return {"ref": ref.name}
2592        elif isinstance(ref, Commit):
2593            return {"ref": ref.sha}
2594        elif ref:
2595            return {"ref": ref}
2596        else:
2597            return {}

Given a "ref", returns a dict with the ref parameter for the API call.

If the ref is None, returns an empty dict. You can pass this to the API directly.