allspice.apiobject
1from __future__ import annotations 2 3import logging 4import re 5from datetime import datetime, timezone 6from enum import Enum 7from functools import cached_property 8from typing import ( 9 IO, 10 Any, 11 ClassVar, 12 Dict, 13 FrozenSet, 14 List, 15 Literal, 16 Optional, 17 Sequence, 18 Set, 19 Tuple, 20 Union, 21) 22 23try: 24 from typing_extensions import Self 25except ImportError: 26 from typing import Self 27 28from .baseapiobject import ApiObject, ReadonlyApiObject 29from .exceptions import ConflictException, NotFoundException 30 31 32class Organization(ApiObject): 33 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 34 35 active: Optional[bool] 36 avatar_url: str 37 created: Optional[str] 38 description: str 39 email: str 40 followers_count: Optional[int] 41 following_count: Optional[int] 42 full_name: str 43 html_url: Optional[str] 44 id: int 45 is_admin: Optional[bool] 46 language: Optional[str] 47 last_login: Optional[str] 48 location: str 49 login: Optional[str] 50 login_name: Optional[str] 51 name: Optional[str] 52 prohibit_login: Optional[bool] 53 repo_admin_change_team_access: Optional[bool] 54 restricted: Optional[bool] 55 source_id: Optional[int] 56 starred_repos_count: Optional[int] 57 username: str 58 visibility: str 59 website: str 60 61 API_OBJECT = """/orgs/{name}""" # <org> 62 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 63 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 64 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 65 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 66 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 67 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 68 69 def __init__(self, allspice_client): 70 super().__init__(allspice_client) 71 72 def __eq__(self, other): 73 if not isinstance(other, Organization): 74 return False 75 return self.allspice_client == other.allspice_client and self.name == other.name 76 77 def __hash__(self): 78 return hash(self.allspice_client) ^ hash(self.name) 
79 80 @classmethod 81 def request(cls, allspice_client, name: str) -> Self: 82 return cls._request(allspice_client, {"name": name}) 83 84 @classmethod 85 def parse_response(cls, allspice_client, result) -> "Organization": 86 api_object = super().parse_response(allspice_client, result) 87 # add "name" field to make this behave similar to users for gitea < 1.18 88 # also necessary for repository-owner when org is repo owner 89 if not hasattr(api_object, "name"): 90 Organization._add_read_property("name", result["username"], api_object) 91 return api_object 92 93 _patchable_fields: ClassVar[set[str]] = { 94 "description", 95 "full_name", 96 "location", 97 "visibility", 98 "website", 99 } 100 101 def commit(self): 102 args = {"name": self.name} 103 self._commit(args) 104 105 def create_repo( 106 self, 107 repoName: str, 108 description: str = "", 109 private: bool = False, 110 autoInit=True, 111 gitignores: Optional[str] = None, 112 license: Optional[str] = None, 113 readme: str = "Default", 114 issue_labels: Optional[str] = None, 115 default_branch="master", 116 ): 117 """Create an organization Repository 118 119 Throws: 120 AlreadyExistsException: If the Repository exists already. 121 Exception: If something else went wrong. 122 """ 123 result = self.allspice_client.requests_post( 124 f"/orgs/{self.name}/repos", 125 data={ 126 "name": repoName, 127 "description": description, 128 "private": private, 129 "auto_init": autoInit, 130 "gitignores": gitignores, 131 "license": license, 132 "issue_labels": issue_labels, 133 "readme": readme, 134 "default_branch": default_branch, 135 }, 136 ) 137 if "id" in result: 138 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 139 else: 140 self.allspice_client.logger.error(result["message"]) 141 raise Exception("Repository not created... 
(gitea: %s)" % result["message"]) 142 return Repository.parse_response(self.allspice_client, result) 143 144 def get_repositories(self) -> List["Repository"]: 145 results = self.allspice_client.requests_get_paginated( 146 Organization.ORG_REPOS_REQUEST % self.username 147 ) 148 return [Repository.parse_response(self.allspice_client, result) for result in results] 149 150 def get_repository(self, name) -> "Repository": 151 repos = self.get_repositories() 152 for repo in repos: 153 if repo.name == name: 154 return repo 155 raise NotFoundException("Repository %s not existent in organization." % name) 156 157 def get_teams(self) -> List["Team"]: 158 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 159 teams = [Team.parse_response(self.allspice_client, result) for result in results] 160 # organisation seems to be missing using this request, so we add org manually 161 for t in teams: 162 setattr(t, "_organization", self) 163 return teams 164 165 def get_team(self, name) -> "Team": 166 teams = self.get_teams() 167 for team in teams: 168 if team.name == name: 169 return team 170 raise NotFoundException("Team not existent in organization.") 171 172 def create_team( 173 self, 174 name: str, 175 description: str = "", 176 permission: str = "read", 177 can_create_org_repo: bool = False, 178 includes_all_repositories: bool = False, 179 units=( 180 "repo.code", 181 "repo.issues", 182 "repo.ext_issues", 183 "repo.wiki", 184 "repo.pulls", 185 "repo.releases", 186 "repo.ext_wiki", 187 ), 188 units_map={}, 189 ) -> "Team": 190 """Alias for AllSpice#create_team""" 191 # TODO: Move AllSpice#create_team to Organization#create_team and 192 # deprecate AllSpice#create_team. 
193 return self.allspice_client.create_team( 194 org=self, 195 name=name, 196 description=description, 197 permission=permission, 198 can_create_org_repo=can_create_org_repo, 199 includes_all_repositories=includes_all_repositories, 200 units=units, 201 units_map=units_map, 202 ) 203 204 def get_members(self) -> List["User"]: 205 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 206 return [User.parse_response(self.allspice_client, result) for result in results] 207 208 def is_member(self, username) -> bool: 209 if isinstance(username, User): 210 username = username.username 211 try: 212 # returns 204 if its ok, 404 if its not 213 self.allspice_client.requests_get( 214 Organization.ORG_IS_MEMBER % (self.username, username) 215 ) 216 return True 217 except Exception: 218 return False 219 220 def remove_member(self, user: "User"): 221 path = f"/orgs/{self.username}/members/{user.username}" 222 self.allspice_client.requests_delete(path) 223 224 def delete(self): 225 """Delete this Organization. Invalidates this Objects data. 
Also deletes all Repositories owned by the User""" 226 for repo in self.get_repositories(): 227 repo.delete() 228 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 229 self.deleted = True 230 231 def get_heatmap(self) -> List[Tuple[datetime, int]]: 232 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 233 results = [ 234 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 235 for result in results 236 ] 237 return results 238 239 240class User(ApiObject): 241 active: bool 242 admin: Any 243 allow_create_organization: Any 244 allow_git_hook: Any 245 allow_import_local: Any 246 avatar_url: str 247 created: str 248 description: str 249 email: str 250 emails: List[Any] 251 followers_count: int 252 following_count: int 253 full_name: str 254 html_url: str 255 id: int 256 is_admin: bool 257 language: str 258 last_login: str 259 location: str 260 login: str 261 login_name: str 262 max_repo_creation: Any 263 must_change_password: Any 264 password: Any 265 prohibit_login: bool 266 restricted: bool 267 source_id: int 268 starred_repos_count: int 269 username: str 270 visibility: str 271 website: str 272 273 API_OBJECT = """/users/{name}""" # <org> 274 USER_MAIL = """/user/emails?sudo=%s""" # <name> 275 USER_PATCH = """/admin/users/%s""" # <username> 276 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 277 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 278 USER_HEATMAP = """/users/%s/heatmap""" # <username> 279 280 def __init__(self, allspice_client): 281 super().__init__(allspice_client) 282 self._emails = [] 283 284 def __eq__(self, other): 285 if not isinstance(other, User): 286 return False 287 return self.allspice_client == other.allspice_client and self.id == other.id 288 289 def __hash__(self): 290 return hash(self.allspice_client) ^ hash(self.id) 291 292 @property 293 def emails(self): 294 self.__request_emails() 295 return self._emails 296 297 @classmethod 298 
def request(cls, allspice_client, name: str) -> "User": 299 api_object = cls._request(allspice_client, {"name": name}) 300 return api_object 301 302 _patchable_fields: ClassVar[set[str]] = { 303 "active", 304 "admin", 305 "allow_create_organization", 306 "allow_git_hook", 307 "allow_import_local", 308 "email", 309 "full_name", 310 "location", 311 "login_name", 312 "max_repo_creation", 313 "must_change_password", 314 "password", 315 "prohibit_login", 316 "website", 317 } 318 319 def commit(self, login_name: str, source_id: int = 0): 320 """ 321 Unfortunately it is necessary to require the login name 322 as well as the login source (that is not supplied when getting a user) for 323 changing a user. 324 Usually source_id is 0 and the login_name is equal to the username. 325 """ 326 values = self.get_dirty_fields() 327 values.update( 328 # api-doc says that the "source_id" is necessary; works without though 329 {"login_name": login_name, "source_id": source_id} 330 ) 331 args = {"username": self.username} 332 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 333 self._dirty_fields = {} 334 335 def create_repo( 336 self, 337 repoName: str, 338 description: str = "", 339 private: bool = False, 340 autoInit=True, 341 gitignores: Optional[str] = None, 342 license: Optional[str] = None, 343 readme: str = "Default", 344 issue_labels: Optional[str] = None, 345 default_branch="master", 346 ): 347 """Create a user Repository 348 349 Throws: 350 AlreadyExistsException: If the Repository exists already. 351 Exception: If something else went wrong. 
352 """ 353 result = self.allspice_client.requests_post( 354 "/user/repos", 355 data={ 356 "name": repoName, 357 "description": description, 358 "private": private, 359 "auto_init": autoInit, 360 "gitignores": gitignores, 361 "license": license, 362 "issue_labels": issue_labels, 363 "readme": readme, 364 "default_branch": default_branch, 365 }, 366 ) 367 if "id" in result: 368 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 369 else: 370 self.allspice_client.logger.error(result["message"]) 371 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 372 return Repository.parse_response(self.allspice_client, result) 373 374 def get_repositories(self) -> List["Repository"]: 375 """Get all Repositories owned by this User.""" 376 url = f"/users/{self.username}/repos" 377 results = self.allspice_client.requests_get_paginated(url) 378 return [Repository.parse_response(self.allspice_client, result) for result in results] 379 380 def get_orgs(self) -> List[Organization]: 381 """Get all Organizations this user is a member of.""" 382 url = f"/users/{self.username}/orgs" 383 results = self.allspice_client.requests_get_paginated(url) 384 return [Organization.parse_response(self.allspice_client, result) for result in results] 385 386 def get_teams(self) -> List["Team"]: 387 url = "/user/teams" 388 results = self.allspice_client.requests_get_paginated(url, sudo=self) 389 return [Team.parse_response(self.allspice_client, result) for result in results] 390 391 def get_accessible_repos(self) -> List["Repository"]: 392 """Get all Repositories accessible by the logged in User.""" 393 results = self.allspice_client.requests_get("/user/repos", sudo=self) 394 return [Repository.parse_response(self.allspice_client, result) for result in results] 395 396 def __request_emails(self): 397 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 398 # report if the adress changed by this 399 for mail in result: 400 
self._emails.append(mail["email"]) 401 if mail["primary"]: 402 self._email = mail["email"] 403 404 def delete(self): 405 """Deletes this User. Also deletes all Repositories he owns.""" 406 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 407 self.deleted = True 408 409 def get_heatmap(self) -> List[Tuple[datetime, int]]: 410 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 411 results = [ 412 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 413 for result in results 414 ] 415 return results 416 417 418class Branch(ReadonlyApiObject): 419 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 420 effective_branch_protection_name: str 421 enable_status_check: bool 422 name: str 423 protected: bool 424 required_approvals: int 425 status_check_contexts: List[Any] 426 user_can_merge: bool 427 user_can_push: bool 428 429 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 430 431 def __init__(self, allspice_client): 432 super().__init__(allspice_client) 433 434 def __eq__(self, other): 435 if not isinstance(other, Branch): 436 return False 437 return self.commit == other.commit and self.name == other.name 438 439 def __hash__(self): 440 return hash(self.commit["id"]) ^ hash(self.name) 441 442 _fields_to_parsers: ClassVar[dict] = { 443 # This is not a commit object 444 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 445 } 446 447 @classmethod 448 def request(cls, allspice_client, owner: str, repo: str, branch: str): 449 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch}) 450 451 452class GitEntry(ReadonlyApiObject): 453 """ 454 An object representing a file or directory in the Git tree. 
455 """ 456 457 mode: str 458 path: str 459 sha: str 460 size: int 461 type: str 462 url: str 463 464 def __init__(self, allspice_client): 465 super().__init__(allspice_client) 466 467 def __eq__(self, other) -> bool: 468 if not isinstance(other, GitEntry): 469 return False 470 return self.sha == other.sha 471 472 def __hash__(self) -> int: 473 return hash(self.sha) 474 475 476class Repository(ApiObject): 477 allow_fast_forward_only_merge: bool 478 allow_manual_merge: Any 479 allow_merge_commits: bool 480 allow_rebase: bool 481 allow_rebase_explicit: bool 482 allow_rebase_update: bool 483 allow_squash_merge: bool 484 archived: bool 485 archived_at: str 486 autodetect_manual_merge: Any 487 avatar_url: str 488 clone_url: str 489 created_at: str 490 default_allow_maintainer_edit: bool 491 default_branch: str 492 default_delete_branch_after_merge: bool 493 default_merge_style: str 494 description: str 495 empty: bool 496 enable_prune: Any 497 external_tracker: Any 498 external_wiki: Any 499 fork: bool 500 forks_count: int 501 full_name: str 502 has_actions: bool 503 has_issues: bool 504 has_packages: bool 505 has_projects: bool 506 has_pull_requests: bool 507 has_releases: bool 508 has_wiki: bool 509 html_url: str 510 id: int 511 ignore_whitespace_conflicts: bool 512 internal: bool 513 internal_tracker: Dict[str, bool] 514 language: str 515 languages_url: str 516 link: str 517 mirror: bool 518 mirror_interval: str 519 mirror_updated: str 520 name: str 521 object_format_name: str 522 open_issues_count: int 523 open_pr_counter: int 524 original_url: str 525 owner: Union["User", "Organization"] 526 parent: Any 527 permissions: Dict[str, bool] 528 private: bool 529 projects_mode: str 530 release_counter: int 531 repo_transfer: Any 532 size: int 533 ssh_url: str 534 stars_count: int 535 template: bool 536 updated_at: datetime 537 url: str 538 watchers_count: int 539 website: str 540 541 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 542 REPO_IS_COLLABORATOR 
= """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 543 REPO_SEARCH = """/repos/search/""" 544 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 545 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 546 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 547 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 548 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 549 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 550 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 551 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 552 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 553 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 554 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 555 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 556 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 557 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 558 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 559 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 560 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 561 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 562 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 563 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 564 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 565 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 566 567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip" 574 575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex" 
585 586 def __init__(self, allspice_client): 587 super().__init__(allspice_client) 588 589 def __eq__(self, other): 590 if not isinstance(other, Repository): 591 return False 592 return self.owner == other.owner and self.name == other.name 593 594 def __hash__(self): 595 return hash(self.owner) ^ hash(self.name) 596 597 _fields_to_parsers: ClassVar[dict] = { 598 # dont know how to tell apart user and org as owner except form email being empty. 599 "owner": lambda allspice_client, r: ( 600 Organization.parse_response(allspice_client, r) 601 if r["email"] == "" 602 else User.parse_response(allspice_client, r) 603 ), 604 "updated_at": lambda _, t: Util.convert_time(t), 605 } 606 607 @classmethod 608 def request( 609 cls, 610 allspice_client, 611 owner: str, 612 name: str, 613 ) -> Repository: 614 return cls._request(allspice_client, {"owner": owner, "name": name}) 615 616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 
642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses] 660 661 _patchable_fields: ClassVar[set[str]] = { 662 "allow_manual_merge", 663 "allow_merge_commits", 664 "allow_rebase", 665 "allow_rebase_explicit", 666 "allow_rebase_update", 667 "allow_squash_merge", 668 "archived", 669 "autodetect_manual_merge", 670 "default_branch", 671 "default_delete_branch_after_merge", 672 "default_merge_style", 673 "description", 674 "enable_prune", 675 "external_tracker", 676 "external_wiki", 677 "has_actions", 678 "has_issues", 679 "has_projects", 680 "has_pull_requests", 681 "has_wiki", 682 "ignore_whitespace_conflicts", 683 "internal_tracker", 684 "mirror_interval", 685 "name", 686 "private", 687 "template", 688 "website", 689 } 690 691 def commit(self): 692 args = {"owner": self.owner.username, "name": self.name} 693 self._commit(args) 694 695 def get_branches(self) -> List["Branch"]: 696 """Get all the Branches of this Repository.""" 697 698 results = self.allspice_client.requests_get_paginated( 699 Repository.REPO_BRANCHES % (self.owner.username, self.name) 700 ) 701 return [Branch.parse_response(self.allspice_client, result) for result in results] 702 703 def get_branch(self, name: str) -> "Branch": 704 """Get a specific Branch of this Repository.""" 705 result = self.allspice_client.requests_get( 706 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 707 ) 708 return Branch.parse_response(self.allspice_client, result) 709 710 def add_branch(self, 
create_from: Ref, newname: str) -> "Branch": 711 """Add a branch to the repository""" 712 # Note: will only work with gitea 1.13 or higher! 713 714 ref_name = Util.data_params_for_ref(create_from) 715 if "ref" not in ref_name: 716 raise ValueError("create_from must be a Branch, Commit or string") 717 ref_name = ref_name["ref"] 718 719 data = {"new_branch_name": newname, "old_ref_name": ref_name} 720 result = self.allspice_client.requests_post( 721 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 722 ) 723 return Branch.parse_response(self.allspice_client, result) 724 725 def get_issues( 726 self, 727 state: Literal["open", "closed", "all"] = "all", 728 search_query: Optional[str] = None, 729 labels: Optional[List[str]] = None, 730 milestones: Optional[List[Union[Milestone, str]]] = None, 731 assignee: Optional[Union[User, str]] = None, 732 since: Optional[datetime] = None, 733 before: Optional[datetime] = None, 734 ) -> List["Issue"]: 735 """ 736 Get all Issues of this Repository (open and closed) 737 738 https://hub.allspice.io/api/swagger#/repository/repoListIssues 739 740 All params of this method are optional filters. If you don't specify a filter, it 741 will not be applied. 742 743 :param state: The state of the Issues to get. If None, all Issues are returned. 744 :param search_query: Filter issues by text. This is equivalent to searching for 745 `search_query` in the Issues on the web interface. 746 :param labels: Filter issues by labels. 747 :param milestones: Filter issues by milestones. 748 :param assignee: Filter issues by the assigned user. 749 :param since: Filter issues by the date they were created. 750 :param before: Filter issues by the date they were created. 751 :return: A list of Issues. 
752 """ 753 754 data = { 755 "state": state, 756 } 757 if search_query: 758 data["q"] = search_query 759 if labels: 760 data["labels"] = ",".join(labels) 761 if milestones: 762 data["milestone"] = ",".join( 763 [ 764 milestone.name if isinstance(milestone, Milestone) else milestone 765 for milestone in milestones 766 ] 767 ) 768 if assignee: 769 if isinstance(assignee, User): 770 data["assignee"] = assignee.username 771 else: 772 data["assignee"] = assignee 773 if since: 774 data["since"] = Util.format_time(since) 775 if before: 776 data["before"] = Util.format_time(before) 777 778 results = self.allspice_client.requests_get_paginated( 779 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 780 params=data, 781 ) 782 783 issues = [] 784 for result in results: 785 issue = Issue.parse_response(self.allspice_client, result) 786 # See Issue.request 787 setattr(issue, "_repository", self) 788 # This is mostly for compatibility with an older implementation 789 Issue._add_read_property("repo", self, issue) 790 issues.append(issue) 791 792 return issues 793 794 def get_design_reviews( 795 self, 796 state: Literal["open", "closed", "all"] = "all", 797 milestone: Optional[Union[Milestone, str]] = None, 798 labels: Optional[List[str]] = None, 799 ) -> List["DesignReview"]: 800 """ 801 Get all Design Reviews of this Repository. 802 803 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 804 805 :param state: The state of the Design Reviews to get. If None, all Design Reviews 806 are returned. 807 :param milestone: The milestone of the Design Reviews to get. 808 :param labels: A list of label IDs to filter DRs by. 809 :return: A list of Design Reviews. 
810 """ 811 812 params = { 813 "state": state, 814 } 815 if milestone: 816 if isinstance(milestone, Milestone): 817 params["milestone"] = milestone.name 818 else: 819 params["milestone"] = milestone 820 if labels: 821 params["labels"] = ",".join(labels) 822 823 results = self.allspice_client.requests_get_paginated( 824 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 825 params=params, 826 ) 827 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 828 829 def get_commits( 830 self, 831 sha: Optional[str] = None, 832 path: Optional[str] = None, 833 stat: bool = True, 834 ) -> List["Commit"]: 835 """ 836 Get all the Commits of this Repository. 837 838 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 839 840 :param sha: The SHA of the commit to start listing commits from. 841 :param path: filepath of a file/dir. 842 :param stat: Include the number of additions and deletions in the response. 843 Disable for speedup. 844 :return: A list of Commits. 845 """ 846 847 data = {} 848 if sha: 849 data["sha"] = sha 850 if path: 851 data["path"] = path 852 if not stat: 853 data["stat"] = False 854 855 try: 856 results = self.allspice_client.requests_get_paginated( 857 Repository.REPO_COMMITS % (self.owner.username, self.name), 858 params=data, 859 ) 860 except ConflictException as err: 861 logging.warning(err) 862 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 863 results = [] 864 return [Commit.parse_response(self.allspice_client, result) for result in results] 865 866 def get_issues_state(self, state) -> List["Issue"]: 867 """ 868 DEPRECATED: Use get_issues() instead. 869 870 Get issues of state Issue.open or Issue.closed of a repository. 
871 """ 872 873 assert state in [Issue.OPENED, Issue.CLOSED] 874 issues = [] 875 data = {"state": state} 876 results = self.allspice_client.requests_get_paginated( 877 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 878 params=data, 879 ) 880 for result in results: 881 issue = Issue.parse_response(self.allspice_client, result) 882 # adding data not contained in the issue response 883 # See Issue.request() 884 setattr(issue, "_repository", self) 885 Issue._add_read_property("repo", self, issue) 886 Issue._add_read_property("owner", self.owner, issue) 887 issues.append(issue) 888 return issues 889 890 def get_times(self): 891 results = self.allspice_client.requests_get( 892 Repository.REPO_TIMES % (self.owner.username, self.name) 893 ) 894 return results 895 896 def get_user_time(self, username) -> float: 897 if isinstance(username, User): 898 username = username.username 899 results = self.allspice_client.requests_get( 900 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 901 ) 902 time = sum(r["time"] for r in results) 903 return time 904 905 def get_full_name(self) -> str: 906 return self.owner.username + "/" + self.name 907 908 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 909 data = { 910 "assignees": assignees, 911 "body": description, 912 "closed": False, 913 "title": title, 914 } 915 result = self.allspice_client.requests_post( 916 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 917 data=data, 918 ) 919 920 issue = Issue.parse_response(self.allspice_client, result) 921 setattr(issue, "_repository", self) 922 Issue._add_read_property("repo", self, issue) 923 return issue 924 925 def create_design_review( 926 self, 927 title: str, 928 head: Union[Branch, str], 929 base: Union[Branch, str], 930 assignees: Optional[Set[Union[User, str]]] = None, 931 body: Optional[str] = None, 932 due_date: Optional[datetime] = None, 933 milestone: Optional["Milestone"] = 
None, 934 ) -> "DesignReview": 935 """ 936 Create a new Design Review. 937 938 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 939 940 :param title: Title of the Design Review 941 :param head: Branch or name of the branch to merge into the base branch 942 :param base: Branch or name of the branch to merge into 943 :param assignees: Optional. A list of users to assign this review. List can be of 944 User objects or of usernames. 945 :param body: An Optional Description for the Design Review. 946 :param due_date: An Optional Due date for the Design Review. 947 :param milestone: An Optional Milestone for the Design Review 948 :return: The created Design Review 949 """ 950 951 data: dict[str, Any] = { 952 "title": title, 953 } 954 955 if isinstance(head, Branch): 956 data["head"] = head.name 957 else: 958 data["head"] = head 959 if isinstance(base, Branch): 960 data["base"] = base.name 961 else: 962 data["base"] = base 963 if assignees: 964 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 965 if body: 966 data["body"] = body 967 if due_date: 968 data["due_date"] = Util.format_time(due_date) 969 if milestone: 970 data["milestone"] = milestone.id 971 972 result = self.allspice_client.requests_post( 973 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 974 data=data, 975 ) 976 977 return DesignReview.parse_response(self.allspice_client, result) 978 979 def create_milestone( 980 self, 981 title: str, 982 description: str, 983 due_date: Optional[str] = None, 984 state: str = "open", 985 ) -> "Milestone": 986 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 987 data = {"title": title, "description": description, "state": state} 988 if due_date: 989 data["due_date"] = due_date 990 result = self.allspice_client.requests_post(url, data=data) 991 return Milestone.parse_response(self.allspice_client, result) 992 993 def create_gitea_hook(self, hook_url: str, 
def transfer_ownership(
    self,
    new_owner: Union[User, Organization],
    new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
):
    """
    Transfer this repository to another user or organization.

    :param new_owner: The User or Organization that should own the
        repository. Its username is sent as "new_owner".
    :param new_teams: Teams to send as "team_ids" with the transfer. Only
        used when new_owner is an Organization; teams that are not among
        that organization's teams are left out.
    """
    url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
    data: dict[str, Any] = {"new_owner": new_owner.username}
    if isinstance(new_owner, Organization):
        # Fetch the organization's teams once rather than once per candidate
        # team, and not at all when there is nothing to filter.
        owner_teams = new_owner.get_teams() if new_teams else []
        data["team_ids"] = [team.id for team in new_teams if team in owner_teams]
    self.allspice_client.requests_post(url, data=data)
    # TODO: make sure this instance is either updated or discarded
def get_file_content(
    self,
    content: Content,
    ref: Optional[Ref] = None,
    commit: Optional[Commit] = None,
) -> Union[str, List["Content"]]:
    """
    Fetch what the contents API returns for the given entry.

    https://hub.allspice.io/api/swagger#/repository/repoGetContents

    :param content: The entry to fetch; its path and type are used.
    :param ref: Ref to read from. Takes precedence over `commit`.
    :param commit: Used only when `ref` is not given.
    :return: The "content" field of the response when the entry is a file,
        otherwise a list of Content objects parsed from the response.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
    query = Util.data_params_for_ref(ref or commit)

    wants_file = content.type == Content.FILE
    response = self.allspice_client.requests_get(endpoint, query)
    if wants_file:
        return response["content"]
    return [Content.parse_response(self.allspice_client, entry) for entry in response]
def download_to_file(
    self,
    file_path: str,
    io: IO,
    ref: Optional[Ref] = None,
) -> None:
    """
    Download the binary data of a file to a file-like object.

    Example:

        with open("schematic.DSN", "wb") as f:
            repo.download_to_file("Schematics/my_schematic.DSN", f)

    :param file_path: The path to the file in the repository from the root
        of the repository.
    :param io: The file-like object to write the data to.
    :param ref: The ref to download the file from. It is converted to query
        parameters with Util.data_params_for_ref.
    """

    url = self.allspice_client._AllSpice__get_url(
        self.REPO_GET_RAW_FILE.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
    )
    params = Util.data_params_for_ref(ref)
    response = self.allspice_client.requests.get(
        url,
        params=params,
        headers=self.allspice_client.headers,
        stream=True,
    )

    try:
        # 4kb chunks
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
    finally:
        # A streamed response keeps its connection until the body is fully
        # consumed or the response is closed, so close it even when writing
        # or reading fails part-way through.
        response.close()
def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
    """
    Get the svg blob for a cad file if it exists, otherwise enqueue
    a new job and return a 503 status.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: The file to render, as a Content object or as a path.
    :param ref: Optional ref, converted to request parameters with
        Util.data_params_for_ref.
    :return: The raw response data.
    """

    path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username,
        repo=self.name,
        content=path,
    )
    query = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(endpoint, query)
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the file in the repository.
    :param content: Sent as the "content" field of the request body.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, so the caller's object is not modified.
    :return: The result of the POST request.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a new dict instead of updating the argument in place
    payload = {**(data or {}), "content": content}
    return self.allspice_client.requests_post(url, payload)


def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Change an existing file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file in the repository.
    :param file_sha: Sent as the "sha" field of the request body.
    :param content: Sent as the "content" field of the request body.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, so the caller's object is not modified.
    :return: The result of the PUT request.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a new dict instead of updating the argument in place
    payload = {**(data or {}), "sha": file_sha, "content": content}
    return self.allspice_client.requests_put(url, payload)


def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from this repository.

    https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file in the repository.
    :param file_sha: Sent as the "sha" field of the request body.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, so the caller's object is not modified.
    :return: The result of the DELETE request.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a new dict instead of updating the argument in place
    payload = {**(data or {}), "sha": file_sha}
    return self.allspice_client.requests_delete(url, payload)
def get_topics(self) -> list[str]:
    """
    Gets the list of topics on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics

    :return: The "topics" entry of the API response.
    """

    endpoint = self.REPO_GET_TOPICS.format(owner=self.owner.username, repo=self.name)
    response = self.allspice_client.requests_get(endpoint)
    return response["topics"]


def add_topic(self, topic: str):
    """
    Adds a topic to the repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """

    endpoint = self.REPO_ADD_TOPIC.format(
        owner=self.owner.username,
        repo=self.name,
        topic=topic,
    )
    self.allspice_client.requests_put(endpoint)
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    Get the list of releases for this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: When not None, sent as the "draft" query parameter.
    :param pre_release: When not None, sent as the "pre-release" query
        parameter.
    :return: Release objects parsed from the paginated response.
    """

    filters = {}
    for key, flag in (("draft", draft), ("pre-release", pre_release)):
        if flag is not None:
            filters[key] = flag

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    pages = self.allspice_client.requests_get_paginated(endpoint, params=filters)

    return [Release.parse_response(self.allspice_client, item, self) for item in pages]
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    Get a list of statuses for a commit.

    This is roughly equivalent to the Commit.get_statuses method, but this
    method allows you to sort and filter commits and is more convenient if
    you have a commit SHA and don't need to get the commit itself.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object or a commit SHA.
    :param sort: When not None, its value is sent as the "sort" parameter.
    :param state: When not None, its value is sent as the "state" parameter.
    :return: CommitStatus objects parsed from the paginated response.
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    query = {}
    for key, choice in (("sort", sort), ("state", state)):
        if choice is not None:
            query[key] = choice.value

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]
def delete(self):
    """Send the delete request for this repository and flag this object as deleted."""
    endpoint = Repository.REPO_DELETE % (self.owner.username, self.name)
    self.allspice_client.requests_delete(endpoint)
    self.deleted = True
class Attachment(ReadonlyApiObject):
    """
    An asset attached to a comment or an issue.

    You cannot edit or delete the attachment from this object - see the instance methods
    Comment.edit_attachment and delete_attachment for that.
    """

    browser_download_url: str
    created_at: str
    download_count: int
    id: int
    name: str
    size: int
    uuid: str

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Attachments are equal when their uuids are equal."""
        if not isinstance(other, Attachment):
            return False

        return self.uuid == other.uuid

    def __hash__(self):
        # Hash on the same field __eq__ compares
        return hash(self.uuid)

    def download_to_file(self, io: IO):
        """
        Download the raw, binary data of this Attachment to a file-like object.

        Example:

            with open("my_file.zip", "wb") as f:
                attachment.download_to_file(f)

        :param io: The file-like object to write the data to.
        """

        response = self.allspice_client.requests.get(
            self.browser_download_url,
            headers=self.allspice_client.headers,
            stream=True,
        )
        try:
            # 4kb chunks
            for chunk in response.iter_content(chunk_size=4096):
                if chunk:
                    io.write(chunk)
        finally:
            # A streamed response keeps its connection until the body is
            # fully consumed or the response is closed, so close it even
            # when writing or reading fails part-way through.
            response.close()
Repository: 1640 """The repository this comment was posted on.""" 1641 1642 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1643 return Repository.request(self.allspice_client, owner_name, repo_name) 1644 1645 def __fields_for_path(self): 1646 return { 1647 "owner": self.repository.owner.username, 1648 "repo": self.repository.name, 1649 "id": self.id, 1650 } 1651 1652 def commit(self): 1653 self._commit(self.__fields_for_path()) 1654 1655 def delete(self): 1656 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1657 self.deleted = True 1658 1659 def get_attachments(self) -> List[Attachment]: 1660 """ 1661 Get all attachments on this comment. This returns Attachment objects, which 1662 contain a link to download the attachment. 1663 1664 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1665 """ 1666 1667 results = self.allspice_client.requests_get( 1668 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1669 ) 1670 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1671 1672 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1673 """ 1674 Create an attachment on this comment. 1675 1676 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1677 1678 :param file: The file to attach. This should be a file-like object. 1679 :param name: The name of the file. If not provided, the name of the file will be 1680 used. 1681 :return: The created attachment. 
1682 """ 1683 1684 args: dict[str, Any] = { 1685 "files": {"attachment": file}, 1686 } 1687 if name is not None: 1688 args["params"] = {"name": name} 1689 1690 result = self.allspice_client.requests_post( 1691 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1692 **args, 1693 ) 1694 return Attachment.parse_response(self.allspice_client, result) 1695 1696 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1697 """ 1698 Edit an attachment. 1699 1700 The list of params that can be edited is available at 1701 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1702 1703 :param attachment: The attachment to be edited 1704 :param data: The data parameter should be a dictionary of the fields to edit. 1705 :return: The edited attachment 1706 """ 1707 1708 args = { 1709 **self.__fields_for_path(), 1710 "attachment_id": attachment.id, 1711 } 1712 result = self.allspice_client.requests_patch( 1713 self.ATTACHMENT_PATH.format(**args), 1714 data=data, 1715 ) 1716 return Attachment.parse_response(self.allspice_client, result) 1717 1718 def delete_attachment(self, attachment: Attachment): 1719 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1720 1721 args = { 1722 **self.__fields_for_path(), 1723 "attachment_id": attachment.id, 1724 } 1725 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1726 attachment.deleted = True 1727 1728 1729class Commit(ReadonlyApiObject): 1730 author: User 1731 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1732 committer: Dict[str, Union[int, str, bool]] 1733 created: str 1734 files: List[Dict[str, str]] 1735 html_url: str 1736 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1737 parents: List[Union[Dict[str, str], Any]] 1738 sha: str 1739 stats: Dict[str, int] 1740 url: str 1741 1742 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1743 
def get_status(self) -> CommitCombinedStatus:
    """
    Get a combined status consisting of all statuses on this commit.

    Note that the returned object is a CommitCombinedStatus object, which
    also contains a list of all statuses on the commit.

    https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
    """

    endpoint = self.COMMIT_GET_STATUS.format(**self._fields_for_path)
    combined = self.allspice_client.requests_get(endpoint)
    return CommitCombinedStatus.parse_response(self.allspice_client, combined)
class CommitStatusState(Enum):
    """The commit status states this client knows, by their string values."""

    PENDING = "pending"
    SUCCESS = "success"
    ERROR = "error"
    FAILURE = "failure"
    WARNING = "warning"

    @classmethod
    def try_init(cls, value: str) -> "CommitStatusState | str":
        """
        Return the member whose value equals the given string; when no
        member matches, hand the input back unchanged.
        """

        for member in cls:
            if member.value == value:
                return member
        return value
1853 "status": lambda _, s: CommitStatusState.try_init(s), 1854 "creator": lambda allspice_client, u: ( 1855 User.parse_response(allspice_client, u) if u else None 1856 ), 1857 } 1858 1859 def __eq__(self, other): 1860 if not isinstance(other, CommitStatus): 1861 return False 1862 return self.id == other.id 1863 1864 def __hash__(self): 1865 return hash(self.id) 1866 1867 1868class CommitCombinedStatus(ReadonlyApiObject): 1869 commit_url: str 1870 repository: Repository 1871 sha: str 1872 state: CommitStatusState 1873 statuses: List["CommitStatus"] 1874 total_count: int 1875 url: str 1876 1877 def __init__(self, allspice_client): 1878 super().__init__(allspice_client) 1879 1880 _fields_to_parsers: ClassVar[dict] = { 1881 # See CommitStatus 1882 "state": lambda _, s: CommitStatusState.try_init(s), 1883 "statuses": lambda allspice_client, statuses: [ 1884 CommitStatus.parse_response(allspice_client, status) for status in statuses 1885 ], 1886 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1887 } 1888 1889 def __eq__(self, other): 1890 if not isinstance(other, CommitCombinedStatus): 1891 return False 1892 return self.sha == other.sha 1893 1894 def __hash__(self): 1895 return hash(self.sha) 1896 1897 @classmethod 1898 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1899 api_object = cls(allspice_client) 1900 cls._initialize(allspice_client, api_object, result) 1901 return api_object 1902 1903 1904class Issue(ApiObject): 1905 """ 1906 An issue on a repository. 1907 1908 Note: `Issue.assets` may not have any entries even if the issue has 1909 attachments. This happens when an issue is fetched via a bulk method like 1910 `Repository.get_issues`. In most cases, prefer using 1911 `Issue.get_attachments` to get the attachments on an issue. 
1912 """ 1913 1914 assets: List[Union[Any, "Attachment"]] 1915 assignee: Any 1916 assignees: Any 1917 body: str 1918 closed_at: Any 1919 comments: int 1920 created_at: str 1921 due_date: Any 1922 html_url: str 1923 id: int 1924 is_locked: bool 1925 labels: List[Any] 1926 milestone: Optional["Milestone"] 1927 number: int 1928 original_author: str 1929 original_author_id: int 1930 pin_order: int 1931 pull_request: Any 1932 ref: str 1933 repository: Dict[str, Union[int, str]] 1934 state: str 1935 title: str 1936 updated_at: str 1937 url: str 1938 user: User 1939 1940 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1941 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1942 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1943 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1944 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1945 1946 OPENED = "open" 1947 CLOSED = "closed" 1948 1949 def __init__(self, allspice_client): 1950 super().__init__(allspice_client) 1951 1952 def __eq__(self, other): 1953 if not isinstance(other, Issue): 1954 return False 1955 return self.repository == other.repository and self.id == other.id 1956 1957 def __hash__(self): 1958 return hash(self.repository) ^ hash(self.id) 1959 1960 _fields_to_parsers: ClassVar[dict] = { 1961 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1962 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1963 "assets": lambda allspice_client, assets: [ 1964 Attachment.parse_response(allspice_client, a) for a in assets 1965 ], 1966 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1967 "assignees": lambda allspice_client, us: [ 1968 User.parse_response(allspice_client, u) for u in us 1969 ], 1970 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1971 } 1972 1973 _parsers_to_fields: ClassVar[dict] = { 1974 "milestone": 
def commit(self):
    """Call `_commit` with the owner, repo and index path fields of this issue."""
    self._commit(
        {
            "owner": self.repository.owner.username,
            "repo": self.repository.name,
            "index": self.number,
        }
    )
def add_time(
    self,
    time: int,
    created: Optional[str] = None,
    user_name: "Union[str, User, None]" = None,
):
    """
    Post a new time entry for this issue.

    :param time: The time to add; converted with int() before sending.
    :param created: Optional value for the "created" field of the request.
    :param user_name: Optional user for the "user_name" field, given either
        as a username or as a User object. A User is replaced by its
        username.
    """
    # The request body needs the plain username; other methods that take a
    # user accept either form, so do the same here.
    user_name = getattr(user_name, "username", user_name)
    path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
    self.allspice_client.requests_post(
        path, data={"created": created, "time": int(time), "user_name": user_name}
    )
2068 2069 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2070 """ 2071 2072 path = self.GET_ATTACHMENTS.format( 2073 owner=self.owner.username, repo=self.repository.name, index=self.number 2074 ) 2075 response = self.allspice_client.requests_get(path) 2076 2077 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2078 2079 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2080 """ 2081 Create an attachment on this issue. 2082 2083 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2084 2085 :param file: The file to attach. This should be a file-like object. 2086 :param name: The name of the file. If not provided, the name of the file will be 2087 used. 2088 :return: The created attachment. 2089 """ 2090 2091 args: dict[str, Any] = { 2092 "files": {"attachment": file}, 2093 } 2094 if name is not None: 2095 args["params"] = {"name": name} 2096 2097 result = self.allspice_client.requests_post( 2098 self.GET_ATTACHMENTS.format( 2099 owner=self.owner.username, repo=self.repository.name, index=self.number 2100 ), 2101 **args, 2102 ) 2103 2104 return Attachment.parse_response(self.allspice_client, result) 2105 2106 2107class DesignReview(ApiObject): 2108 """ 2109 A Design Review. See 2110 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2111 2112 Note: The base and head fields are not `Branch` objects - they are plain strings 2113 referring to the branch names. This is because DRs can exist for branches that have 2114 been deleted, which don't have an associated `Branch` object from the API. You can use 2115 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2116 it exists. 
2117 """ 2118 2119 additions: int 2120 allow_maintainer_edit: bool 2121 allow_maintainer_edits: Any 2122 assignee: User 2123 assignees: List["User"] 2124 base: str 2125 body: str 2126 changed_files: int 2127 closed_at: Optional[str] 2128 comments: int 2129 created_at: str 2130 deletions: int 2131 diff_url: str 2132 draft: bool 2133 due_date: Optional[str] 2134 head: str 2135 html_url: str 2136 id: int 2137 is_locked: bool 2138 labels: List[Any] 2139 merge_base: str 2140 merge_commit_sha: Optional[str] 2141 mergeable: bool 2142 merged: bool 2143 merged_at: Optional[str] 2144 merged_by: Optional["User"] 2145 milestone: Any 2146 number: int 2147 patch_url: str 2148 pin_order: int 2149 repository: Optional["Repository"] 2150 requested_reviewers: Any 2151 review_comments: int 2152 state: str 2153 title: str 2154 updated_at: str 2155 url: str 2156 user: User 2157 2158 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2159 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2160 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2161 2162 OPEN = "open" 2163 CLOSED = "closed" 2164 2165 class MergeType(Enum): 2166 MERGE = "merge" 2167 REBASE = "rebase" 2168 REBASE_MERGE = "rebase-merge" 2169 SQUASH = "squash" 2170 MANUALLY_MERGED = "manually-merged" 2171 2172 def __init__(self, allspice_client): 2173 super().__init__(allspice_client) 2174 2175 def __eq__(self, other): 2176 if not isinstance(other, DesignReview): 2177 return False 2178 return self.repository == other.repository and self.id == other.id 2179 2180 def __hash__(self): 2181 return hash(self.repository) ^ hash(self.id) 2182 2183 @classmethod 2184 def parse_response(cls, allspice_client, result) -> "DesignReview": 2185 api_object = super().parse_response(allspice_client, result) 2186 cls._add_read_property( 2187 "repository", 2188 Repository.parse_response(allspice_client, result["base"]["repo"]), 2189 api_object, 2190 ) 2191 2192 return api_object 2193 2194 @classmethod 2195 def 
request(cls, allspice_client, owner: str, repo: str, number: str): 2196 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2197 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2198 2199 _fields_to_parsers: ClassVar[dict] = { 2200 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2201 "assignees": lambda allspice_client, us: [ 2202 User.parse_response(allspice_client, u) for u in us 2203 ], 2204 "base": lambda _, b: b["ref"], 2205 "head": lambda _, h: h["ref"], 2206 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2207 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2208 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2209 } 2210 2211 _patchable_fields: ClassVar[set[str]] = { 2212 "allow_maintainer_edits", 2213 "assignee", 2214 "assignees", 2215 "base", 2216 "body", 2217 "due_date", 2218 "milestone", 2219 "state", 2220 "title", 2221 } 2222 2223 _parsers_to_fields: ClassVar[dict] = { 2224 "assignee": lambda u: u.username, 2225 "assignees": lambda us: [u.username for u in us], 2226 "base": lambda b: b.name if isinstance(b, Branch) else b, 2227 "milestone": lambda m: m.id, 2228 } 2229 2230 def commit(self): 2231 data = self.get_dirty_fields() 2232 if "due_date" in data and data["due_date"] is None: 2233 data["unset_due_date"] = True 2234 2235 args = { 2236 "owner": self.repository.owner.username, 2237 "repo": self.repository.name, 2238 "index": self.number, 2239 } 2240 self._commit(args, data) 2241 2242 def merge(self, merge_type: MergeType): 2243 """ 2244 Merge the pull request. See 2245 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2246 2247 :param merge_type: The type of merge to perform. See the MergeType enum. 
2248 """ 2249 2250 self.allspice_client.requests_post( 2251 self.MERGE_DESIGN_REVIEW.format( 2252 owner=self.repository.owner.username, 2253 repo=self.repository.name, 2254 index=self.number, 2255 ), 2256 data={"Do": merge_type.value}, 2257 ) 2258 2259 def get_comments(self) -> List[Comment]: 2260 """ 2261 Get the comments on this pull request, but not specifically on a review. 2262 2263 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2264 2265 :return: A list of comments on this pull request. 2266 """ 2267 2268 results = self.allspice_client.requests_get( 2269 self.GET_COMMENTS.format( 2270 owner=self.repository.owner.username, 2271 repo=self.repository.name, 2272 index=self.number, 2273 ) 2274 ) 2275 return [Comment.parse_response(self.allspice_client, result) for result in results] 2276 2277 def create_comment(self, body: str): 2278 """ 2279 Create a comment on this pull request. This uses the same endpoint as the 2280 comments on issues, and will not be associated with any reviews. 2281 2282 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2283 2284 :param body: The body of the comment. 2285 :return: The comment that was created. 
2286 """ 2287 2288 result = self.allspice_client.requests_post( 2289 self.GET_COMMENTS.format( 2290 owner=self.repository.owner.username, 2291 repo=self.repository.name, 2292 index=self.number, 2293 ), 2294 data={"body": body}, 2295 ) 2296 return Comment.parse_response(self.allspice_client, result) 2297 2298 2299class Team(ApiObject): 2300 can_create_org_repo: bool 2301 description: str 2302 id: int 2303 includes_all_repositories: bool 2304 name: str 2305 organization: Optional["Organization"] 2306 permission: str 2307 units: List[str] 2308 units_map: Dict[str, str] 2309 2310 API_OBJECT = """/teams/{id}""" # <id> 2311 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2312 TEAM_DELETE = """/teams/%s""" # <id> 2313 GET_MEMBERS = """/teams/%s/members""" # <id> 2314 GET_REPOS = """/teams/%s/repos""" # <id> 2315 2316 def __init__(self, allspice_client): 2317 super().__init__(allspice_client) 2318 2319 def __eq__(self, other): 2320 if not isinstance(other, Team): 2321 return False 2322 return self.organization == other.organization and self.id == other.id 2323 2324 def __hash__(self): 2325 return hash(self.organization) ^ hash(self.id) 2326 2327 _fields_to_parsers: ClassVar[dict] = { 2328 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2329 } 2330 2331 _patchable_fields: ClassVar[set[str]] = { 2332 "can_create_org_repo", 2333 "description", 2334 "includes_all_repositories", 2335 "name", 2336 "permission", 2337 "units", 2338 "units_map", 2339 } 2340 2341 @classmethod 2342 def request(cls, allspice_client, id: int): 2343 return cls._request(allspice_client, {"id": id}) 2344 2345 def commit(self): 2346 args = {"id": self.id} 2347 self._commit(args) 2348 2349 def add_user(self, user: User): 2350 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2351 url = f"/teams/{self.id}/members/{user.login}" 2352 self.allspice_client.requests_put(url) 2353 2354 def add_repo(self, org: Organization, repo: 
Union[Repository, str]): 2355 if isinstance(repo, Repository): 2356 repo_name = repo.name 2357 else: 2358 repo_name = repo 2359 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2360 2361 def get_members(self): 2362 """Get all users assigned to the team.""" 2363 results = self.allspice_client.requests_get_paginated( 2364 Team.GET_MEMBERS % self.id, 2365 ) 2366 return [User.parse_response(self.allspice_client, result) for result in results] 2367 2368 def get_repos(self): 2369 """Get all repos of this Team.""" 2370 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2371 return [Repository.parse_response(self.allspice_client, result) for result in results] 2372 2373 def delete(self): 2374 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2375 self.deleted = True 2376 2377 def remove_team_member(self, user_name: str): 2378 url = f"/teams/{self.id}/members/{user_name}" 2379 self.allspice_client.requests_delete(url) 2380 2381 2382class Release(ApiObject): 2383 """ 2384 A release on a repo. 2385 """ 2386 2387 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2388 author: User 2389 body: str 2390 created_at: str 2391 draft: bool 2392 html_url: str 2393 id: int 2394 name: str 2395 prerelease: bool 2396 published_at: str 2397 repo: Optional["Repository"] 2398 repository: Optional["Repository"] 2399 tag_name: str 2400 tarball_url: str 2401 target_commitish: str 2402 upload_url: str 2403 url: str 2404 zipball_url: str 2405 2406 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2407 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2408 # Note that we don't strictly need the get_assets route, as the release 2409 # object already contains the assets. 
2410 2411 def __init__(self, allspice_client): 2412 super().__init__(allspice_client) 2413 2414 def __eq__(self, other): 2415 if not isinstance(other, Release): 2416 return False 2417 return self.repo == other.repo and self.id == other.id 2418 2419 def __hash__(self): 2420 return hash(self.repo) ^ hash(self.id) 2421 2422 _fields_to_parsers: ClassVar[dict] = { 2423 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2424 } 2425 _patchable_fields: ClassVar[set[str]] = { 2426 "body", 2427 "draft", 2428 "name", 2429 "prerelease", 2430 "tag_name", 2431 "target_commitish", 2432 } 2433 2434 @classmethod 2435 def parse_response(cls, allspice_client, result, repo) -> Release: 2436 release = super().parse_response(allspice_client, result) 2437 Release._add_read_property("repository", repo, release) 2438 # For legacy reasons 2439 Release._add_read_property("repo", repo, release) 2440 setattr( 2441 release, 2442 "_assets", 2443 [ 2444 ReleaseAsset.parse_response(allspice_client, asset, release) 2445 for asset in result["assets"] 2446 ], 2447 ) 2448 return release 2449 2450 @classmethod 2451 def request( 2452 cls, 2453 allspice_client, 2454 owner: str, 2455 repo: str, 2456 id: Optional[int] = None, 2457 ) -> Release: 2458 args = {"owner": owner, "repo": repo, "id": id} 2459 release_response = cls._get_gitea_api_object(allspice_client, args) 2460 repository = Repository.request(allspice_client, owner, repo) 2461 release = cls.parse_response(allspice_client, release_response, repository) 2462 return release 2463 2464 def commit(self): 2465 if self.repo is None: 2466 raise ValueError("Cannot commit a release without a repository.") 2467 2468 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2469 self._commit(args) 2470 2471 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2472 """ 2473 Create an asset for this release. 
2474 2475 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2476 2477 :param file: The file to upload. This should be a file-like object. 2478 :param name: The name of the file. 2479 :return: The created asset. 2480 """ 2481 2482 if self.repo is None: 2483 raise ValueError("Cannot commit a release without a repository.") 2484 2485 args: dict[str, Any] = {"files": {"attachment": file}} 2486 if name is not None: 2487 args["params"] = {"name": name} 2488 2489 result = self.allspice_client.requests_post( 2490 self.RELEASE_CREATE_ASSET.format( 2491 owner=self.repo.owner.username, 2492 repo=self.repo.name, 2493 id=self.id, 2494 ), 2495 **args, 2496 ) 2497 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2498 2499 def delete(self): 2500 if self.repo is None: 2501 raise ValueError("Cannot commit a release without a repository.") 2502 2503 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2504 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2505 self.deleted = True 2506 2507 2508class ReleaseAsset(ApiObject): 2509 browser_download_url: str 2510 created_at: str 2511 download_count: int 2512 id: int 2513 name: str 2514 release: Optional["Release"] 2515 size: int 2516 uuid: str 2517 2518 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2519 2520 def __init__(self, allspice_client): 2521 super().__init__(allspice_client) 2522 2523 def __eq__(self, other): 2524 if not isinstance(other, ReleaseAsset): 2525 return False 2526 return self.release == other.release and self.id == other.id 2527 2528 def __hash__(self): 2529 return hash(self.release) ^ hash(self.id) 2530 2531 _fields_to_parsers: ClassVar[dict] = {} 2532 _patchable_fields: ClassVar[set[str]] = { 2533 "name", 2534 } 2535 2536 @classmethod 2537 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2538 asset = super().parse_response(allspice_client, result) 2539 
ReleaseAsset._add_read_property("release", release, asset) 2540 return asset 2541 2542 @classmethod 2543 def request( 2544 cls, 2545 allspice_client, 2546 owner: str, 2547 repo: str, 2548 release_id: int, 2549 id: int, 2550 ) -> ReleaseAsset: 2551 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2552 asset_response = cls._get_gitea_api_object(allspice_client, args) 2553 release = Release.request(allspice_client, owner, repo, release_id) 2554 asset = cls.parse_response(allspice_client, asset_response, release) 2555 return asset 2556 2557 def commit(self): 2558 if self.release is None or self.release.repo is None: 2559 raise ValueError("Cannot commit a release asset without a release or a repository.") 2560 2561 args = { 2562 "owner": self.release.repo.owner.username, 2563 "repo": self.release.repo.name, 2564 "release_id": self.release.id, 2565 "id": self.id, 2566 } 2567 self._commit(args) 2568 2569 def download(self) -> bytes: 2570 """ 2571 Download the raw, binary data of this asset. 2572 2573 Note 1: if the file you are requesting is a text file, you might want to 2574 use .decode() on the result to get a string. For example: 2575 2576 asset.download().decode("utf-8") 2577 2578 Note 2: this method will store the entire file in memory. If you are 2579 downloading a large file, you might want to use download_to_file instead. 2580 """ 2581 2582 return self.allspice_client.requests.get( 2583 self.browser_download_url, 2584 headers=self.allspice_client.headers, 2585 ).content 2586 2587 def download_to_file(self, io: IO): 2588 """ 2589 Download the raw, binary data of this asset to a file-like object. 2590 2591 Example: 2592 2593 with open("my_file.zip", "wb") as f: 2594 asset.download_to_file(f) 2595 2596 :param io: The file-like object to write the data to. 
2597 """ 2598 2599 response = self.allspice_client.requests.get( 2600 self.browser_download_url, 2601 headers=self.allspice_client.headers, 2602 stream=True, 2603 ) 2604 # 4kb chunks 2605 for chunk in response.iter_content(chunk_size=4096): 2606 if chunk: 2607 io.write(chunk) 2608 2609 def delete(self): 2610 if self.release is None or self.release.repo is None: 2611 raise ValueError("Cannot commit a release asset without a release or a repository.") 2612 2613 args = { 2614 "owner": self.release.repo.owner.username, 2615 "repo": self.release.repo.name, 2616 "release_id": self.release.id, 2617 "id": self.id, 2618 } 2619 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2620 self.deleted = True 2621 2622 2623class Content(ReadonlyApiObject): 2624 content: Any 2625 download_url: str 2626 encoding: Any 2627 git_url: str 2628 html_url: str 2629 last_commit_sha: str 2630 name: str 2631 path: str 2632 sha: str 2633 size: int 2634 submodule_git_url: Any 2635 target: Any 2636 type: str 2637 url: str 2638 2639 FILE = "file" 2640 2641 def __init__(self, allspice_client): 2642 super().__init__(allspice_client) 2643 2644 def __eq__(self, other): 2645 if not isinstance(other, Content): 2646 return False 2647 2648 return self.sha == other.sha and self.name == other.name 2649 2650 def __hash__(self): 2651 return hash(self.sha) ^ hash(self.name) 2652 2653 2654Ref = Union[Branch, Commit, str] 2655 2656 2657class Util: 2658 @staticmethod 2659 def convert_time(time: str) -> datetime: 2660 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2661 try: 2662 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2663 except ValueError: 2664 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2665 2666 @staticmethod 2667 def format_time(time: datetime) -> str: 2668 """ 2669 Format a datetime object to Gitea's time format. 
2670 2671 :param time: The time to format 2672 :return: Formatted time 2673 """ 2674 2675 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2676 2677 @staticmethod 2678 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2679 """ 2680 Given a "ref", returns a dict with the ref parameter for the API call. 2681 2682 If the ref is None, returns an empty dict. You can pass this to the API 2683 directly. 2684 """ 2685 2686 if isinstance(ref, Branch): 2687 return {"ref": ref.name} 2688 elif isinstance(ref, Commit): 2689 return {"ref": ref.sha} 2690 elif ref: 2691 return {"ref": ref} 2692 else: 2693 return {}
class Organization(ApiObject):
    """
    An organization on the hub.

    see https://hub.allspice.io/api/swagger#/organization/orgGetAll
    """

    active: Optional[bool]
    avatar_url: str
    created: Optional[str]
    description: str
    email: str
    followers_count: Optional[int]
    following_count: Optional[int]
    full_name: str
    html_url: Optional[str]
    id: int
    is_admin: Optional[bool]
    language: Optional[str]
    last_login: Optional[str]
    location: str
    login: Optional[str]
    login_name: Optional[str]
    name: Optional[str]
    prohibit_login: Optional[bool]
    repo_admin_change_team_access: Optional[bool]
    restricted: Optional[bool]
    source_id: Optional[int]
    starred_repos_count: Optional[int]
    username: str
    visibility: str
    website: str

    API_OBJECT = """/orgs/{name}"""  # <org>
    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Two organizations are equal when they share a client and a name."""
        if not isinstance(other, Organization):
            return False
        return self.allspice_client == other.allspice_client and self.name == other.name

    def __hash__(self):
        # Built from the same two values __eq__ compares.
        return hash(self.allspice_client) ^ hash(self.name)

    @classmethod
    def request(cls, allspice_client, name: str) -> Self:
        """Fetch the organization called ``name`` through ``allspice_client``."""
        return cls._request(allspice_client, {"name": name})

    @classmethod
    def parse_response(cls, allspice_client, result) -> "Organization":
        """
        Build an Organization from an API response.

        :param allspice_client: The client the object is bound to.
        :param result: The decoded response for one organization.
        :return: The parsed organization, always exposing a ``name`` attribute.
        """
        api_object = super().parse_response(allspice_client, result)
        # add "name" field to make this behave similar to users for gitea < 1.18
        # also necessary for repository-owner when org is repo owner
        if not hasattr(api_object, "name"):
            Organization._add_read_property("name", result["username"], api_object)
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "description",
        "full_name",
        "location",
        "visibility",
        "website",
    }

    def commit(self):
        """Send the modified patchable fields of this organization to the API."""
        args = {"name": self.name}
        self._commit(args)

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create an organization Repository

        The arguments are forwarded as the body of a POST to
        ``/orgs/<name>/repos``.

        Throws:
            AlreadyExistsException: If the Repository exists already.
                (NOTE(review): not raised in this method; presumably raised
                by the client - confirm.)
            Exception: If the response carries no ``"id"`` key.
        """
        result = self.allspice_client.requests_post(
            f"/orgs/{self.name}/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Return the repositories of this organization (paginated request)."""
        results = self.allspice_client.requests_get_paginated(
            Organization.ORG_REPOS_REQUEST % self.username
        )
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_repository(self, name) -> "Repository":
        """
        Return the repository of this organization called ``name``.

        :raises NotFoundException: If no repository has that name.
        """
        repos = self.get_repositories()
        for repo in repos:
            if repo.name == name:
                return repo
        raise NotFoundException("Repository %s not existent in organization." % name)

    def get_teams(self) -> List["Team"]:
        """Return the teams of this organization, each linked back to it."""
        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
        teams = [Team.parse_response(self.allspice_client, result) for result in results]
        # organisation seems to be missing using this request, so we add org manually
        for t in teams:
            setattr(t, "_organization", self)
        return teams

    def get_team(self, name) -> "Team":
        """
        Return the team of this organization called ``name``.

        :raises NotFoundException: If no team has that name.
        """
        teams = self.get_teams()
        for team in teams:
            if team.name == name:
                return team
        raise NotFoundException("Team not existent in organization.")

    def create_team(
        self,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map: Optional[Dict[str, str]] = None,
    ) -> "Team":
        """
        Alias for AllSpice#create_team

        All arguments are forwarded unchanged, with this organization passed
        as ``org``.

        :param units_map: Forwarded as ``units_map``. When omitted, a new
            empty dict is created for each call.
        :return: Whatever ``allspice_client.create_team`` returns.
        """
        # TODO: Move AllSpice#create_team to Organization#create_team and
        # deprecate AllSpice#create_team.

        # A ``{}`` default would be one dict shared by every call; build a
        # fresh one instead so a mutation cannot leak into later calls.
        if units_map is None:
            units_map = {}
        return self.allspice_client.create_team(
            org=self,
            name=name,
            description=description,
            permission=permission,
            can_create_org_repo=can_create_org_repo,
            includes_all_repositories=includes_all_repositories,
            units=units,
            units_map=units_map,
        )

    def get_members(self) -> List["User"]:
        """Return the members of this organization."""
        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
        return [User.parse_response(self.allspice_client, result) for result in results]

    def is_member(self, username) -> bool:
        """
        Report whether a user belongs to this organization.

        :param username: A username string or a ``User`` object.
        :return: True when the membership request succeeds, False when it
            raises any exception.
        """
        if isinstance(username, User):
            username = username.username
        try:
            # returns 204 if its ok, 404 if its not
            self.allspice_client.requests_get(
                Organization.ORG_IS_MEMBER % (self.username, username)
            )
            return True
        except Exception:
            return False

    def remove_member(self, user: "User"):
        """Remove ``user`` from this organization."""
        path = f"/orgs/{self.username}/members/{user.username}"
        self.allspice_client.requests_delete(path)

    def delete(self):
        """Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization"""
        for repo in self.get_repositories():
            repo.delete()
        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Return ``(datetime, contributions)`` pairs from the heatmap endpoint."""
        # ORG_HEATMAP is the same path template as User.USER_HEATMAP; using
        # the class's own constant keeps this method self-contained.
        results = self.allspice_client.requests_get(Organization.ORG_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
see https://hub.allspice.io/api/swagger#/organization/orgGetAll
@classmethod
def parse_response(cls, allspice_client, result) -> "Organization":
    """
    Build an Organization from an API response.

    :param allspice_client: The client the object is bound to.
    :param result: The decoded response for one organization.
    :return: The parsed organization, always exposing a ``name`` attribute.
    """
    organization = super().parse_response(allspice_client, result)
    if hasattr(organization, "name"):
        return organization

    # Without a "name" field (gitea < 1.18), mirror "username" into it so
    # organizations behave like users; this is also needed when an
    # organization is the owner of a repository.
    Organization._add_read_property("name", result["username"], organization)
    return organization
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create an organization Repository

    The arguments are sent as the body of a POST to ``/orgs/<name>/repos``
    and the response is parsed into a Repository.

    Throws:
        AlreadyExistsException: If the Repository exists already.
            (NOTE(review): not raised in this method; presumably raised by
            the client - confirm.)
        Exception: If the response carries no ``"id"`` key.
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    client = self.allspice_client
    response = client.requests_post(f"/orgs/{self.name}/repos", data=payload)

    # A response without an "id" is treated as a failed creation.
    if "id" not in response:
        client.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])

    client.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(client, response)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_teams(self) -> List["Team"]:
    """
    Return the teams of this organization.

    :return: One ``Team`` per entry of the response, each with its
        ``_organization`` attribute set to this organization.
    """
    endpoint = Organization.ORG_TEAMS_REQUEST % self.username
    parsed_teams = [
        Team.parse_response(self.allspice_client, raw_team)
        for raw_team in self.allspice_client.requests_get(endpoint)
    ]
    for team in parsed_teams:
        # The organization seems to be missing from this response, so link
        # it by hand.
        setattr(team, "_organization", self)
    return parsed_teams
def create_team(
    self,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map: Optional[Dict[str, str]] = None,
) -> "Team":
    """
    Alias for AllSpice#create_team

    All arguments are forwarded unchanged, with this organization passed as
    ``org``.

    :param name: Forwarded as ``name``.
    :param description: Forwarded as ``description``.
    :param permission: Forwarded as ``permission``.
    :param can_create_org_repo: Forwarded as ``can_create_org_repo``.
    :param includes_all_repositories: Forwarded as
        ``includes_all_repositories``.
    :param units: Forwarded as ``units``.
    :param units_map: Forwarded as ``units_map``. When omitted, a new empty
        dict is created for each call.
    :return: Whatever ``allspice_client.create_team`` returns.
    """
    # TODO: Move AllSpice#create_team to Organization#create_team and
    # deprecate AllSpice#create_team.

    # A ``{}`` default would be one dict shared by every call; build a fresh
    # one instead so a mutation cannot leak into later calls.
    if units_map is None:
        units_map = {}
    return self.allspice_client.create_team(
        org=self,
        name=name,
        description=description,
        permission=permission,
        can_create_org_repo=can_create_org_repo,
        includes_all_repositories=includes_all_repositories,
        units=units,
        units_map=units_map,
    )
Alias for AllSpice#create_team
def is_member(self, username) -> bool:
    """
    Report whether a user belongs to this organization.

    :param username: A username string or a ``User`` object.
    :return: True when the membership request succeeds, False when it raises
        any exception.
    """
    member_name = username.username if isinstance(username, User) else username
    try:
        # The endpoint answers 204 for a member and 404 otherwise.
        self.allspice_client.requests_get(
            Organization.ORG_IS_MEMBER % (self.username, member_name)
        )
    except Exception:
        return False
    else:
        return True
def delete(self):
    """
    Delete this Organization and every Repository it owns.

    Each repository returned by ``get_repositories`` is deleted first, then
    the organization itself. The object is flagged as deleted afterwards,
    so its data should be considered invalid.
    """
    owned_repositories = self.get_repositories()
    for repository in owned_repositories:
        repository.delete()

    organization_path = Organization.API_OBJECT.format(name=self.username)
    self.allspice_client.requests_delete(organization_path)
    self.deleted = True
Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization.
class User(ApiObject):
    """A user account, with admin-level edit and delete helpers."""

    active: bool
    admin: Any
    allow_create_organization: Any
    allow_git_hook: Any
    allow_import_local: Any
    avatar_url: str
    created: str
    description: str
    email: str
    emails: List[Any]
    followers_count: int
    following_count: int
    full_name: str
    html_url: str
    id: int
    is_admin: bool
    language: str
    last_login: str
    location: str
    login: str
    login_name: str
    max_repo_creation: Any
    must_change_password: Any
    password: Any
    prohibit_login: bool
    restricted: bool
    source_id: int
    starred_repos_count: int
    username: str
    visibility: str
    website: str

    API_OBJECT = """/users/{name}"""  # <name>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)
        self._emails = []

    def __eq__(self, other):
        """Two users are equal when they share a client and an id."""
        if not isinstance(other, User):
            return False
        return self.allspice_client == other.allspice_client and self.id == other.id

    def __hash__(self):
        # Built from the same two values __eq__ compares.
        return hash(self.allspice_client) ^ hash(self.id)

    @property
    def emails(self):
        """The user's e-mail addresses, requested from the API on every access."""
        self.__request_emails()
        return self._emails

    @classmethod
    def request(cls, allspice_client, name: str) -> "User":
        """Fetch the user called ``name`` through ``allspice_client``."""
        api_object = cls._request(allspice_client, {"name": name})
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "active",
        "admin",
        "allow_create_organization",
        "allow_git_hook",
        "allow_import_local",
        "email",
        "full_name",
        "location",
        "login_name",
        "max_repo_creation",
        "must_change_password",
        "password",
        "prohibit_login",
        "website",
    }

    def commit(self, login_name: str, source_id: int = 0):
        """
        Unfortunately it is necessary to require the login name
        as well as the login source (that is not supplied when getting a user) for
        changing a user.
        Usually source_id is 0 and the login_name is equal to the username.
        """
        values = self.get_dirty_fields()
        values.update(
            # api-doc says that the "source_id" is necessary; works without though
            {"login_name": login_name, "source_id": source_id}
        )
        args = {"username": self.username}
        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
        self._dirty_fields = {}

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a user Repository

        The arguments are forwarded as the body of a POST to ``/user/repos``.

        Throws:
            AlreadyExistsException: If the Repository exists already.
                (NOTE(review): not raised in this method; presumably raised
                by the client - confirm.)
            Exception: If the response carries no ``"id"`` key.
        """
        result = self.allspice_client.requests_post(
            "/user/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories owned by this User."""
        url = f"/users/{self.username}/repos"
        results = self.allspice_client.requests_get_paginated(url)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_orgs(self) -> List[Organization]:
        """Get all Organizations this user is a member of."""
        url = f"/users/{self.username}/orgs"
        results = self.allspice_client.requests_get_paginated(url)
        return [Organization.parse_response(self.allspice_client, result) for result in results]

    def get_teams(self) -> List["Team"]:
        """Get the teams returned by ``/user/teams`` when requested as this user."""
        url = "/user/teams"
        results = self.allspice_client.requests_get_paginated(url, sudo=self)
        return [Team.parse_response(self.allspice_client, result) for result in results]

    def get_accessible_repos(self) -> List["Repository"]:
        """Get all Repositories accessible by the logged in User."""
        results = self.allspice_client.requests_get("/user/repos", sudo=self)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def __request_emails(self):
        """Refresh ``_emails`` (and ``_email`` for a primary address) from the API."""
        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
        # Rebuild the list on every request: appending to the existing list
        # would add every address again each time ``emails`` is read.
        self._emails = [mail["email"] for mail in result]
        # report if the address changed by this
        for mail in result:
            if mail["primary"]:
                self._email = mail["email"]

    def delete(self):
        """Deletes this User. Also deletes all Repositories he owns."""
        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Return ``(datetime, contributions)`` pairs from the heatmap endpoint."""
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
def commit(self, login_name: str, source_id: int = 0):
    """
    Send the modified fields of this user to the admin edit endpoint.

    The login name and the login source have to be passed in explicitly
    because they are not supplied when a user is fetched. Usually
    ``source_id`` is 0 and ``login_name`` equals the username.

    :param login_name: Login name sent along with the changed fields.
    :param source_id: Login source id sent along with the changed fields.
    """
    payload = self.get_dirty_fields()
    # The api-doc says "source_id" is necessary; it works without, though.
    payload.update(login_name=login_name, source_id=source_id)
    endpoint = User.ADMIN_EDIT_USER.format(username=self.username)
    self.allspice_client.requests_patch(endpoint, data=payload)
    # NOTE(review): reset to an empty dict; confirm this matches the
    # container type the base class uses for dirty-field tracking.
    self._dirty_fields = {}
Changing a user unfortunately requires both the login name and the login source (which is not supplied when getting a user). Usually source_id is 0 and the login_name is equal to the username.
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """
    Create a repository by posting to ``/user/repos``.

    :param repoName: Sent as ``name``.
    :param description: Sent as ``description``.
    :param private: Sent as ``private``.
    :param autoInit: Sent as ``auto_init``.
    :param gitignores: Sent as ``gitignores``.
    :param license: Sent as ``license``.
    :param readme: Sent as ``readme``.
    :param issue_labels: Sent as ``issue_labels``.
    :param default_branch: Sent as ``default_branch``.
    :return: The Repository parsed from the response.
    :raises Exception: If the response contains no ``"id"`` key; the message
        carries the ``"message"`` field of the response.

    NOTE(review): earlier documentation mentions AlreadyExistsException for an
    already existing repository; it is not raised here, presumably it comes
    from the client - confirm.
    """
    client = self.allspice_client
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    response = client.requests_post("/user/repos", data=payload)

    # A response without an id means the repository was not created.
    if "id" not in response:
        self.allspice_client.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])

    self.allspice_client.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self.allspice_client, response)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_repositories(self) -> List["Repository"]:
    """
    List the repositories owned by this user.

    Reads every page of ``/users/<username>/repos`` and parses each entry
    into a Repository.
    """
    repositories = []
    pages = self.allspice_client.requests_get_paginated(f"/users/{self.username}/repos")
    for entry in pages:
        repositories.append(Repository.parse_response(self.allspice_client, entry))
    return repositories
Get all Repositories owned by this User.
def get_orgs(self) -> List[Organization]:
    """
    List the organizations this user is a member of.

    Reads every page of ``/users/<username>/orgs`` and parses each entry
    into an Organization.
    """
    organizations = []
    pages = self.allspice_client.requests_get_paginated(f"/users/{self.username}/orgs")
    for entry in pages:
        organizations.append(Organization.parse_response(self.allspice_client, entry))
    return organizations
Get all Organizations this user is a member of.
def get_accessible_repos(self) -> List["Repository"]:
    """
    List the repositories returned by ``/user/repos``.

    The request is issued with ``sudo=self`` and each entry of the response
    is parsed into a Repository.
    """
    accessible = []
    response = self.allspice_client.requests_get("/user/repos", sudo=self)
    for entry in response:
        accessible.append(Repository.parse_response(self.allspice_client, entry))
    return accessible
Get all Repositories accessible by the logged in User.
def delete(self):
    """
    Delete this user through the admin endpoint.

    According to the original documentation this also deletes all
    repositories the user owns. Afterwards the object is flagged as deleted.
    """
    endpoint = User.ADMIN_DELETE_USER % self.username
    self.allspice_client.requests_delete(endpoint)
    # Only reached when the request did not raise.
    self.deleted = True
Deletes this User, along with all Repositories the user owns.
class Branch(ReadonlyApiObject):
    """
    Read-only API object for a branch of a repository.

    Branches compare equal when both their ``commit`` data and their ``name``
    match; the hash combines the commit's ``"id"`` entry with the name.
    """

    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
    effective_branch_protection_name: str
    enable_status_check: bool
    name: str
    protected: bool
    required_approvals: int
    status_check_contexts: List[Any]
    user_can_merge: bool
    user_can_push: bool

    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""

    # Intentionally empty: the "commit" field is not a Commit object, so it
    # is kept as the plain data from the response instead of being parsed.
    _fields_to_parsers: ClassVar[dict] = {}

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        return (
            isinstance(other, Branch)
            and self.commit == other.commit
            and self.name == other.name
        )

    def __hash__(self):
        commit_hash = hash(self.commit["id"])
        name_hash = hash(self.name)
        return commit_hash ^ name_hash

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, branch: str):
        """
        Fetch a single branch.

        :param allspice_client: The client used to perform the request.
        :param owner: Value for ``{owner}`` in ``API_OBJECT``.
        :param repo: Value for ``{repo}`` in ``API_OBJECT``.
        :param branch: Value for ``{branch}`` in ``API_OBJECT``.
        """
        route_fields = {"owner": owner, "repo": repo, "branch": branch}
        return cls._request(allspice_client, route_fields)
Inherited Members
class GitEntry(ReadonlyApiObject):
    """
    A single entry (file or directory) of a Git tree.

    Entries are compared and hashed by their ``sha`` alone.
    """

    mode: str
    path: str
    sha: str
    size: int
    type: str
    url: str

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other) -> bool:
        if isinstance(other, GitEntry):
            return self.sha == other.sha
        return False

    def __hash__(self) -> int:
        entry_sha = self.sha
        return hash(entry_sha)
An object representing a file or directory in the Git tree.
Inherited Members
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 
549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 559 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 560 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 561 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 562 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 563 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 564 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 565 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 566 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 567 568 class ArchiveFormat(Enum): 569 """ 570 Archive formats for Repository.get_archive 571 """ 572 573 TAR = "tar.gz" 574 ZIP = "zip" 575 576 class CommitStatusSort(Enum): 577 """ 578 Sort order for Repository.get_commit_status 579 """ 580 581 OLDEST = "oldest" 582 RECENT_UPDATE = "recentupdate" 583 LEAST_UPDATE = "leastupdate" 584 LEAST_INDEX = "leastindex" 585 HIGHEST_INDEX = "highestindex" 586 587 def __init__(self, allspice_client): 588 super().__init__(allspice_client) 589 590 def __eq__(self, other): 591 if not isinstance(other, Repository): 592 return False 593 return self.owner == other.owner and self.name == other.name 594 595 def __hash__(self): 596 return hash(self.owner) ^ hash(self.name) 597 598 _fields_to_parsers: ClassVar[dict] = { 599 # dont know how 
to tell apart user and org as owner except form email being empty. 600 "owner": lambda allspice_client, r: ( 601 Organization.parse_response(allspice_client, r) 602 if r["email"] == "" 603 else User.parse_response(allspice_client, r) 604 ), 605 "updated_at": lambda _, t: Util.convert_time(t), 606 } 607 608 @classmethod 609 def request( 610 cls, 611 allspice_client, 612 owner: str, 613 name: str, 614 ) -> Repository: 615 return cls._request(allspice_client, {"owner": owner, "name": name}) 616 617 @classmethod 618 def search( 619 cls, 620 allspice_client, 621 query: Optional[str] = None, 622 topic: bool = False, 623 include_description: bool = False, 624 user: Optional[User] = None, 625 owner_to_prioritize: Union[User, Organization, None] = None, 626 ) -> list[Repository]: 627 """ 628 Search for repositories. 629 630 See https://hub.allspice.io/api/swagger#/repository/repoSearch 631 632 :param query: The query string to search for 633 :param topic: If true, the query string will only be matched against the 634 repository's topic. 635 :param include_description: If true, the query string will be matched 636 against the repository's description as well. 637 :param user: If specified, only repositories that this user owns or 638 contributes to will be searched. 639 :param owner_to_prioritize: If specified, repositories owned by the 640 given entity will be prioritized in the search. 641 :returns: All repositories matching the query. If there are many 642 repositories matching this query, this may take some time. 
643 """ 644 645 params = {} 646 647 if query is not None: 648 params["q"] = query 649 if topic: 650 params["topic"] = topic 651 if include_description: 652 params["include_description"] = include_description 653 if user is not None: 654 params["user"] = user.id 655 if owner_to_prioritize is not None: 656 params["owner_to_prioritize"] = owner_to_prioritize.id 657 658 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 659 660 return [Repository.parse_response(allspice_client, response) for response in responses] 661 662 _patchable_fields: ClassVar[set[str]] = { 663 "allow_manual_merge", 664 "allow_merge_commits", 665 "allow_rebase", 666 "allow_rebase_explicit", 667 "allow_rebase_update", 668 "allow_squash_merge", 669 "archived", 670 "autodetect_manual_merge", 671 "default_branch", 672 "default_delete_branch_after_merge", 673 "default_merge_style", 674 "description", 675 "enable_prune", 676 "external_tracker", 677 "external_wiki", 678 "has_actions", 679 "has_issues", 680 "has_projects", 681 "has_pull_requests", 682 "has_wiki", 683 "ignore_whitespace_conflicts", 684 "internal_tracker", 685 "mirror_interval", 686 "name", 687 "private", 688 "template", 689 "website", 690 } 691 692 def commit(self): 693 args = {"owner": self.owner.username, "name": self.name} 694 self._commit(args) 695 696 def get_branches(self) -> List["Branch"]: 697 """Get all the Branches of this Repository.""" 698 699 results = self.allspice_client.requests_get_paginated( 700 Repository.REPO_BRANCHES % (self.owner.username, self.name) 701 ) 702 return [Branch.parse_response(self.allspice_client, result) for result in results] 703 704 def get_branch(self, name: str) -> "Branch": 705 """Get a specific Branch of this Repository.""" 706 result = self.allspice_client.requests_get( 707 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 708 ) 709 return Branch.parse_response(self.allspice_client, result) 710 711 def add_branch(self, 
create_from: Ref, newname: str) -> "Branch": 712 """Add a branch to the repository""" 713 # Note: will only work with gitea 1.13 or higher! 714 715 ref_name = Util.data_params_for_ref(create_from) 716 if "ref" not in ref_name: 717 raise ValueError("create_from must be a Branch, Commit or string") 718 ref_name = ref_name["ref"] 719 720 data = {"new_branch_name": newname, "old_ref_name": ref_name} 721 result = self.allspice_client.requests_post( 722 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 723 ) 724 return Branch.parse_response(self.allspice_client, result) 725 726 def get_issues( 727 self, 728 state: Literal["open", "closed", "all"] = "all", 729 search_query: Optional[str] = None, 730 labels: Optional[List[str]] = None, 731 milestones: Optional[List[Union[Milestone, str]]] = None, 732 assignee: Optional[Union[User, str]] = None, 733 since: Optional[datetime] = None, 734 before: Optional[datetime] = None, 735 ) -> List["Issue"]: 736 """ 737 Get all Issues of this Repository (open and closed) 738 739 https://hub.allspice.io/api/swagger#/repository/repoListIssues 740 741 All params of this method are optional filters. If you don't specify a filter, it 742 will not be applied. 743 744 :param state: The state of the Issues to get. If None, all Issues are returned. 745 :param search_query: Filter issues by text. This is equivalent to searching for 746 `search_query` in the Issues on the web interface. 747 :param labels: Filter issues by labels. 748 :param milestones: Filter issues by milestones. 749 :param assignee: Filter issues by the assigned user. 750 :param since: Filter issues by the date they were created. 751 :param before: Filter issues by the date they were created. 752 :return: A list of Issues. 
753 """ 754 755 data = { 756 "state": state, 757 } 758 if search_query: 759 data["q"] = search_query 760 if labels: 761 data["labels"] = ",".join(labels) 762 if milestones: 763 data["milestone"] = ",".join( 764 [ 765 milestone.name if isinstance(milestone, Milestone) else milestone 766 for milestone in milestones 767 ] 768 ) 769 if assignee: 770 if isinstance(assignee, User): 771 data["assignee"] = assignee.username 772 else: 773 data["assignee"] = assignee 774 if since: 775 data["since"] = Util.format_time(since) 776 if before: 777 data["before"] = Util.format_time(before) 778 779 results = self.allspice_client.requests_get_paginated( 780 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 781 params=data, 782 ) 783 784 issues = [] 785 for result in results: 786 issue = Issue.parse_response(self.allspice_client, result) 787 # See Issue.request 788 setattr(issue, "_repository", self) 789 # This is mostly for compatibility with an older implementation 790 Issue._add_read_property("repo", self, issue) 791 issues.append(issue) 792 793 return issues 794 795 def get_design_reviews( 796 self, 797 state: Literal["open", "closed", "all"] = "all", 798 milestone: Optional[Union[Milestone, str]] = None, 799 labels: Optional[List[str]] = None, 800 ) -> List["DesignReview"]: 801 """ 802 Get all Design Reviews of this Repository. 803 804 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 805 806 :param state: The state of the Design Reviews to get. If None, all Design Reviews 807 are returned. 808 :param milestone: The milestone of the Design Reviews to get. 809 :param labels: A list of label IDs to filter DRs by. 810 :return: A list of Design Reviews. 
811 """ 812 813 params = { 814 "state": state, 815 } 816 if milestone: 817 if isinstance(milestone, Milestone): 818 params["milestone"] = milestone.name 819 else: 820 params["milestone"] = milestone 821 if labels: 822 params["labels"] = ",".join(labels) 823 824 results = self.allspice_client.requests_get_paginated( 825 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 826 params=params, 827 ) 828 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 829 830 def get_commits( 831 self, 832 sha: Optional[str] = None, 833 path: Optional[str] = None, 834 stat: bool = True, 835 ) -> List["Commit"]: 836 """ 837 Get all the Commits of this Repository. 838 839 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 840 841 :param sha: The SHA of the commit to start listing commits from. 842 :param path: filepath of a file/dir. 843 :param stat: Include the number of additions and deletions in the response. 844 Disable for speedup. 845 :return: A list of Commits. 846 """ 847 848 data = {} 849 if sha: 850 data["sha"] = sha 851 if path: 852 data["path"] = path 853 if not stat: 854 data["stat"] = False 855 856 try: 857 results = self.allspice_client.requests_get_paginated( 858 Repository.REPO_COMMITS % (self.owner.username, self.name), 859 params=data, 860 ) 861 except ConflictException as err: 862 logging.warning(err) 863 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 864 results = [] 865 return [Commit.parse_response(self.allspice_client, result) for result in results] 866 867 def get_issues_state(self, state) -> List["Issue"]: 868 """ 869 DEPRECATED: Use get_issues() instead. 870 871 Get issues of state Issue.open or Issue.closed of a repository. 
872 """ 873 874 assert state in [Issue.OPENED, Issue.CLOSED] 875 issues = [] 876 data = {"state": state} 877 results = self.allspice_client.requests_get_paginated( 878 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 879 params=data, 880 ) 881 for result in results: 882 issue = Issue.parse_response(self.allspice_client, result) 883 # adding data not contained in the issue response 884 # See Issue.request() 885 setattr(issue, "_repository", self) 886 Issue._add_read_property("repo", self, issue) 887 Issue._add_read_property("owner", self.owner, issue) 888 issues.append(issue) 889 return issues 890 891 def get_times(self): 892 results = self.allspice_client.requests_get( 893 Repository.REPO_TIMES % (self.owner.username, self.name) 894 ) 895 return results 896 897 def get_user_time(self, username) -> float: 898 if isinstance(username, User): 899 username = username.username 900 results = self.allspice_client.requests_get( 901 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 902 ) 903 time = sum(r["time"] for r in results) 904 return time 905 906 def get_full_name(self) -> str: 907 return self.owner.username + "/" + self.name 908 909 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 910 data = { 911 "assignees": assignees, 912 "body": description, 913 "closed": False, 914 "title": title, 915 } 916 result = self.allspice_client.requests_post( 917 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 918 data=data, 919 ) 920 921 issue = Issue.parse_response(self.allspice_client, result) 922 setattr(issue, "_repository", self) 923 Issue._add_read_property("repo", self, issue) 924 return issue 925 926 def create_design_review( 927 self, 928 title: str, 929 head: Union[Branch, str], 930 base: Union[Branch, str], 931 assignees: Optional[Set[Union[User, str]]] = None, 932 body: Optional[str] = None, 933 due_date: Optional[datetime] = None, 934 milestone: Optional["Milestone"] = 
None, 935 ) -> "DesignReview": 936 """ 937 Create a new Design Review. 938 939 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 940 941 :param title: Title of the Design Review 942 :param head: Branch or name of the branch to merge into the base branch 943 :param base: Branch or name of the branch to merge into 944 :param assignees: Optional. A list of users to assign this review. List can be of 945 User objects or of usernames. 946 :param body: An Optional Description for the Design Review. 947 :param due_date: An Optional Due date for the Design Review. 948 :param milestone: An Optional Milestone for the Design Review 949 :return: The created Design Review 950 """ 951 952 data: dict[str, Any] = { 953 "title": title, 954 } 955 956 if isinstance(head, Branch): 957 data["head"] = head.name 958 else: 959 data["head"] = head 960 if isinstance(base, Branch): 961 data["base"] = base.name 962 else: 963 data["base"] = base 964 if assignees: 965 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 966 if body: 967 data["body"] = body 968 if due_date: 969 data["due_date"] = Util.format_time(due_date) 970 if milestone: 971 data["milestone"] = milestone.id 972 973 result = self.allspice_client.requests_post( 974 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 975 data=data, 976 ) 977 978 return DesignReview.parse_response(self.allspice_client, result) 979 980 def create_milestone( 981 self, 982 title: str, 983 description: str, 984 due_date: Optional[str] = None, 985 state: str = "open", 986 ) -> "Milestone": 987 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 988 data = {"title": title, "description": description, "state": state} 989 if due_date: 990 data["due_date"] = due_date 991 result = self.allspice_client.requests_post(url, data=data) 992 return Milestone.parse_response(self.allspice_client, result) 993 994 def create_gitea_hook(self, hook_url: str, 
events: List[str]): 995 url = f"/repos/{self.owner.username}/{self.name}/hooks" 996 data = { 997 "type": "gitea", 998 "config": {"content_type": "json", "url": hook_url}, 999 "events": events, 1000 "active": True, 1001 } 1002 return self.allspice_client.requests_post(url, data=data) 1003 1004 def list_hooks(self): 1005 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1006 return self.allspice_client.requests_get(url) 1007 1008 def delete_hook(self, id: str): 1009 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1010 self.allspice_client.requests_delete(url) 1011 1012 def is_collaborator(self, username) -> bool: 1013 if isinstance(username, User): 1014 username = username.username 1015 try: 1016 # returns 204 if its ok, 404 if its not 1017 self.allspice_client.requests_get( 1018 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1019 ) 1020 return True 1021 except Exception: 1022 return False 1023 1024 def get_users_with_access(self) -> Sequence[User]: 1025 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1026 response = self.allspice_client.requests_get(url) 1027 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1028 if isinstance(self.owner, User): 1029 return [*collabs, self.owner] 1030 else: 1031 # owner must be org 1032 teams = self.owner.get_teams() 1033 for team in teams: 1034 team_repos = team.get_repos() 1035 if self.name in [n.name for n in team_repos]: 1036 collabs += team.get_members() 1037 return collabs 1038 1039 def remove_collaborator(self, user_name: str): 1040 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1041 self.allspice_client.requests_delete(url) 1042 1043 def transfer_ownership( 1044 self, 1045 new_owner: Union[User, Organization], 1046 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1047 ): 1048 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1049 data: dict[str, Any] = {"new_owner": 
new_owner.username} 1050 if isinstance(new_owner, Organization): 1051 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1052 data["team_ids"] = new_team_ids 1053 self.allspice_client.requests_post(url, data=data) 1054 # TODO: make sure this instance is either updated or discarded 1055 1056 def get_git_content( 1057 self, 1058 ref: Optional["Ref"] = None, 1059 commit: "Optional[Commit]" = None, 1060 ) -> List[Content]: 1061 """ 1062 Get the metadata for all files in the root directory. 1063 1064 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1065 1066 :param ref: branch or commit to get content from 1067 :param commit: commit to get content from (deprecated) 1068 """ 1069 url = f"/repos/{self.owner.username}/{self.name}/contents" 1070 data = Util.data_params_for_ref(ref or commit) 1071 1072 result = [ 1073 Content.parse_response(self.allspice_client, f) 1074 for f in self.allspice_client.requests_get(url, data) 1075 ] 1076 return result 1077 1078 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1079 """ 1080 Get the repository's tree on a given ref. 1081 1082 By default, this will only return the top-level entries in the tree. If you want 1083 to get the entire tree, set `recursive` to True. 1084 1085 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1086 :param recursive: Whether to get the entire tree or just the top-level entries. 
1087 """ 1088 1089 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1090 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1091 params = {"recursive": recursive} 1092 results = self.allspice_client.requests_get_paginated(url, params=params) 1093 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1094 1095 def get_file_content( 1096 self, 1097 content: Content, 1098 ref: Optional[Ref] = None, 1099 commit: Optional[Commit] = None, 1100 ) -> Union[str, List["Content"]]: 1101 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1102 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1103 data = Util.data_params_for_ref(ref or commit) 1104 1105 if content.type == Content.FILE: 1106 return self.allspice_client.requests_get(url, data)["content"] 1107 else: 1108 return [ 1109 Content.parse_response(self.allspice_client, f) 1110 for f in self.allspice_client.requests_get(url, data) 1111 ] 1112 1113 def get_raw_file( 1114 self, 1115 file_path: str, 1116 ref: Optional[Ref] = None, 1117 ) -> bytes: 1118 """ 1119 Get the raw, binary data of a single file. 1120 1121 Note 1: if the file you are requesting is a text file, you might want to 1122 use .decode() on the result to get a string. For example: 1123 1124 content = repo.get_raw_file("file.txt").decode("utf-8") 1125 1126 Note 2: this method will store the entire file in memory. If you want 1127 to download a large file, you might want to use `download_to_file` 1128 instead. 1129 1130 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1131 1132 :param file_path: The path to the file to get. 1133 :param ref: The branch or commit to get the file from. If not provided, 1134 the default branch is used. 
1135 """ 1136 1137 url = self.REPO_GET_RAW_FILE.format( 1138 owner=self.owner.username, 1139 repo=self.name, 1140 path=file_path, 1141 ) 1142 params = Util.data_params_for_ref(ref) 1143 return self.allspice_client.requests_get_raw(url, params=params) 1144 1145 def download_to_file( 1146 self, 1147 file_path: str, 1148 io: IO, 1149 ref: Optional[Ref] = None, 1150 ) -> None: 1151 """ 1152 Download the binary data of a file to a file-like object. 1153 1154 Example: 1155 1156 with open("schematic.DSN", "wb") as f: 1157 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1158 1159 :param file_path: The path to the file in the repository from the root 1160 of the repository. 1161 :param io: The file-like object to write the data to. 1162 """ 1163 1164 url = self.allspice_client._AllSpice__get_url( 1165 self.REPO_GET_RAW_FILE.format( 1166 owner=self.owner.username, 1167 repo=self.name, 1168 path=file_path, 1169 ) 1170 ) 1171 params = Util.data_params_for_ref(ref) 1172 response = self.allspice_client.requests.get( 1173 url, 1174 params=params, 1175 headers=self.allspice_client.headers, 1176 stream=True, 1177 ) 1178 1179 for chunk in response.iter_content(chunk_size=4096): 1180 if chunk: 1181 io.write(chunk) 1182 1183 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1184 """ 1185 Get the json blob for a cad file if it exists, otherwise enqueue 1186 a new job and return a 503 status. 1187 1188 WARNING: This is still experimental and not recommended for critical 1189 applications. The structure and content of the returned dictionary can 1190 change at any time. 
1191 1192 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1193 """ 1194 1195 if isinstance(content, Content): 1196 content = content.path 1197 1198 url = self.REPO_GET_ALLSPICE_JSON.format( 1199 owner=self.owner.username, 1200 repo=self.name, 1201 content=content, 1202 ) 1203 data = Util.data_params_for_ref(ref) 1204 return self.allspice_client.requests_get(url, data) 1205 1206 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1207 """ 1208 Get the svg blob for a cad file if it exists, otherwise enqueue 1209 a new job and return a 503 status. 1210 1211 WARNING: This is still experimental and not yet recommended for 1212 critical applications. The content of the returned svg can change 1213 at any time. 1214 1215 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1216 """ 1217 1218 if isinstance(content, Content): 1219 content = content.path 1220 1221 url = self.REPO_GET_ALLSPICE_SVG.format( 1222 owner=self.owner.username, 1223 repo=self.name, 1224 content=content, 1225 ) 1226 data = Util.data_params_for_ref(ref) 1227 return self.allspice_client.requests_get_raw(url, data) 1228 1229 def get_generated_projectdata( 1230 self, content: Union[Content, str], ref: Optional[Ref] = None 1231 ) -> dict: 1232 """ 1233 Get the json project data based on the cad file provided 1234 1235 WARNING: This is still experimental and not yet recommended for 1236 critical applications. The content of the returned dictionary can change 1237 at any time. 
1238 1239 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1240 """ 1241 if isinstance(content, Content): 1242 content = content.path 1243 1244 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1245 owner=self.owner.username, 1246 repo=self.name, 1247 content=content, 1248 ) 1249 data = Util.data_params_for_ref(ref) 1250 return self.allspice_client.requests_get(url, data) 1251 1252 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1253 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1254 if not data: 1255 data = {} 1256 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1257 data.update({"content": content}) 1258 return self.allspice_client.requests_post(url, data) 1259 1260 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1261 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1262 if not data: 1263 data = {} 1264 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1265 data.update({"sha": file_sha, "content": content}) 1266 return self.allspice_client.requests_put(url, data) 1267 1268 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1269 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1270 if not data: 1271 data = {} 1272 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1273 data.update({"sha": file_sha}) 1274 return self.allspice_client.requests_delete(url, data) 1275 1276 def get_archive( 1277 self, 1278 ref: Ref = "main", 1279 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1280 ) -> bytes: 1281 """ 1282 Download all the files in a specific ref of a repository as a zip or tarball 1283 archive. 
1284 1285 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1286 1287 :param ref: branch or commit to get content from, defaults to the "main" branch 1288 :param archive_format: zip or tar, defaults to zip 1289 """ 1290 1291 ref_string = Util.data_params_for_ref(ref)["ref"] 1292 url = self.REPO_GET_ARCHIVE.format( 1293 owner=self.owner.username, 1294 repo=self.name, 1295 ref=ref_string, 1296 format=archive_format.value, 1297 ) 1298 return self.allspice_client.requests_get_raw(url) 1299 1300 def get_topics(self) -> list[str]: 1301 """ 1302 Gets the list of topics on this repository. 1303 1304 See http://localhost:3000/api/swagger#/repository/repoListTopics 1305 """ 1306 1307 url = self.REPO_GET_TOPICS.format( 1308 owner=self.owner.username, 1309 repo=self.name, 1310 ) 1311 return self.allspice_client.requests_get(url)["topics"] 1312 1313 def add_topic(self, topic: str): 1314 """ 1315 Adds a topic to the repository. 1316 1317 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1318 1319 :param topic: The topic to add. Topic names must consist only of 1320 lowercase letters, numnbers and dashes (-), and cannot start with 1321 dashes. Topic names also must be under 35 characters long. 1322 """ 1323 1324 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1325 self.allspice_client.requests_put(url) 1326 1327 def create_release( 1328 self, 1329 tag_name: str, 1330 name: Optional[str] = None, 1331 body: Optional[str] = None, 1332 draft: bool = False, 1333 ): 1334 """ 1335 Create a release for this repository. The release will be created for 1336 the tag with the given name. If there is no tag with this name, create 1337 the tag first. 
1338 1339 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1340 """ 1341 1342 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1343 data = { 1344 "tag_name": tag_name, 1345 "draft": draft, 1346 } 1347 if name is not None: 1348 data["name"] = name 1349 if body is not None: 1350 data["body"] = body 1351 response = self.allspice_client.requests_post(url, data) 1352 return Release.parse_response(self.allspice_client, response, self) 1353 1354 def get_releases( 1355 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1356 ) -> List[Release]: 1357 """ 1358 Get the list of releases for this repository. 1359 1360 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1361 """ 1362 1363 data = {} 1364 1365 if draft is not None: 1366 data["draft"] = draft 1367 if pre_release is not None: 1368 data["pre-release"] = pre_release 1369 1370 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1371 responses = self.allspice_client.requests_get_paginated(url, params=data) 1372 1373 return [ 1374 Release.parse_response(self.allspice_client, response, self) for response in responses 1375 ] 1376 1377 def get_latest_release(self) -> Release: 1378 """ 1379 Get the latest release for this repository. 1380 1381 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1382 """ 1383 1384 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1385 response = self.allspice_client.requests_get(url) 1386 release = Release.parse_response(self.allspice_client, response, self) 1387 return release 1388 1389 def get_release_by_tag(self, tag: str) -> Release: 1390 """ 1391 Get a release by its tag. 
1392 1393 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1394 """ 1395 1396 url = self.REPO_GET_RELEASE_BY_TAG.format( 1397 owner=self.owner.username, repo=self.name, tag=tag 1398 ) 1399 response = self.allspice_client.requests_get(url) 1400 release = Release.parse_response(self.allspice_client, response, self) 1401 return release 1402 1403 def get_commit_statuses( 1404 self, 1405 commit: Union[str, Commit], 1406 sort: Optional[CommitStatusSort] = None, 1407 state: Optional[CommitStatusState] = None, 1408 ) -> List[CommitStatus]: 1409 """ 1410 Get a list of statuses for a commit. 1411 1412 This is roughly equivalent to the Commit.get_statuses method, but this 1413 method allows you to sort and filter commits and is more convenient if 1414 you have a commit SHA and don't need to get the commit itself. 1415 1416 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1417 """ 1418 1419 if isinstance(commit, Commit): 1420 commit = commit.sha 1421 1422 params = {} 1423 if sort is not None: 1424 params["sort"] = sort.value 1425 if state is not None: 1426 params["state"] = state.value 1427 1428 url = self.REPO_GET_COMMIT_STATUS.format( 1429 owner=self.owner.username, repo=self.name, sha=commit 1430 ) 1431 response = self.allspice_client.requests_get_paginated(url, params=params) 1432 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1433 1434 def create_commit_status( 1435 self, 1436 commit: Union[str, Commit], 1437 context: Optional[str] = None, 1438 description: Optional[str] = None, 1439 state: Optional[CommitStatusState] = None, 1440 target_url: Optional[str] = None, 1441 ) -> CommitStatus: 1442 """ 1443 Create a status on a commit. 
1444 1445 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1446 """ 1447 1448 if isinstance(commit, Commit): 1449 commit = commit.sha 1450 1451 data = {} 1452 if context is not None: 1453 data["context"] = context 1454 if description is not None: 1455 data["description"] = description 1456 if state is not None: 1457 data["state"] = state.value 1458 if target_url is not None: 1459 data["target_url"] = target_url 1460 1461 url = self.REPO_GET_COMMIT_STATUS.format( 1462 owner=self.owner.username, repo=self.name, sha=commit 1463 ) 1464 response = self.allspice_client.requests_post(url, data=data) 1465 return CommitStatus.parse_response(self.allspice_client, response) 1466 1467 def delete(self): 1468 self.allspice_client.requests_delete( 1469 Repository.REPO_DELETE % (self.owner.username, self.name) 1470 ) 1471 self.deleted = True
@classmethod
def search(
    cls,
    allspice_client,
    query: Optional[str] = None,
    topic: bool = False,
    include_description: bool = False,
    user: Optional[User] = None,
    owner_to_prioritize: Union[User, Organization, None] = None,
) -> list[Repository]:
    """
    Search for repositories.

    See https://hub.allspice.io/api/swagger#/repository/repoSearch

    Only filters that are actually set are sent to the server.

    :param allspice_client: The client used to perform the request.
    :param query: The query string to search for.
    :param topic: If true, the query string will only be matched against the
        repository's topic.
    :param include_description: If true, the query string will be matched
        against the repository's description as well.
    :param user: If specified, only repositories that this user owns or
        contributes to will be searched (sent as the user's id).
    :param owner_to_prioritize: If specified, repositories owned by the
        given entity will be prioritized in the search (sent as its id).
    :returns: All repositories matching the query. If there are many
        repositories matching this query, this may take some time.
    """

    search_params = {}
    if query is not None:
        search_params["q"] = query
    if topic:
        search_params["topic"] = topic
    if include_description:
        search_params["include_description"] = include_description
    if user is not None:
        search_params["user"] = user.id
    if owner_to_prioritize is not None:
        search_params["owner_to_prioritize"] = owner_to_prioritize.id

    matches = []
    for entry in allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=search_params):
        matches.append(Repository.parse_response(allspice_client, entry))
    return matches
Search for repositories.
See https://hub.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns
All repositories matching the query. If there are many repositories matching this query, this may take some time.
def get_branches(self) -> List["Branch"]:
    """Return all Branches of this Repository, fetched via the paginated GET helper."""

    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    branches = []
    for raw_branch in self.allspice_client.requests_get_paginated(endpoint):
        branches.append(Branch.parse_response(self.allspice_client, raw_branch))
    return branches
Get all the Branches of this Repository.
def get_branch(self, name: str) -> "Branch":
    """
    Fetch a single Branch of this Repository.

    :param name: Name of the branch to look up.
    :return: The Branch parsed from the server's response.
    """
    endpoint = Repository.REPO_BRANCH.format(
        owner=self.owner.username,
        repo=self.name,
        branch=name,
    )
    payload = self.allspice_client.requests_get(endpoint)
    return Branch.parse_response(self.allspice_client, payload)
Get a specific Branch of this Repository.
def add_branch(self, create_from: Ref, newname: str) -> "Branch":
    """
    Create a new branch in this repository.

    :param create_from: The ref the new branch starts from.
    :param newname: Name of the branch to create.
    :return: The newly created Branch.
    :raises ValueError: If no ref name can be derived from `create_from`.
    """
    # Note: will only work with gitea 1.13 or higher!

    ref_params = Util.data_params_for_ref(create_from)
    if "ref" not in ref_params:
        raise ValueError("create_from must be a Branch, Commit or string")

    payload = {
        "new_branch_name": newname,
        "old_ref_name": ref_params["ref"],
    }
    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return Branch.parse_response(self.allspice_client, created)
Add a branch to the repository
def get_issues(
    self,
    state: Literal["open", "closed", "all"] = "all",
    search_query: Optional[str] = None,
    labels: Optional[List[str]] = None,
    milestones: Optional[List[Union[Milestone, str]]] = None,
    assignee: Optional[Union[User, str]] = None,
    since: Optional[datetime] = None,
    before: Optional[datetime] = None,
) -> List["Issue"]:
    """
    List the Issues of this Repository, optionally filtered.

    https://hub.allspice.io/api/swagger#/repository/repoListIssues

    Every argument is an optional filter; a filter that is left unset (or
    falsy) is not sent to the server.

    :param state: Which Issues to return: "open", "closed" or "all" (default).
    :param search_query: Filter issues by text. This is equivalent to searching for
        `search_query` in the Issues on the web interface.
    :param labels: Label names; sent as a comma-separated list.
    :param milestones: Milestones (objects or names); sent as a comma-separated
        list of names.
    :param assignee: The assigned user, as a User or a username.
    :param since: Sent as the `since` query parameter after formatting.
    :param before: Sent as the `before` query parameter after formatting.
    :return: A list of Issues, each linked back to this Repository.
    """

    query = {"state": state}
    if search_query:
        query["q"] = search_query
    if labels:
        query["labels"] = ",".join(labels)
    if milestones:
        milestone_names = []
        for entry in milestones:
            milestone_names.append(entry.name if isinstance(entry, Milestone) else entry)
        query["milestone"] = ",".join(milestone_names)
    if assignee:
        query["assignee"] = assignee.username if isinstance(assignee, User) else assignee
    if since:
        query["since"] = Util.format_time(since)
    if before:
        query["before"] = Util.format_time(before)

    endpoint = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(endpoint, params=query)

    collected = []
    for raw_issue in raw_issues:
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # See Issue.request
        setattr(parsed, "_repository", self)
        # This is mostly for compatibility with an older implementation
        Issue._add_read_property("repo", self, parsed)
        collected.append(parsed)
    return collected
Get all Issues of this Repository (open and closed)
https://hub.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for `search_query` in the Issues on the web interface.
- labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
def get_design_reviews(
    self,
    state: Literal["open", "closed", "all"] = "all",
    milestone: Optional[Union[Milestone, str]] = None,
    labels: Optional[List[str]] = None,
) -> List["DesignReview"]:
    """
    List the Design Reviews of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

    :param state: Which Design Reviews to return: "open", "closed" or "all"
        (default).
    :param milestone: Milestone (object or name) the Design Reviews must belong to.
    :param labels: A list of label IDs to filter DRs by; sent comma-separated.
    :return: A list of Design Reviews.
    """

    query = {"state": state}
    if milestone:
        query["milestone"] = milestone.name if isinstance(milestone, Milestone) else milestone
    if labels:
        query["labels"] = ",".join(labels)

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    reviews = []
    for raw_review in self.allspice_client.requests_get_paginated(endpoint, params=query):
        reviews.append(DesignReview.parse_response(self.allspice_client, raw_review))
    return reviews
Get all Design Reviews of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
def get_commits(
    self,
    sha: Optional[str] = None,
    path: Optional[str] = None,
    stat: bool = True,
) -> List["Commit"]:
    """
    List the Commits of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

    A ConflictException from the server is logged as a warning and treated
    as an empty repository, so an empty list is returned in that case.

    :param sha: The SHA of the commit to start listing commits from.
    :param path: filepath of a file/dir.
    :param stat: Include the number of additions and deletions in the response.
        Disable for speedup.
    :return: A list of Commits.
    """

    query = {}
    if sha:
        query["sha"] = sha
    if path:
        query["path"] = path
    if not stat:
        # Only sent when disabled; when enabled the parameter is omitted.
        query["stat"] = False

    try:
        raw_commits = self.allspice_client.requests_get_paginated(
            Repository.REPO_COMMITS % (self.owner.username, self.name),
            params=query,
        )
    except ConflictException as err:
        logging.warning(err)
        logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
        return []

    commits = []
    for raw_commit in raw_commits:
        commits.append(Commit.parse_response(self.allspice_client, raw_commit))
    return commits
Get all the Commits of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
def get_issues_state(self, state) -> List["Issue"]:
    """
    DEPRECATED: Use get_issues() instead.

    Return the issues of this repository that are in the given state.

    :param state: Either Issue.OPENED or Issue.CLOSED (checked with an assert).
    :return: The matching Issues, each linked back to this Repository and
        its owner.
    """

    assert state in [Issue.OPENED, Issue.CLOSED]
    endpoint = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(
        endpoint,
        params={"state": state},
    )

    collected = []
    for raw_issue in raw_issues:
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # The response does not carry these links, so attach them here.
        # See Issue.request()
        setattr(parsed, "_repository", self)
        Issue._add_read_property("repo", self, parsed)
        Issue._add_read_property("owner", self.owner, parsed)
        collected.append(parsed)
    return collected
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
def get_user_time(self, username) -> float:
    """
    Sum the tracked time entries of one user on this Repository.

    :param username: A User object or a username string.
    :return: The sum of the "time" field over all entries returned by the server.
    """
    user_name = username.username if isinstance(username, User) else username
    endpoint = Repository.REPO_USER_TIME % (self.owner.username, self.name, user_name)
    total = 0
    for entry in self.allspice_client.requests_get(endpoint):
        total = total + entry["time"]
    return total
def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
    """
    Create a new, open Issue in this Repository.

    :param title: Title of the Issue.
    :param assignees: The assignees for the Issue (presumably usernames;
        verify against the API). Sets and frozensets, including the default,
        are converted to a list before being sent; any other value is passed
        through unchanged.
    :param description: Body text of the Issue.
    :return: The created Issue, linked back to this Repository.
    """
    if isinstance(assignees, (set, frozenset)):
        # Sets are not JSON-serializable, so the default `frozenset()` would
        # break a JSON-encoded request body.
        # NOTE(review): assumes the client JSON-encodes `data` - confirm.
        assignees = list(assignees)

    data = {
        "assignees": assignees,
        "body": description,
        "closed": False,
        "title": title,
    }
    result = self.allspice_client.requests_post(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        data=data,
    )

    issue = Issue.parse_response(self.allspice_client, result)
    # See Issue.request: the issue needs a link back to its repository.
    setattr(issue, "_repository", self)
    Issue._add_read_property("repo", self, issue)
    return issue
def create_design_review(
    self,
    title: str,
    head: Union[Branch, str],
    base: Union[Branch, str],
    assignees: Optional[Set[Union[User, str]]] = None,
    body: Optional[str] = None,
    due_date: Optional[datetime] = None,
    milestone: Optional["Milestone"] = None,
) -> "DesignReview":
    """
    Open a new Design Review in this Repository.

    See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

    Optional fields are only included in the request when they are set.

    :param title: Title of the Design Review
    :param head: Branch or name of the branch to merge into the base branch
    :param base: Branch or name of the branch to merge into
    :param assignees: Optional. Users to assign this review to, given as
        User objects or usernames.
    :param body: An Optional Description for the Design Review.
    :param due_date: An Optional Due date for the Design Review.
    :param milestone: An Optional Milestone for the Design Review (sent as its id).
    :return: The created Design Review
    """

    payload: dict[str, Any] = {"title": title}
    payload["head"] = head.name if isinstance(head, Branch) else head
    payload["base"] = base.name if isinstance(base, Branch) else base

    if assignees:
        assignee_names = []
        for person in assignees:
            assignee_names.append(person.username if isinstance(person, User) else person)
        payload["assignees"] = assignee_names
    if body:
        payload["body"] = body
    if due_date:
        payload["due_date"] = Util.format_time(due_date)
    if milestone:
        payload["milestone"] = milestone.id

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return DesignReview.parse_response(self.allspice_client, created)
Create a new Design Review.
See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
def create_milestone(
    self,
    title: str,
    description: str,
    due_date: Optional[str] = None,
    state: str = "open",
) -> "Milestone":
    """
    Create a Milestone in this Repository.

    :param title: Title of the milestone.
    :param description: Description of the milestone.
    :param due_date: Optional due date string; only sent when truthy.
    :param state: State of the new milestone, "open" by default.
    :return: The Milestone parsed from the server's response.
    """
    payload = {
        "title": title,
        "description": description,
        "state": state,
    }
    if due_date:
        payload["due_date"] = due_date

    endpoint = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return Milestone.parse_response(self.allspice_client, created)
def create_gitea_hook(self, hook_url: str, events: List[str]):
    """
    Register an active webhook of type "gitea" on this Repository.

    :param hook_url: URL the hook delivers to; payloads use the "json" content type.
    :param events: Names of the events that trigger the hook.
    :return: Whatever the client's POST helper returns for the request.
    """
    hook_config = {"content_type": "json", "url": hook_url}
    payload = {
        "type": "gitea",
        "config": hook_config,
        "events": events,
        "active": True,
    }
    endpoint = f"/repos/{self.owner.username}/{self.name}/hooks"
    return self.allspice_client.requests_post(endpoint, data=payload)
def is_collaborator(self, username) -> bool:
    """
    Check whether a user is a collaborator on this Repository.

    :param username: A User object or a username string.
    :return: True if the lookup request succeeds, False if it raises any
        Exception (best-effort: every failure is reported as "not a
        collaborator").
    """
    user_name = username.username if isinstance(username, User) else username
    try:
        # returns 204 if its ok, 404 if its not
        self.allspice_client.requests_get(
            Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, user_name)
        )
    except Exception:
        return False
    else:
        return True
def get_users_with_access(self) -> Sequence[User]:
    """
    Collect the users that have access to this Repository.

    For a user-owned repository this is the collaborators plus the owner.
    For an organization-owned repository it is the collaborators plus the
    members of every team of the organization whose repositories include one
    with this repository's name. Users may appear more than once.

    :return: The collected User objects.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/collaborators"
    raw_users = self.allspice_client.requests_get(endpoint)
    users = []
    for raw_user in raw_users:
        users.append(User.parse_response(self.allspice_client, raw_user))

    if isinstance(self.owner, User):
        return users + [self.owner]

    # owner must be org
    for team in self.owner.get_teams():
        team_repo_names = [repo.name for repo in team.get_repos()]
        if self.name in team_repo_names:
            users.extend(team.get_members())
    return users
def transfer_ownership(
    self,
    new_owner: Union[User, Organization],
    new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
):
    """
    Transfer this Repository to another user or organization.

    :param new_owner: The User or Organization that should own the repository.
    :param new_teams: Teams to give access after the transfer. Only used
        when `new_owner` is an Organization; teams that are not among that
        organization's teams are silently dropped.
    """
    url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
    data: dict[str, Any] = {"new_owner": new_owner.username}
    if isinstance(new_owner, Organization):
        # Look the organization's teams up once instead of once per candidate
        # team; skip the lookup entirely when there is nothing to filter.
        owner_teams = list(new_owner.get_teams()) if new_teams else []
        data["team_ids"] = [team.id for team in new_teams if team in owner_teams]
    self.allspice_client.requests_post(url, data=data)
    # TODO: make sure this instance is either updated or discarded
def get_git_content(
    self,
    ref: Optional["Ref"] = None,
    commit: "Optional[Commit]" = None,
) -> List[Content]:
    """
    List the metadata of all files in the repository's root directory.

    https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

    :param ref: branch or commit to get content from
    :param commit: commit to get content from (deprecated); only used when
        `ref` is not given
    :return: One Content object per entry returned by the server.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents"
    ref_params = Util.data_params_for_ref(ref or commit)

    entries = []
    for raw_entry in self.allspice_client.requests_get(endpoint, ref_params):
        entries.append(Content.parse_response(self.allspice_client, raw_entry))
    return entries
Get the metadata for all files in the root directory.
https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
    """
    Return the entries of the repository's tree at a given ref.

    Only the top-level entries are returned unless `recursive` is True, in
    which case the entire tree is requested.

    :param ref: The ref to get the tree from. If not provided, the default branch is used.
    :param recursive: Whether to get the entire tree or just the top-level entries.
    :return: One GitEntry per entry returned by the server.
    """

    ref_name = Util.data_params_for_ref(ref).get("ref", self.default_branch)
    endpoint = self.REPO_GET_TREE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_name,
    )
    raw_entries = self.allspice_client.requests_get_paginated(
        endpoint,
        params={"recursive": recursive},
    )
    entries = []
    for raw_entry in raw_entries:
        entries.append(GitEntry.parse_response(self.allspice_client, raw_entry))
    return entries
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set `recursive` to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content(
    self,
    content: Content,
    ref: Optional[Ref] = None,
    commit: Optional[Commit] = None,
) -> Union[str, List["Content"]]:
    """
    Fetch what a Content entry points to.

    https://hub.allspice.io/api/swagger#/repository/repoGetContents

    :param content: The entry to fetch.
    :param ref: Branch or commit to read from.
    :param commit: Commit to read from; only used when `ref` is not given.
    :return: The "content" field of the response when `content` is a file,
        otherwise a list of Content objects for the entries in the response.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
    ref_params = Util.data_params_for_ref(ref or commit)

    is_file = content.type == Content.FILE
    response = self.allspice_client.requests_get(endpoint, ref_params)
    if is_file:
        return response["content"]

    children = []
    for raw_entry in response:
        children.append(Content.parse_response(self.allspice_client, raw_entry))
    return children
https://hub.allspice.io/api/swagger#/repository/repoGetContents
def get_raw_file(
    self,
    file_path: str,
    ref: Optional[Ref] = None,
) -> bytes:
    """
    Return the raw, binary contents of a single file.

    Note 1: for a text file you will usually want to decode the result to
    get a string. For example:

        content = repo.get_raw_file("file.txt").decode("utf-8")

    Note 2: the whole file is held in memory. For large files, consider
    `download_to_file` instead.

    See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

    :param file_path: The path to the file to get.
    :param ref: The branch or commit to get the file from. If not provided,
        the default branch is used.
    """

    ref_params = Util.data_params_for_ref(ref)
    endpoint = self.REPO_GET_RAW_FILE.format(
        owner=self.owner.username,
        repo=self.name,
        path=file_path,
    )
    raw_bytes = self.allspice_client.requests_get_raw(endpoint, params=ref_params)
    return raw_bytes
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use `download_to_file` instead.
See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file(
    self,
    file_path: str,
    io: IO,
    ref: Optional[Ref] = None,
) -> None:
    """
    Download the binary data of a file to a file-like object.

    The file is streamed in 4096-byte chunks, so it is never held in memory
    as a whole.

    Example:

        with open("schematic.DSN", "wb") as f:
            repo.download_to_file("Schematics/my_schematic.DSN", f)

    :param file_path: The path to the file in the repository from the root
        of the repository.
    :param io: The file-like object to write the data to. It should accept
        bytes (e.g. a file opened in "wb" mode).
    :param ref: The branch or commit to get the file from.
    """

    url = self.allspice_client._AllSpice__get_url(
        self.REPO_GET_RAW_FILE.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
    )
    params = Util.data_params_for_ref(ref)
    response = self.allspice_client.requests.get(
        url,
        params=params,
        headers=self.allspice_client.headers,
        stream=True,
    )

    # A streamed response keeps its connection checked out until the body is
    # fully consumed or the response is closed; close it explicitly so the
    # connection is released even if writing fails part-way through.
    try:
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
    finally:
        response.close()
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
    """
    Fetch the generated json blob for a cad file. If it does not exist yet,
    a new job is enqueued and a 503 status is returned.

    WARNING: This is still experimental and not recommended for critical
    applications. The structure and content of the returned dictionary can
    change at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

    :param content: The cad file, as a Content object or a path string.
    :param ref: The ref to read the file from.
    """

    content_path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_JSON.format(
        owner=self.owner.username,
        repo=self.name,
        content=content_path,
    )
    ref_params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(endpoint, ref_params)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
    """
    Fetch the generated svg blob for a cad file. If it does not exist yet,
    a new job is enqueued and a 503 status is returned.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: The cad file, as a Content object or a path string.
    :param ref: The ref to read the file from.
    """

    content_path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username,
        repo=self.name,
        content=content_path,
    )
    ref_params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(endpoint, ref_params)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
def get_generated_projectdata(
    self, content: Union[Content, str], ref: Optional[Ref] = None
) -> dict:
    """
    Fetch the JSON project data generated from the given CAD file.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned dictionary can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject

    :param content: The CAD file, either as a Content object (its ``path``
        is used) or directly as a path string.
    :param ref: Optional ref; turned into request parameters by
        ``Util.data_params_for_ref``.
    :return: The result of ``requests_get`` for the project endpoint.
    """

    path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_PROJECT.format(
        owner=self.owner.username,
        repo=self.name,
        content=path,
    )
    params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(endpoint, params)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the file; it is appended to the repository's
        ``contents`` URL.
    :param content: Content of the file, sent as the ``content`` field.
        NOTE(review): the API presumably expects base64-encoded content -
        confirm against the swagger documentation.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, so the caller's object is never modified.
    :return: The result of the POST request.
    """

    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a fresh payload rather than calling data.update(), which would
    # write the "content" key into the dictionary owned by the caller.
    payload = {**(data or {}), "content": content}
    return self.allspice_client.requests_post(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoCreateFile
def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Update an existing file in this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file; it is appended to the repository's
        ``contents`` URL.
    :param file_sha: Sent as the ``sha`` field of the request.
        NOTE(review): presumably the blob SHA of the file being replaced -
        confirm against the swagger documentation.
    :param content: New content of the file, sent as the ``content`` field.
        NOTE(review): the API presumably expects base64-encoded content -
        confirm.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, so the caller's object is never modified.
    :return: The result of the PUT request.
    """

    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a fresh payload rather than calling data.update(), which would
    # write "sha" and "content" into the dictionary owned by the caller.
    payload = {**(data or {}), "sha": file_sha, "content": content}
    return self.allspice_client.requests_put(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoUpdateFile
def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file; it is appended to the repository's
        ``contents`` URL.
    :param file_sha: Sent as the ``sha`` field of the request.
        NOTE(review): presumably the blob SHA of the file being deleted -
        confirm against the swagger documentation.
    :param data: Optional extra fields for the request body. The dictionary
        is copied, so the caller's object is never modified.
    :return: The result of the DELETE request.
    """

    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a fresh payload rather than calling data.update(), which would
    # write the "sha" key into the dictionary owned by the caller.
    payload = {**(data or {}), "sha": file_sha}
    return self.allspice_client.requests_delete(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoDeleteFile
def get_archive(
    self,
    ref: Ref = "main",
    archive_format: ArchiveFormat = ArchiveFormat.ZIP,
) -> bytes:
    """
    Download every file at a given ref of the repository as a single zip or
    tarball archive.

    https://hub.allspice.io/api/swagger#/repository/repoGetArchive

    :param ref: branch or commit to get content from, defaults to the "main" branch
    :param archive_format: zip or tar, defaults to zip
    :return: The result of ``requests_get_raw`` for the archive endpoint.
    """

    # Only the "ref" entry of the generated request parameters is needed,
    # because the ref is part of the URL here.
    ref_name = Util.data_params_for_ref(ref)["ref"]
    url_fields = {
        "owner": self.owner.username,
        "repo": self.name,
        "ref": ref_name,
        "format": archive_format.value,
    }
    return self.allspice_client.requests_get_raw(self.REPO_GET_ARCHIVE.format(**url_fields))
Download all the files in a specific ref of a repository as a zip or tarball archive.
https://hub.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
    """
    Return the topics set on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics

    :return: The ``topics`` entry of the API response.
    """

    endpoint = self.REPO_GET_TOPICS.format(
        owner=self.owner.username,
        repo=self.name,
    )
    response = self.allspice_client.requests_get(endpoint)
    return response["topics"]
Gets the list of topics on this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListTopics
def add_topic(self, topic: str):
    """
    Add a topic to this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """

    url_fields = {"owner": self.owner.username, "repo": self.name, "topic": topic}
    endpoint = self.REPO_ADD_TOPIC.format(**url_fields)
    self.allspice_client.requests_put(endpoint)
Adds a topic to the repository.
See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release(
    self,
    tag_name: str,
    name: Optional[str] = None,
    body: Optional[str] = None,
    draft: bool = False,
):
    """
    Create a release for this repository.

    The release will be created for the tag with the given name. If there
    is no tag with this name, create the tag first.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

    :param tag_name: Name of the tag the release is created for.
    :param name: Optional release name; only sent when not None.
    :param body: Optional release body; only sent when not None.
    :param draft: Sent as the ``draft`` field of the request.
    :return: The Release parsed from the API response.
    """

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    payload = {"tag_name": tag_name, "draft": draft}
    # Optional fields are left out entirely instead of being sent as null.
    optional_fields = {"name": name, "body": body}
    payload.update(
        {key: value for key, value in optional_fields.items() if value is not None}
    )
    created = self.allspice_client.requests_post(endpoint, payload)
    return Release.parse_response(self.allspice_client, created, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    List the releases of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: When not None, sent as the ``draft`` query parameter.
    :param pre_release: When not None, sent as the ``pre-release`` query
        parameter.
    :return: One Release per entry of the paginated API response.
    """

    candidates = (("draft", draft), ("pre-release", pre_release))
    query = {key: value for key, value in candidates if value is not None}

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    pages = self.allspice_client.requests_get_paginated(endpoint, params=query)

    return [Release.parse_response(self.allspice_client, entry, self) for entry in pages]
Get the list of releases for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListReleases
def get_latest_release(self) -> Release:
    """
    Fetch the latest release of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

    :return: The Release parsed from the API response.
    """

    url_fields = {"owner": self.owner.username, "repo": self.name}
    endpoint = self.REPO_GET_LATEST_RELEASE.format(**url_fields)
    latest = self.allspice_client.requests_get(endpoint)
    return Release.parse_response(self.allspice_client, latest, self)
Get the latest release for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
def get_release_by_tag(self, tag: str) -> Release:
    """
    Fetch the release that belongs to the given tag.

    See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

    :param tag: Name of the tag; it is inserted into the request URL.
    :return: The Release parsed from the API response.
    """

    url_fields = {"owner": self.owner.username, "repo": self.name, "tag": tag}
    endpoint = self.REPO_GET_RELEASE_BY_TAG.format(**url_fields)
    found = self.allspice_client.requests_get(endpoint)
    return Release.parse_response(self.allspice_client, found, self)
Get a release by its tag.
See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    List the statuses of a commit.

    This is roughly equivalent to the Commit.get_statuses method, but this
    method allows you to sort and filter commits and is more convenient if
    you have a commit SHA and don't need to get the commit itself.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object (its ``sha`` is used) or a SHA string.
    :param sort: When not None, its value is sent as the ``sort`` query
        parameter.
    :param state: When not None, its value is sent as the ``state`` query
        parameter.
    :return: One CommitStatus per entry of the paginated API response.
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    query = {}
    for key, option in (("sort", sort), ("state", state)):
        if option is not None:
            query[key] = option.value

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
def create_commit_status(
    self,
    commit: Union[str, Commit],
    context: Optional[str] = None,
    description: Optional[str] = None,
    state: Optional[CommitStatusState] = None,
    target_url: Optional[str] = None,
) -> CommitStatus:
    """
    Create a status on a commit.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

    :param commit: A Commit object (its ``sha`` is used) or a SHA string.
    :param context: Optional ``context`` field of the status.
    :param description: Optional ``description`` field of the status.
    :param state: Optional state; its value is sent as the ``state`` field.
    :param target_url: Optional ``target_url`` field of the status.
    :return: The CommitStatus parsed from the API response.
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    optional_fields = {
        "context": context,
        "description": description,
        "state": state,
        "target_url": target_url,
    }
    # Arguments left as None are omitted from the request body.
    payload = {key: value for key, value in optional_fields.items() if value is not None}
    if "state" in payload:
        # The API receives the enum's underlying value, not the enum member.
        payload["state"] = payload["state"].value

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return CommitStatus.parse_response(self.allspice_client, created)
Create a status on a commit.
See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
class ArchiveFormat(Enum):
    """
    Archive formats for Repository.get_archive.

    The value of the chosen member is inserted into the archive URL as the
    format component.
    """

    # gzip-compressed tarball
    TAR = "tar.gz"
    # zip archive; the default of Repository.get_archive
    ZIP = "zip"
Archive formats for Repository.get_archive
class CommitStatusSort(Enum):
    """
    Sort order for Repository.get_commit_statuses.

    The value of the chosen member is sent as the ``sort`` query parameter.
    """

    OLDEST = "oldest"
    RECENT_UPDATE = "recentupdate"
    LEAST_UPDATE = "leastupdate"
    LEAST_INDEX = "leastindex"
    HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_statuses
class Milestone(ApiObject):
    """
    A milestone of a repository.

    Two milestones are equal when they come from the same client and have
    the same ``id``.

    NOTE(review): several annotated and patchable fields below
    (``allow_merge_commits``, ``default_branch``, ``has_wiki``, ...) look like
    repository settings rather than milestone properties, while ``title``,
    ``state`` and ``due_on`` are not patchable - confirm against the API.
    """

    allow_merge_commits: Any
    allow_rebase: Any
    allow_rebase_explicit: Any
    allow_squash_merge: Any
    archived: Any
    closed_at: Any
    closed_issues: int
    created_at: str
    default_branch: Any
    description: str
    due_on: Any
    has_issues: Any
    has_pull_requests: Any
    has_wiki: Any
    id: int
    ignore_whitespace_conflicts: Any
    name: Any
    open_issues: int
    private: Any
    state: str
    title: str
    updated_at: str
    website: Any

    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        return (
            isinstance(other, Milestone)
            and self.allspice_client == other.allspice_client
            and self.id == other.id
        )

    def __hash__(self):
        return hash(self.allspice_client) ^ hash(self.id)

    # Both timestamp fields are converted with Util.convert_time when parsed.
    _fields_to_parsers: ClassVar[dict] = {
        "closed_at": lambda _, t: Util.convert_time(t),
        "due_on": lambda _, t: Util.convert_time(t),
    }

    _patchable_fields: ClassVar[set[str]] = {
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_squash_merge",
        "archived",
        "default_branch",
        "description",
        "has_issues",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "name",
        "private",
        "website",
    }

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """Request the milestone identified by owner, repository and number."""

        path_fields = {"owner": owner, "repo": repo, "number": number}
        return cls._request(allspice_client, path_fields)
class Attachment(ReadonlyApiObject):
    """
    An asset attached to a comment.

    You cannot edit or delete the attachment from this object - see the instance methods
    Comment.edit_attachment and delete_attachment for that.

    Two attachments are equal when they have the same ``uuid``.
    """

    browser_download_url: str
    created_at: str
    download_count: int
    id: int
    name: str
    size: int
    uuid: str

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Attachment):
            return False

        return self.uuid == other.uuid

    def __hash__(self):
        return hash(self.uuid)

    def download_to_file(self, io: IO):
        """
        Download the raw, binary data of this Attachment to a file-like object.

        Example:

            with open("my_file.zip", "wb") as f:
                attachment.download_to_file(f)

        :param io: The file-like object to write the data to.
        """

        # The response is streamed, so it is used as a context manager to
        # make sure it is closed even when writing to ``io`` fails part way.
        with self.allspice_client.requests.get(
            self.browser_download_url,
            headers=self.allspice_client.headers,
            stream=True,
        ) as response:
            # 4kb chunks; empty chunks are skipped
            for chunk in response.iter_content(chunk_size=4096):
                if chunk:
                    io.write(chunk)
An asset attached to a comment.
You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.
def download_to_file(self, io: IO):
    """
    Download the raw, binary data of this Attachment to a file-like object.

    Example:

        with open("my_file.zip", "wb") as f:
            attachment.download_to_file(f)

    :param io: The file-like object to write the data to.
    """

    # The response is streamed, so it is used as a context manager to make
    # sure it is closed even when writing to ``io`` fails part way.
    with self.allspice_client.requests.get(
        self.browser_download_url,
        headers=self.allspice_client.headers,
        stream=True,
    ) as response:
        # 4kb chunks; empty chunks are skipped
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
Download the raw, binary data of this Attachment to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
attachment.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
Inherited Members
class Comment(ApiObject):
    """
    A comment posted on an issue or a pull request.

    Two comments are equal when they belong to the same repository and have
    the same ``id``.
    """

    assets: List[Union[Any, Dict[str, Union[int, str]]]]
    body: str
    created_at: datetime
    html_url: str
    id: int
    issue_url: str
    original_author: str
    original_author_id: int
    pull_request_url: str
    updated_at: datetime
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        return (
            isinstance(other, Comment)
            and self.repository == other.repository
            and self.id == other.id
        )

    def __hash__(self):
        return hash(self.repository) ^ hash(self.id)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
        """Request the comment identified by owner, repository and id."""

        path_fields = {"owner": owner, "repo": repo, "id": id}
        return cls._request(allspice_client, path_fields)

    _fields_to_parsers: ClassVar[dict] = {
        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
        "created_at": lambda _, t: Util.convert_time(t),
        "updated_at": lambda _, t: Util.convert_time(t),
    }

    _patchable_fields: ClassVar[set[str]] = {"body"}

    @property
    def parent_url(self) -> str:
        """URL of the parent of this comment (the issue or the pull request)"""

        # The pull request URL is only used when there is no issue URL.
        if self.issue_url is None or self.issue_url == "":
            return self.pull_request_url
        return self.issue_url

    @cached_property
    def repository(self) -> Repository:
        """The repository this comment was posted on."""

        # Owner and repository name are the fourth- and third-to-last
        # segments of the parent URL.
        segments = self.parent_url.split("/")
        owner_name, repo_name = segments[-4:-2]
        return Repository.request(self.allspice_client, owner_name, repo_name)

    def __fields_for_path(self):
        # Values for the placeholders shared by all path templates above.
        repository = self.repository
        return {
            "owner": repository.owner.username,
            "repo": repository.name,
            "id": self.id,
        }

    def commit(self):
        """Commit the changed fields of this comment using its path fields."""

        path_fields = self.__fields_for_path()
        self._commit(path_fields)

    def delete(self):
        """Delete this comment through the API and mark it as deleted."""

        endpoint = self.API_OBJECT.format(**self.__fields_for_path())
        self.allspice_client.requests_delete(endpoint)
        self.deleted = True

    def get_attachments(self) -> List[Attachment]:
        """
        List the attachments on this comment. The returned Attachment objects
        contain a link to download the attachment.

        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
        """

        endpoint = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        entries = self.allspice_client.requests_get(endpoint)
        return [Attachment.parse_response(self.allspice_client, entry) for entry in entries]

    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
        """
        Attach a file to this comment.

        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

        :param file: The file to attach. This should be a file-like object.
        :param name: The name of the file. If not provided, the name of the file will be
                     used.
        :return: The created attachment.
        """

        request_args: dict[str, Any] = {"files": {"attachment": file}}
        if name is not None:
            request_args["params"] = {"name": name}

        endpoint = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        created = self.allspice_client.requests_post(endpoint, **request_args)
        return Attachment.parse_response(self.allspice_client, created)

    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
        """
        Edit an attachment of this comment.

        The list of params that can be edited is available at
        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

        :param attachment: The attachment to be edited
        :param data: The data parameter should be a dictionary of the fields to edit.
        :return: The edited attachment
        """

        path_fields = dict(self.__fields_for_path(), attachment_id=attachment.id)
        edited = self.allspice_client.requests_patch(
            self.ATTACHMENT_PATH.format(**path_fields),
            data=data,
        )
        return Attachment.parse_response(self.allspice_client, edited)

    def delete_attachment(self, attachment: Attachment):
        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""

        path_fields = dict(self.__fields_for_path(), attachment_id=attachment.id)
        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**path_fields))
        attachment.deleted = True
@property
def parent_url(self) -> str:
    """URL of the parent of this comment (the issue or the pull request)"""

    # The pull request URL is only used when there is no issue URL.
    if self.issue_url is None or self.issue_url == "":
        return self.pull_request_url
    return self.issue_url
URL of the parent of this comment (the issue or the pull request)
@cached_property
def repository(self) -> Repository:
    """The repository this comment was posted on."""

    # Owner and repository name are the fourth- and third-to-last segments
    # of the parent URL.
    segments = self.parent_url.split("/")
    owner_name, repo_name = segments[-4:-2]
    return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
def get_attachments(self) -> List[Attachment]:
    """
    List the attachments on this comment. The returned Attachment objects
    contain a link to download the attachment.

    https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
    """

    endpoint = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
    entries = self.allspice_client.requests_get(endpoint)
    return [Attachment.parse_response(self.allspice_client, entry) for entry in entries]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
    """
    Attach a file to this comment.

    https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

    :param file: The file to attach. This should be a file-like object.
    :param name: The name of the file. If not provided, the name of the file will be
                 used.
    :return: The created attachment.
    """

    request_args: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        request_args["params"] = {"name": name}

    endpoint = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
    created = self.allspice_client.requests_post(endpoint, **request_args)
    return Attachment.parse_response(self.allspice_client, created)
Create an attachment on this comment.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
    """
    Edit an attachment of this comment.

    The list of params that can be edited is available at
    https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

    :param attachment: The attachment to be edited
    :param data: The data parameter should be a dictionary of the fields to edit.
    :return: The edited attachment
    """

    path_fields = dict(self.__fields_for_path(), attachment_id=attachment.id)
    edited = self.allspice_client.requests_patch(
        self.ATTACHMENT_PATH.format(**path_fields),
        data=data,
    )
    return Attachment.parse_response(self.allspice_client, edited)
Edit an attachment.
The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
def delete_attachment(self, attachment: Attachment):
    """
    Delete an attachment of this comment and mark the object as deleted.

    https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment

    :param attachment: The attachment to delete.
    """

    path_fields = dict(self.__fields_for_path(), attachment_id=attachment.id)
    self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**path_fields))
    attachment.deleted = True
https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
class Commit(ReadonlyApiObject):
    """
    A commit in a repository.

    Two commits are equal when they have the same ``sha``.
    """

    author: User
    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
    committer: Dict[str, Union[int, str, bool]]
    created: str
    files: List[Dict[str, str]]
    html_url: str
    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
    parents: List[Union[Dict[str, str], Any]]
    sha: str
    stats: Dict[str, int]
    url: str

    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""

    # Regex to extract owner and repo names from the url property
    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    _fields_to_parsers: ClassVar[dict] = {
        # NOTE: the API may return None for committers that are not allspice users
        "author": lambda allspice_client, u: (
            User.parse_response(allspice_client, u) if u else None
        )
    }

    def __eq__(self, other):
        return isinstance(other, Commit) and self.sha == other.sha

    def __hash__(self):
        return hash(self.sha)

    @classmethod
    def parse_response(cls, allspice_client, result) -> "Commit":
        """
        Build a Commit from an API result.

        The ``commit`` entry of the result is additionally exposed as the
        read-only ``inner_commit`` property.
        """

        # Read the nested entry before the object is initialized.
        nested_commit = result["commit"]
        parsed = cls(allspice_client)
        cls._initialize(allspice_client, parsed, result)
        # inner_commit for legacy reasons
        Commit._add_read_property("inner_commit", nested_commit, parsed)
        return parsed

    def get_status(self) -> CommitCombinedStatus:
        """
        Get a combined status consisting of all statuses on this commit.

        Note that the returned object is a CommitCombinedStatus object, which
        also contains a list of all statuses on the commit.

        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
        """

        endpoint = self.COMMIT_GET_STATUS.format(**self._fields_for_path)
        combined = self.allspice_client.requests_get(endpoint)
        return CommitCombinedStatus.parse_response(self.allspice_client, combined)

    def get_statuses(self) -> List[CommitStatus]:
        """
        Get a list of all statuses on this commit.

        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
        """

        endpoint = self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
        entries = self.allspice_client.requests_get(endpoint)
        return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]

    @cached_property
    def _fields_for_path(self) -> dict[str, str]:
        # Owner and repository name are taken from the commit's url property.
        found = self.URL_REGEXP.search(self.url)
        if found is None:
            raise ValueError(f"Invalid commit URL: {self.url}")

        owner, repo = found.groups()
        return {"owner": owner, "repo": repo, "sha": self.sha}
@classmethod
def parse_response(cls, allspice_client, result) -> "Commit":
    """
    Build a Commit from an API result.

    The ``commit`` entry of the result is additionally exposed as the
    read-only ``inner_commit`` property.
    """

    # Read the nested entry before the object is initialized.
    nested_commit = result["commit"]
    parsed = cls(allspice_client)
    cls._initialize(allspice_client, parsed, result)
    # inner_commit for legacy reasons
    Commit._add_read_property("inner_commit", nested_commit, parsed)
    return parsed
def get_status(self) -> CommitCombinedStatus:
    """
    Get a combined status consisting of all statuses on this commit.

    Note that the returned object is a CommitCombinedStatus object, which
    also contains a list of all statuses on the commit.

    https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
    """

    endpoint = self.COMMIT_GET_STATUS.format(**self._fields_for_path)
    combined = self.allspice_client.requests_get(endpoint)
    return CommitCombinedStatus.parse_response(self.allspice_client, combined)
Get a combined status consisting of all statuses on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
def get_statuses(self) -> List[CommitStatus]:
    """
    Get a list of all statuses on this commit.

    https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
    """

    endpoint = self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
    entries = self.allspice_client.requests_get(endpoint)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]
Get a list of all statuses on this commit.
https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
class CommitStatusState(Enum):
    """States of a commit status, identified by their string values."""

    PENDING = "pending"
    SUCCESS = "success"
    ERROR = "error"
    FAILURE = "failure"
    WARNING = "warning"

    @classmethod
    def try_init(cls, value: str) -> CommitStatusState | str:
        """
        Convert a string to the matching enum member.

        Strings that do not match any member are returned unchanged.
        """

        try:
            member = cls(value)
        except ValueError:
            return value
        else:
            return member
@classmethod
def try_init(cls, value: str) -> CommitStatusState | str:
    """
    Convert a string to the matching enum member.

    Strings that do not match any member are returned unchanged.
    """

    try:
        member = cls(value)
    except ValueError:
        return value
    else:
        return member
Try converting a string to the enum, and if that fails, return the string itself.
class CommitStatus(ReadonlyApiObject):
    """
    A single status on a commit.

    Two statuses are equal when they have the same ``id``.
    """

    context: str
    created_at: str
    creator: User
    description: str
    id: int
    status: CommitStatusState
    target_url: str
    updated_at: str
    url: str

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    _fields_to_parsers: ClassVar[dict] = {
        # Gitea/ASH doesn't actually validate that the status is a "valid"
        # status, so we can expect empty or unknown strings in the status field.
        "status": lambda _, s: CommitStatusState.try_init(s),
        # A missing creator is kept as None instead of being parsed.
        "creator": lambda allspice_client, u: (
            User.parse_response(allspice_client, u) if u else None
        ),
    }

    def __eq__(self, other):
        return isinstance(other, CommitStatus) and self.id == other.id

    def __hash__(self):
        return hash(self.id)
Inherited Members
class CommitCombinedStatus(ReadonlyApiObject):
    """
    The combined status of a commit, including the list of its statuses.

    Two combined statuses are equal when they have the same ``sha``.
    """

    commit_url: str
    repository: Repository
    sha: str
    state: CommitStatusState
    statuses: List["CommitStatus"]
    total_count: int
    url: str

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    _fields_to_parsers: ClassVar[dict] = {
        # See CommitStatus
        "state": lambda _, s: CommitStatusState.try_init(s),
        "statuses": lambda allspice_client, statuses: [
            CommitStatus.parse_response(allspice_client, status) for status in statuses
        ],
        "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r),
    }

    def __eq__(self, other):
        return isinstance(other, CommitCombinedStatus) and self.sha == other.sha

    def __hash__(self):
        return hash(self.sha)

    @classmethod
    def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus":
        """Build a CommitCombinedStatus from an API result."""

        parsed = cls(allspice_client)
        cls._initialize(allspice_client, parsed, result)
        return parsed
Inherited Members
class Issue(ApiObject):
    """
    An issue on a repository.

    Note: `Issue.assets` may not have any entries even if the issue has
    attachments. This happens when an issue is fetched via a bulk method like
    `Repository.get_issues`. In most cases, prefer using
    `Issue.get_attachments` to get the attachments on an issue.
    """

    assets: List[Union[Any, "Attachment"]]
    assignee: Any
    assignees: Any
    body: str
    closed_at: Any
    comments: int
    created_at: str
    due_date: Any
    html_url: str
    id: int
    is_locked: bool
    labels: List[Any]
    milestone: Optional["Milestone"]
    number: int
    original_author: str
    original_author_id: int
    pin_order: int
    pull_request: Any
    ref: str
    repository: Dict[str, Union[int, str]]
    state: str
    title: str
    updated_at: str
    url: str
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
    GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets"""

    OPENED = "open"
    CLOSED = "closed"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Issue):
            return False
        return self.repository == other.repository and self.id == other.id

    def __hash__(self):
        return hash(self.repository) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {
        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assets": lambda allspice_client, assets: [
            Attachment.parse_response(allspice_client, a) for a in assets
        ],
        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assignees": lambda allspice_client, us: [
            User.parse_response(allspice_client, u) for u in us
        ],
        # Anything other than "closed" is normalised to OPENED.
        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
    }

    _parsers_to_fields: ClassVar[dict] = {
        "milestone": lambda m: m.id,
    }

    _patchable_fields: ClassVar[set[str]] = {
        "assignee",
        "assignees",
        "body",
        "due_date",
        "milestone",
        "state",
        "title",
    }

    def commit(self):
        """Commit this issue via `_commit`, addressed by owner, repo and issue number."""
        args = {
            "owner": self.repository.owner.username,
            "repo": self.repository.name,
            "index": self.number,
        }
        self._commit(args)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """
        Fetch a single issue together with its full repository object.

        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param number: The issue number (used as the `index` path segment).
        :return: The requested issue.
        """
        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
        # The repository in the response is a RepositoryMeta object, so request
        # the full repository object and add it to the issue object.
        repository = Repository.request(allspice_client, owner, repo)
        setattr(api_object, "_repository", repository)
        # For legacy reasons
        cls._add_read_property("repo", repository, api_object)
        return api_object

    @classmethod
    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
        """
        Create a new issue on `repo` and return it.

        :param repo: The repository to create the issue on.
        :param title: Title of the new issue.
        :param body: Body text of the new issue.
        :return: The created issue, with `repo` attached as its repository.
        """
        args = {"owner": repo.owner.username, "repo": repo.name}
        data = {"title": title, "body": body}
        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
        issue = Issue.parse_response(allspice_client, result)
        setattr(issue, "_repository", repo)
        cls._add_read_property("repo", repo, issue)
        return issue

    @property
    def owner(self) -> Organization | User:
        """The owner of the repository this issue belongs to."""
        return self.repository.owner

    def get_time_sum(self, user: User) -> int:
        """Sum the `time` of every tracked-time entry whose `user_id` is `user.id`."""
        results = self.allspice_client.requests_get(
            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
        )
        return sum(result["time"] for result in results if result and result["user_id"] == user.id)

    def get_times(self) -> Optional[Dict]:
        """Return the raw tracked-time response for this issue."""
        return self.allspice_client.requests_get(
            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
        )

    def delete_time(self, time_id: str):
        """Delete the tracked-time entry `time_id` from this issue."""
        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
        self.allspice_client.requests_delete(path)

    def add_time(
        self,
        time: int,
        created: Optional[str] = None,
        user_name: Union[User, str, None] = None,
    ):
        """
        Add a tracked-time entry to this issue.

        :param time: The amount of time to add; it is converted with `int()`.
        :param created: Optional creation timestamp, sent as-is.
        :param user_name: The user to log the time for, either as a `User`
            or as a plain username string.
        """
        # The payload carries a username, so unwrap User-like objects instead
        # of placing the object itself in the request body. Strings and None
        # have no `username` attribute and pass through unchanged.
        user_name = getattr(user_name, "username", user_name)

        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
        self.allspice_client.requests_post(
            path, data={"created": created, "time": int(time), "user_name": user_name}
        )

    def get_comments(self) -> List[Comment]:
        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""

        results = self.allspice_client.requests_get(
            self.GET_COMMENTS.format(
                owner=self.owner.username, repo=self.repository.name, index=self.number
            )
        )

        return [Comment.parse_response(self.allspice_client, result) for result in results]

    def create_comment(self, body: str) -> Comment:
        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""

        path = self.GET_COMMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )

        response = self.allspice_client.requests_post(path, data={"body": body})
        return Comment.parse_response(self.allspice_client, response)

    def get_attachments(self) -> List[Attachment]:
        """
        Fetch all attachments on this issue.

        Unlike the assets field, this will always fetch all attachments from the
        API.

        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
        """

        path = self.GET_ATTACHMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )
        response = self.allspice_client.requests_get(path)

        return [Attachment.parse_response(self.allspice_client, result) for result in response]

    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
        """
        Create an attachment on this issue.

        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

        :param file: The file to attach. This should be a file-like object.
        :param name: The name of the file. If not provided, the name of the file will be
            used.
        :return: The created attachment.
        """

        args: dict[str, Any] = {
            "files": {"attachment": file},
        }
        if name is not None:
            args["params"] = {"name": name}

        result = self.allspice_client.requests_post(
            self.GET_ATTACHMENTS.format(
                owner=self.owner.username, repo=self.repository.name, index=self.number
            ),
            **args,
        )

        return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: `Issue.assets` may not have any entries even if the issue has attachments. This happens when an issue is fetched via a bulk method like `Repository.get_issues`. In most cases, prefer using `Issue.get_attachments` to get the attachments on an issue.
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
    """
    Fetch a single issue and attach its full repository object.

    :param owner: Username of the repository owner.
    :param repo: Name of the repository.
    :param number: The issue number (used as the `index` path segment).
    :return: The requested issue.
    """
    path_args = {"owner": owner, "repo": repo, "index": number}
    issue = cls._request(allspice_client, path_args)

    # The issue response only carries a RepositoryMeta object, so the full
    # repository is requested separately and stored on the issue.
    full_repository = Repository.request(allspice_client, owner, repo)
    issue._repository = full_repository

    # Kept for legacy reasons
    cls._add_read_property("repo", full_repository, issue)
    return issue
@classmethod
def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
    """
    Create a new issue on `repo` and return it.

    :param repo: The repository to create the issue on.
    :param title: Title of the new issue.
    :param body: Body text of the new issue.
    :return: The created issue, with `repo` attached as its repository.
    """
    endpoint = Issue.CREATE_ISSUE.format(owner=repo.owner.username, repo=repo.name)
    payload = {"title": title, "body": body}
    response = allspice_client.requests_post(endpoint, data=payload)

    created = Issue.parse_response(allspice_client, response)
    created._repository = repo
    cls._add_read_property("repo", repo, created)
    return created
def add_time(
    self,
    time: int,
    created: Optional[str] = None,
    user_name: Union[User, str, None] = None,
):
    """
    Add a tracked-time entry to this issue.

    :param time: The amount of time to add; it is converted with `int()`.
    :param created: Optional creation timestamp, sent as-is.
    :param user_name: The user to log the time for, either as a `User`
        or as a plain username string.
    """
    # The payload carries a username, so unwrap User-like objects instead of
    # placing the object itself in the request body. Strings and None have no
    # `username` attribute and pass through unchanged.
    user_name = getattr(user_name, "username", user_name)

    path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
    self.allspice_client.requests_post(
        path, data={"created": created, "time": int(time), "user_name": user_name}
    )
def get_comments(self) -> List[Comment]:
    """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""

    endpoint = self.GET_COMMENTS.format(
        owner=self.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    raw_comments = self.allspice_client.requests_get(endpoint)

    client = self.allspice_client
    return [Comment.parse_response(client, raw) for raw in raw_comments]
https://hub.allspice.io/api/swagger#/issue/issueGetComments
def create_comment(self, body: str) -> Comment:
    """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""

    endpoint = self.GET_COMMENTS.format(
        owner=self.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    payload = {"body": body}

    created = self.allspice_client.requests_post(endpoint, data=payload)
    return Comment.parse_response(self.allspice_client, created)
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
def get_attachments(self) -> List[Attachment]:
    """
    Fetch all attachments on this issue.

    Unlike the assets field, this will always fetch all attachments from the
    API.

    See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
    """

    endpoint = self.GET_ATTACHMENTS.format(
        owner=self.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    raw_attachments = self.allspice_client.requests_get(endpoint)

    client = self.allspice_client
    return [Attachment.parse_response(client, raw) for raw in raw_attachments]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
    """
    Create an attachment on this issue.

    https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

    :param file: The file to attach. This should be a file-like object.
    :param name: The name of the file. If not provided, the name of the file will be
        used.
    :return: The created attachment.
    """

    endpoint = self.GET_ATTACHMENTS.format(
        owner=self.owner.username,
        repo=self.repository.name,
        index=self.number,
    )

    # The "params" entry is only sent when a name was given.
    request_kwargs: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        request_kwargs["params"] = {"name": name}

    created = self.allspice_client.requests_post(endpoint, **request_kwargs)
    return Attachment.parse_response(self.allspice_client, created)
Create an attachment on this issue.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
class DesignReview(ApiObject):
    """
    A Design Review. See
    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.

    Note: The base and head fields are not `Branch` objects - they are plain strings
    referring to the branch names. This is because DRs can exist for branches that have
    been deleted, which don't have an associated `Branch` object from the API. You can use
    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
    it exists.
    """

    additions: int
    allow_maintainer_edit: bool
    allow_maintainer_edits: Any
    assignee: User
    assignees: List["User"]
    base: str
    body: str
    changed_files: int
    closed_at: Optional[str]
    comments: int
    created_at: str
    deletions: int
    diff_url: str
    draft: bool
    due_date: Optional[str]
    head: str
    html_url: str
    id: int
    is_locked: bool
    labels: List[Any]
    merge_base: str
    merge_commit_sha: Optional[str]
    mergeable: bool
    merged: bool
    merged_at: Optional[str]
    merged_by: Optional["User"]
    milestone: Any
    number: int
    patch_url: str
    pin_order: int
    repository: Optional["Repository"]
    requested_reviewers: Any
    review_comments: int
    state: str
    title: str
    updated_at: str
    url: str
    user: User

    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"

    OPEN = "open"
    CLOSED = "closed"

    class MergeType(Enum):
        MERGE = "merge"
        REBASE = "rebase"
        REBASE_MERGE = "rebase-merge"
        SQUASH = "squash"
        MANUALLY_MERGED = "manually-merged"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        return (
            isinstance(other, DesignReview)
            and self.repository == other.repository
            and self.id == other.id
        )

    def __hash__(self):
        return hash(self.repository) ^ hash(self.id)

    @classmethod
    def parse_response(cls, allspice_client, result) -> "DesignReview":
        """Parse a response and attach the repository found under `base.repo`."""
        design_review = super().parse_response(allspice_client, result)
        base_repository = Repository.parse_response(allspice_client, result["base"]["repo"])
        cls._add_read_property("repository", base_repository, design_review)

        return design_review

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
        path_args = {"owner": owner, "repo": repo, "index": number}
        return cls._request(allspice_client, path_args)

    _fields_to_parsers: ClassVar[dict] = {
        "assignee": lambda client, raw_user: User.parse_response(client, raw_user),
        "assignees": lambda client, raw_users: [
            User.parse_response(client, raw_user) for raw_user in raw_users
        ],
        # Only the branch name (`ref`) of base and head is kept.
        "base": lambda _, raw_base: raw_base["ref"],
        "head": lambda _, raw_head: raw_head["ref"],
        "merged_by": lambda client, raw_user: User.parse_response(client, raw_user),
        "milestone": lambda client, raw_milestone: Milestone.parse_response(
            client, raw_milestone
        ),
        "user": lambda client, raw_user: User.parse_response(client, raw_user),
    }

    _patchable_fields: ClassVar[set[str]] = {
        "allow_maintainer_edits",
        "assignee",
        "assignees",
        "base",
        "body",
        "due_date",
        "milestone",
        "state",
        "title",
    }

    _parsers_to_fields: ClassVar[dict] = {
        "assignee": lambda user: user.username,
        "assignees": lambda users: [user.username for user in users],
        "base": lambda branch: branch.name if isinstance(branch, Branch) else branch,
        "milestone": lambda milestone: milestone.id,
    }

    def commit(self):
        """
        Commit the dirty fields of this design review.

        When `due_date` is among the dirty fields and is None, the payload
        additionally gets `unset_due_date` set to True.
        """
        dirty = self.get_dirty_fields()
        if "due_date" in dirty and dirty["due_date"] is None:
            dirty["unset_due_date"] = True

        path_args = {
            "owner": self.repository.owner.username,
            "repo": self.repository.name,
            "index": self.number,
        }
        self._commit(path_args, dirty)

    def merge(self, merge_type: MergeType):
        """
        Merge the pull request. See
        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

        :param merge_type: The type of merge to perform. See the MergeType enum.
        """

        endpoint = self.MERGE_DESIGN_REVIEW.format(
            owner=self.repository.owner.username,
            repo=self.repository.name,
            index=self.number,
        )
        self.allspice_client.requests_post(endpoint, data={"Do": merge_type.value})

    def get_comments(self) -> List[Comment]:
        """
        Get the comments on this pull request, but not specifically on a review.

        https://hub.allspice.io/api/swagger#/issue/issueGetComments

        :return: A list of comments on this pull request.
        """

        endpoint = self.GET_COMMENTS.format(
            owner=self.repository.owner.username,
            repo=self.repository.name,
            index=self.number,
        )
        raw_comments = self.allspice_client.requests_get(endpoint)
        return [Comment.parse_response(self.allspice_client, raw) for raw in raw_comments]

    def create_comment(self, body: str):
        """
        Create a comment on this pull request. This uses the same endpoint as the
        comments on issues, and will not be associated with any reviews.

        https://hub.allspice.io/api/swagger#/issue/issueCreateComment

        :param body: The body of the comment.
        :return: The comment that was created.
        """

        endpoint = self.GET_COMMENTS.format(
            owner=self.repository.owner.username,
            repo=self.repository.name,
            index=self.number,
        )
        created = self.allspice_client.requests_post(endpoint, data={"body": body})
        return Comment.parse_response(self.allspice_client, created)
A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not `Branch` objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated `Branch` object from the API. You can use the `Repository.get_branch` method to get a `Branch` object for a branch if you know it exists.
@classmethod
def parse_response(cls, allspice_client, result) -> "DesignReview":
    """
    Parse an API response into a design review.

    The repository is parsed from `result["base"]["repo"]` and exposed as
    the read-only `repository` property.
    """
    design_review = super().parse_response(allspice_client, result)

    base_repository = Repository.parse_response(allspice_client, result["base"]["repo"])
    cls._add_read_property("repository", base_repository, design_review)

    return design_review
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
    """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
    path_args = {"owner": owner, "repo": repo, "index": number}
    return cls._request(allspice_client, path_args)
See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest
def commit(self):
    """
    Commit the dirty fields of this design review.

    When `due_date` is among the dirty fields and is None, the payload
    additionally gets `unset_due_date` set to True.
    """
    dirty = self.get_dirty_fields()
    if "due_date" in dirty:
        if dirty["due_date"] is None:
            dirty["unset_due_date"] = True

    path_args = {
        "owner": self.repository.owner.username,
        "repo": self.repository.name,
        "index": self.number,
    }
    self._commit(path_args, dirty)
def merge(self, merge_type: MergeType):
    """
    Merge the pull request. See
    https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

    :param merge_type: The type of merge to perform. See the MergeType enum.
    """

    endpoint = self.MERGE_DESIGN_REVIEW.format(
        owner=self.repository.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    payload = {"Do": merge_type.value}
    self.allspice_client.requests_post(endpoint, data=payload)
Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
    """
    Get the comments on this pull request, but not specifically on a review.

    https://hub.allspice.io/api/swagger#/issue/issueGetComments

    :return: A list of comments on this pull request.
    """

    endpoint = self.GET_COMMENTS.format(
        owner=self.repository.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    raw_comments = self.allspice_client.requests_get(endpoint)

    client = self.allspice_client
    return [Comment.parse_response(client, raw) for raw in raw_comments]
Get the comments on this pull request, but not specifically on a review.
https://hub.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
def create_comment(self, body: str):
    """
    Create a comment on this pull request. This uses the same endpoint as the
    comments on issues, and will not be associated with any reviews.

    https://hub.allspice.io/api/swagger#/issue/issueCreateComment

    :param body: The body of the comment.
    :return: The comment that was created.
    """

    endpoint = self.GET_COMMENTS.format(
        owner=self.repository.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    payload = {"body": body}

    created = self.allspice_client.requests_post(endpoint, data=payload)
    return Comment.parse_response(self.allspice_client, created)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
class Team(ApiObject):
    """
    A team, addressed in the API by its numeric id.

    Two teams compare equal when both their organization and id match.
    """

    can_create_org_repo: bool
    description: str
    id: int
    includes_all_repositories: bool
    name: str
    organization: Optional["Organization"]
    permission: str
    units: List[str]
    units_map: Dict[str, str]

    API_OBJECT = """/teams/{id}"""  # <id>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Team):
            return False
        return self.organization == other.organization and self.id == other.id

    def __hash__(self):
        return hash(self.organization) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {
        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
    }

    _patchable_fields: ClassVar[set[str]] = {
        "can_create_org_repo",
        "description",
        "includes_all_repositories",
        "name",
        "permission",
        "units",
        "units_map",
    }

    @classmethod
    def request(cls, allspice_client, id: int):
        """Fetch the team with the given id."""
        return cls._request(allspice_client, {"id": id})

    def commit(self):
        """Commit this team via `_commit`, addressed by its id."""
        args = {"id": self.id}
        self._commit(args)

    def add_user(self, user: User):
        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
        url = f"/teams/{self.id}/members/{user.login}"
        self.allspice_client.requests_put(url)

    def add_repo(self, org: Organization, repo: Union[Repository, str]):
        """
        Add a repository of `org` to this team.

        :param org: The organization that owns the repository.
        :param repo: The repository, either as a `Repository` or as its name.
        """
        if isinstance(repo, Repository):
            repo_name = repo.name
        else:
            repo_name = repo
        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))

    def get_members(self):
        """Get all users assigned to the team."""
        results = self.allspice_client.requests_get_paginated(
            Team.GET_MEMBERS % self.id,
        )
        return [User.parse_response(self.allspice_client, result) for result in results]

    def get_repos(self):
        """Get all repos of this Team."""
        # Use the paginated helper, as get_members does, so the list is not
        # limited to whatever a single response contains.
        # NOTE(review): assumes this endpoint is paginated server-side - confirm.
        results = self.allspice_client.requests_get_paginated(
            Team.GET_REPOS % self.id,
        )
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def delete(self):
        """Delete this team and mark this object as deleted."""
        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
        self.deleted = True

    def remove_team_member(self, user_name: str):
        """Remove the user called `user_name` from this team."""
        url = f"/teams/{self.id}/members/{user_name}"
        self.allspice_client.requests_delete(url)
def add_user(self, user: User):
    """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
    member_endpoint = f"/teams/{self.id}/members/{user.login}"
    self.allspice_client.requests_put(member_endpoint)
https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
def get_members(self):
    """Get all users assigned to the team."""
    members_endpoint = Team.GET_MEMBERS % self.id
    raw_members = self.allspice_client.requests_get_paginated(members_endpoint)

    client = self.allspice_client
    return [User.parse_response(client, raw) for raw in raw_members]
Get all users assigned to the team.
def get_repos(self):
    """Get all repos of this Team."""
    # Use the paginated helper, as get_members does, so the list is not
    # limited to whatever a single response contains.
    # NOTE(review): assumes this endpoint is paginated server-side - confirm.
    results = self.allspice_client.requests_get_paginated(
        Team.GET_REPOS % self.id,
    )
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
class Release(ApiObject):
    """
    A release on a repo.
    """

    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
    author: User
    body: str
    created_at: str
    draft: bool
    html_url: str
    id: int
    name: str
    prerelease: bool
    published_at: str
    repo: Optional["Repository"]
    repository: Optional["Repository"]
    tag_name: str
    tarball_url: str
    target_commitish: str
    upload_url: str
    url: str
    zipball_url: str

    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
    # Note that we don't strictly need the get_assets route, as the release
    # object already contains the assets.

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Release):
            return False
        return self.repo == other.repo and self.id == other.id

    def __hash__(self):
        return hash(self.repo) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {
        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
    }
    _patchable_fields: ClassVar[set[str]] = {
        "body",
        "draft",
        "name",
        "prerelease",
        "tag_name",
        "target_commitish",
    }

    @classmethod
    def parse_response(cls, allspice_client, result, repo) -> Release:
        """
        Parse an API response into a release belonging to `repo`.

        :param result: The API response; its "assets" entries are parsed
            into `ReleaseAsset` objects.
        :param repo: The repository to attach to the release.
        :return: The parsed release.
        """
        release = super().parse_response(allspice_client, result)
        Release._add_read_property("repository", repo, release)
        # For legacy reasons
        Release._add_read_property("repo", repo, release)
        setattr(
            release,
            "_assets",
            [
                ReleaseAsset.parse_response(allspice_client, asset, release)
                for asset in result["assets"]
            ],
        )
        return release

    @classmethod
    def request(
        cls,
        allspice_client,
        owner: str,
        repo: str,
        id: Optional[int] = None,
    ) -> Release:
        """
        Fetch a release and the repository it belongs to.

        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param id: Id of the release.
        :return: The requested release.
        """
        args = {"owner": owner, "repo": repo, "id": id}
        release_response = cls._get_gitea_api_object(allspice_client, args)
        repository = Repository.request(allspice_client, owner, repo)
        release = cls.parse_response(allspice_client, release_response, repository)
        return release

    def commit(self):
        """
        Commit this release via `_commit`.

        :raises ValueError: If the release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot commit a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self._commit(args)

    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
        """
        Create an asset for this release.

        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

        :param file: The file to upload. This should be a file-like object.
        :param name: The name of the file.
        :return: The created asset.
        :raises ValueError: If the release has no repository.
        """

        if self.repo is None:
            raise ValueError("Cannot create an asset for a release without a repository.")

        args: dict[str, Any] = {"files": {"attachment": file}}
        if name is not None:
            args["params"] = {"name": name}

        result = self.allspice_client.requests_post(
            self.RELEASE_CREATE_ASSET.format(
                owner=self.repo.owner.username,
                repo=self.repo.name,
                id=self.id,
            ),
            **args,
        )
        return ReleaseAsset.parse_response(self.allspice_client, result, self)

    def delete(self):
        """
        Delete this release and mark this object as deleted.

        :raises ValueError: If the release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot delete a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
        self.deleted = True
A release on a repo.
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
    """
    Parse an API response into a release belonging to `repo`.

    :param result: The API response; its "assets" entries are parsed into
        `ReleaseAsset` objects.
    :param repo: The repository to attach to the release.
    :return: The parsed release.
    """
    release = super().parse_response(allspice_client, result)

    # "repo" is kept alongside "repository" for legacy reasons.
    Release._add_read_property("repository", repo, release)
    Release._add_read_property("repo", repo, release)

    parsed_assets = [
        ReleaseAsset.parse_response(allspice_client, raw_asset, release)
        for raw_asset in result["assets"]
    ]
    release._assets = parsed_assets
    return release
@classmethod
def request(
    cls,
    allspice_client,
    owner: str,
    repo: str,
    id: Optional[int] = None,
) -> Release:
    """
    Fetch a release and the repository it belongs to.

    :param owner: Username of the repository owner.
    :param repo: Name of the repository.
    :param id: Id of the release.
    :return: The requested release.
    """
    path_args = {"owner": owner, "repo": repo, "id": id}
    raw_release = cls._get_gitea_api_object(allspice_client, path_args)
    parent_repository = Repository.request(allspice_client, owner, repo)
    return cls.parse_response(allspice_client, raw_release, parent_repository)
def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
    """
    Create an asset for this release.

    https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

    :param file: The file to upload. This should be a file-like object.
    :param name: The name of the file.
    :return: The created asset.
    :raises ValueError: If the release has no repository.
    """

    if self.repo is None:
        # The message names this operation rather than "commit".
        raise ValueError("Cannot create an asset for a release without a repository.")

    # The "params" entry is only sent when a name was given.
    args: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        args["params"] = {"name": name}

    result = self.allspice_client.requests_post(
        self.RELEASE_CREATE_ASSET.format(
            owner=self.repo.owner.username,
            repo=self.repo.name,
            id=self.id,
        ),
        **args,
    )
    return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
def delete(self):
    """
    Delete this release and mark this object as deleted.

    :raises ValueError: If the release has no repository.
    """
    if self.repo is None:
        # The message names this operation rather than "commit".
        raise ValueError("Cannot delete a release without a repository.")

    args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
    self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
    self.deleted = True
class ReleaseAsset(ApiObject):
    """
    A file attached to a release.

    Two assets compare equal when both their release and id match.
    """

    browser_download_url: str
    created_at: str
    download_count: int
    id: int
    name: str
    release: Optional["Release"]
    size: int
    uuid: str

    API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, ReleaseAsset):
            return False
        return self.release == other.release and self.id == other.id

    def __hash__(self):
        return hash(self.release) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {}
    _patchable_fields: ClassVar[set[str]] = {
        "name",
    }

    @classmethod
    def parse_response(cls, allspice_client, result, release) -> ReleaseAsset:
        """Parse an API response into an asset belonging to `release`."""
        asset = super().parse_response(allspice_client, result)
        ReleaseAsset._add_read_property("release", release, asset)
        return asset

    @classmethod
    def request(
        cls,
        allspice_client,
        owner: str,
        repo: str,
        release_id: int,
        id: int,
    ) -> ReleaseAsset:
        """
        Fetch an asset and the release it belongs to.

        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param release_id: Id of the release the asset belongs to.
        :param id: Id of the asset.
        :return: The requested asset.
        """
        args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id}
        asset_response = cls._get_gitea_api_object(allspice_client, args)
        release = Release.request(allspice_client, owner, repo, release_id)
        asset = cls.parse_response(allspice_client, asset_response, release)
        return asset

    def commit(self):
        """
        Commit this asset via `_commit`.

        :raises ValueError: If the asset has no release, or the release has
            no repository.
        """
        if self.release is None or self.release.repo is None:
            raise ValueError("Cannot commit a release asset without a release or a repository.")

        args = {
            "owner": self.release.repo.owner.username,
            "repo": self.release.repo.name,
            "release_id": self.release.id,
            "id": self.id,
        }
        self._commit(args)

    def download(self) -> bytes:
        """
        Download the raw, binary data of this asset.

        Note 1: if the file you are requesting is a text file, you might want to
        use .decode() on the result to get a string. For example:

            asset.download().decode("utf-8")

        Note 2: this method will store the entire file in memory. If you are
        downloading a large file, you might want to use download_to_file instead.
        """

        return self.allspice_client.requests.get(
            self.browser_download_url,
            headers=self.allspice_client.headers,
        ).content

    def download_to_file(self, io: IO):
        """
        Download the raw, binary data of this asset to a file-like object.

        Example:

            with open("my_file.zip", "wb") as f:
                asset.download_to_file(f)

        :param io: The file-like object to write the data to.
        """

        # Use the response as a context manager so the streamed connection is
        # closed even if writing fails part-way through.
        # NOTE(review): assumes `allspice_client.requests` returns a
        # requests-style Response that supports `with` - confirm.
        with self.allspice_client.requests.get(
            self.browser_download_url,
            headers=self.allspice_client.headers,
            stream=True,
        ) as response:
            # 4kb chunks
            for chunk in response.iter_content(chunk_size=4096):
                if chunk:
                    io.write(chunk)

    def delete(self):
        """
        Delete this asset and mark this object as deleted.

        :raises ValueError: If the asset has no release, or the release has
            no repository.
        """
        if self.release is None or self.release.repo is None:
            raise ValueError("Cannot delete a release asset without a release or a repository.")

        args = {
            "owner": self.release.repo.owner.username,
            "repo": self.release.repo.name,
            "release_id": self.release.id,
            "id": self.id,
        }
        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
        self.deleted = True
@classmethod
def request(
    cls,
    allspice_client,
    owner: str,
    repo: str,
    release_id: int,
    id: int,
) -> ReleaseAsset:
    """
    Fetch a single asset, together with the release it belongs to.

    :param allspice_client: The client used to perform the requests.
    :param owner: Owner of the repository.
    :param repo: Name of the repository.
    :param release_id: ID of the release the asset is attached to.
    :param id: ID of the asset.
    :return: The parsed asset, with its ``release`` attached.
    """

    path_args = dict(owner=owner, repo=repo, release_id=release_id, id=id)
    # The asset is fetched first, then its parent release.
    raw_asset = cls._get_gitea_api_object(allspice_client, path_args)
    parent_release = Release.request(allspice_client, owner, repo, release_id)
    return cls.parse_response(allspice_client, raw_asset, parent_release)
def commit(self):
    """
    Send the changes made to this asset to the API.

    :raises ValueError: If the asset has no release, or the release has no
        repository; both are needed to address the asset.
    """

    release = self.release
    if release is None or release.repo is None:
        raise ValueError("Cannot commit a release asset without a release or a repository.")

    repository = release.repo
    self._commit(
        {
            "owner": repository.owner.username,
            "repo": repository.name,
            "release_id": release.id,
            "id": self.id,
        }
    )
def download(self) -> bytes:
    """
    Fetch this asset and return its raw bytes.

    For a text file, decode the result to get a string, for example:

        asset.download().decode("utf-8")

    The whole file is held in memory; for large files, use
    download_to_file instead.
    """

    client = self.allspice_client
    response = client.requests.get(self.browser_download_url, headers=client.headers)
    return response.content
Download the raw, binary data of this asset.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
asset.download().decode("utf-8")
Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.
def download_to_file(self, io: IO):
    """
    Download the raw, binary data of this asset to a file-like object.

    Example:

        with open("my_file.zip", "wb") as f:
            asset.download_to_file(f)

    :param io: The file-like object to write the data to.
    """

    response = self.allspice_client.requests.get(
        self.browser_download_url,
        headers=self.allspice_client.headers,
        stream=True,
    )
    try:
        # 4kb chunks
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
    finally:
        # A streamed response keeps its connection open until it is closed,
        # so release it even if writing fails part-way through.
        response.close()
Download the raw, binary data of this asset to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
asset.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
def delete(self):
    """
    Delete this asset through the API and mark this object as deleted.

    :raises ValueError: If the asset has no release, or the release has no
        repository; both are needed to build the asset's URL.
    """

    if self.release is None or self.release.repo is None:
        # The message names the operation actually being attempted (delete).
        raise ValueError("Cannot delete a release asset without a release or a repository.")

    args = {
        "owner": self.release.repo.owner.username,
        "repo": self.release.repo.name,
        "release_id": self.release.id,
        "id": self.id,
    }
    self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
    self.deleted = True
class Content(ReadonlyApiObject):
    """
    Read-only API object describing a piece of content, with fields such as
    ``path``, ``sha`` and ``type``.

    Two instances are considered equal when both ``sha`` and ``name`` match.
    """

    content: Any
    download_url: str
    encoding: Any
    git_url: str
    html_url: str
    last_commit_sha: str
    name: str
    path: str
    sha: str
    size: int
    submodule_git_url: Any
    target: Any
    type: str
    url: str

    # Presumably one of the values ``type`` can take -- confirm against callers.
    FILE = "file"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Anything that is not a Content never compares equal.
        return (
            isinstance(other, Content)
            and self.sha == other.sha
            and self.name == other.name
        )

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        sha_hash = hash(self.sha)
        name_hash = hash(self.name)
        return sha_hash ^ name_hash
Inherited Members
class Util:
    """Static helpers for converting times and building request parameters."""

    @staticmethod
    def convert_time(time: str) -> datetime:
        """
        Parse a Gitea timestamp into a datetime.

        Gitea writes the UTC offset with a colon ("+01:00") or as a literal
        "Z". Both forms are parsed into a timezone-aware datetime; a timestamp
        without any offset is parsed into a naive datetime.

        :param time: Timestamp such as "2021-01-01T12:34:56+01:00".
        :return: The parsed datetime.
        :raises ValueError: If the string matches neither form.
        """

        try:
            # Since Python 3.7, %z accepts "+HH:MM" as well as "Z", so the
            # string can be parsed as-is without losing seconds or minutes.
            return datetime.strptime(time, "%Y-%m-%dT%H:%M:%S%z")
        except ValueError:
            # No offset present: parse as a naive timestamp.
            return datetime.strptime(time, "%Y-%m-%dT%H:%M:%S")

    @staticmethod
    def format_time(time: datetime) -> str:
        """
        Format a datetime object to Gitea's time format.

        The time is converted to UTC and rendered with a trailing "Z".

        :param time: The time to format
        :return: Formatted time
        """

        return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"

    @staticmethod
    def data_params_for_ref(ref: Optional[Ref]) -> Dict:
        """
        Given a "ref", returns a dict with the ref parameter for the API call.

        A Branch contributes its name, a Commit its sha, and any other truthy
        value is passed through unchanged.

        If the ref is None, returns an empty dict. You can pass this to the API
        directly.
        """

        if isinstance(ref, Branch):
            return {"ref": ref.name}
        elif isinstance(ref, Commit):
            return {"ref": ref.sha}
        elif ref:
            return {"ref": ref}
        else:
            return {}
@staticmethod
def convert_time(time: str) -> datetime:
    """
    Parse a Gitea timestamp into a datetime.

    Gitea writes the UTC offset with a colon ("+01:00") or as a literal "Z".
    Both forms are parsed into a timezone-aware datetime; a timestamp
    without any offset is parsed into a naive datetime.

    :param time: Timestamp such as "2021-01-01T12:34:56+01:00".
    :return: The parsed datetime.
    :raises ValueError: If the string matches neither form.
    """

    try:
        # Since Python 3.7, %z accepts "+HH:MM" as well as "Z", so the string
        # can be parsed as-is without losing seconds or offset minutes.
        return datetime.strptime(time, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        # No offset present: parse as a naive timestamp.
        return datetime.strptime(time, "%Y-%m-%dT%H:%M:%S")
Parse Gitea's timestamp format ("%Y-%m-%dT%H:%M:%S%z", but with a ":" inside the time zone offset, e.g. "+01:00").
@staticmethod
def format_time(time: datetime) -> str:
    """
    Render a datetime in Gitea's time format.

    The value is first converted to UTC, then written as
    ``YYYY-MM-DDTHH:MM:SS`` with a trailing "Z".

    :param time: The time to format
    :return: Formatted time
    """

    utc_time = time.astimezone(timezone.utc)
    return f"{utc_time:%Y-%m-%dT%H:%M:%S}Z"
Format a datetime object to Gitea's time format.
Parameters
- time: The time to format
Returns
Formatted time
@staticmethod
def data_params_for_ref(ref: Optional[Ref]) -> Dict:
    """
    Build the ``ref`` parameter dict for an API call.

    A Branch contributes its name, a Commit its sha, and any other truthy
    value is passed through unchanged. A None (or otherwise falsy) ref
    yields an empty dict, which can be passed to the API directly.
    """

    if isinstance(ref, Branch):
        ref_value = ref.name
    elif isinstance(ref, Commit):
        ref_value = ref.sha
    elif ref:
        ref_value = ref
    else:
        # Nothing to send: no ref was given.
        return {}

    return {"ref": ref_value}
Given a "ref", returns a dict with the ref parameter for the API call.
If the ref is None, returns an empty dict. You can pass this to the API directly.