allspice.apiobject
1# cSpell:words ECAD 2 3from __future__ import annotations 4 5import logging 6import re 7from dataclasses import asdict, dataclass 8from datetime import datetime, timezone 9from enum import Enum 10from functools import cached_property 11from typing import ( 12 IO, 13 Any, 14 ClassVar, 15 Dict, 16 FrozenSet, 17 List, 18 Literal, 19 Optional, 20 Sequence, 21 Set, 22 Tuple, 23 Union, 24) 25 26try: 27 from typing_extensions import Self 28except ImportError: 29 from typing import Self 30 31from .baseapiobject import ApiObject, ReadonlyApiObject 32from .exceptions import ConflictException, NotFoundException 33 34 35class Organization(ApiObject): 36 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 37 38 active: Optional[bool] 39 avatar_url: str 40 created: Optional[str] 41 description: str 42 email: str 43 followers_count: Optional[int] 44 following_count: Optional[int] 45 full_name: str 46 html_url: Optional[str] 47 id: int 48 is_admin: Optional[bool] 49 language: Optional[str] 50 last_login: Optional[str] 51 location: str 52 login: Optional[str] 53 login_name: Optional[str] 54 name: Optional[str] 55 prohibit_login: Optional[bool] 56 repo_admin_change_team_access: Optional[bool] 57 restricted: Optional[bool] 58 source_id: Optional[int] 59 starred_repos_count: Optional[int] 60 username: str 61 visibility: str 62 website: str 63 64 API_OBJECT = """/orgs/{name}""" # <org> 65 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 66 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 67 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 68 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 69 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 70 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 71 72 def __init__(self, allspice_client): 73 super().__init__(allspice_client) 74 75 def __eq__(self, other): 76 if not isinstance(other, Organization): 77 return False 78 return self.allspice_client == other.allspice_client and self.name == other.name 79 80 def __hash__(self): 81 return hash(self.allspice_client) ^ hash(self.name) 82 83 @classmethod 84 def request(cls, allspice_client, name: str) -> Self: 85 return cls._request(allspice_client, {"name": name}) 86 87 @classmethod 88 def parse_response(cls, allspice_client, result) -> "Organization": 89 api_object = super().parse_response(allspice_client, result) 90 # add "name" field to make this behave similar to users for gitea < 1.18 91 # also necessary for repository-owner when org is repo owner 92 if not hasattr(api_object, "name"): 93 Organization._add_read_property("name", result["username"], api_object) 94 return api_object 95 96 _patchable_fields: ClassVar[set[str]] = { 97 "description", 98 "full_name", 99 "location", 100 "visibility", 101 "website", 102 } 103 104 def commit(self): 105 args = {"name": self.name} 106 self._commit(args) 107 108 def create_repo( 109 self, 110 repoName: str, 111 description: str = "", 112 private: bool = False, 113 autoInit=True, 114 gitignores: Optional[str] = None, 115 license: Optional[str] = None, 116 readme: str = "Default", 117 issue_labels: Optional[str] = None, 118 default_branch="master", 119 ): 120 """Create an organization Repository 121 122 Throws: 123 AlreadyExistsException: If the Repository exists already. 124 Exception: If something else went wrong. 125 """ 126 result = self.allspice_client.requests_post( 127 f"/orgs/{self.name}/repos", 128 data={ 129 "name": repoName, 130 "description": description, 131 "private": private, 132 "auto_init": autoInit, 133 "gitignores": gitignores, 134 "license": license, 135 "issue_labels": issue_labels, 136 "readme": readme, 137 "default_branch": default_branch, 138 }, 139 ) 140 if "id" in result: 141 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 142 else: 143 self.allspice_client.logger.error(result["message"]) 144 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 145 return Repository.parse_response(self.allspice_client, result) 146 147 def get_repositories(self) -> List["Repository"]: 148 results = self.allspice_client.requests_get_paginated( 149 Organization.ORG_REPOS_REQUEST % self.username 150 ) 151 return [Repository.parse_response(self.allspice_client, result) for result in results] 152 153 def get_repository(self, name) -> "Repository": 154 repos = self.get_repositories() 155 for repo in repos: 156 if repo.name == name: 157 return repo 158 raise NotFoundException("Repository %s not existent in organization." % name) 159 160 def get_teams(self) -> List["Team"]: 161 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 162 teams = [Team.parse_response(self.allspice_client, result) for result in results] 163 # organisation seems to be missing using this request, so we add org manually 164 for t in teams: 165 setattr(t, "_organization", self) 166 return teams 167 168 def get_team(self, name) -> "Team": 169 teams = self.get_teams() 170 for team in teams: 171 if team.name == name: 172 return team 173 raise NotFoundException("Team not existent in organization.") 174 175 def create_team( 176 self, 177 name: str, 178 description: str = "", 179 permission: str = "read", 180 can_create_org_repo: bool = False, 181 includes_all_repositories: bool = False, 182 units=( 183 "repo.code", 184 "repo.issues", 185 "repo.ext_issues", 186 "repo.wiki", 187 "repo.pulls", 188 "repo.releases", 189 "repo.ext_wiki", 190 ), 191 units_map={}, 192 ) -> "Team": 193 """Alias for AllSpice#create_team""" 194 # TODO: Move AllSpice#create_team to Organization#create_team and 195 # deprecate AllSpice#create_team. 196 return self.allspice_client.create_team( 197 org=self, 198 name=name, 199 description=description, 200 permission=permission, 201 can_create_org_repo=can_create_org_repo, 202 includes_all_repositories=includes_all_repositories, 203 units=units, 204 units_map=units_map, 205 ) 206 207 def get_members(self) -> List["User"]: 208 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 209 return [User.parse_response(self.allspice_client, result) for result in results] 210 211 def is_member(self, username) -> bool: 212 if isinstance(username, User): 213 username = username.username 214 try: 215 # returns 204 if its ok, 404 if its not 216 self.allspice_client.requests_get( 217 Organization.ORG_IS_MEMBER % (self.username, username) 218 ) 219 return True 220 except Exception: 221 return False 222 223 def remove_member(self, user: "User"): 224 path = f"/orgs/{self.username}/members/{user.username}" 225 self.allspice_client.requests_delete(path) 226 227 def delete(self): 228 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 229 for repo in self.get_repositories(): 230 repo.delete() 231 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 232 self.deleted = True 233 234 def get_heatmap(self) -> List[Tuple[datetime, int]]: 235 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 236 results = [ 237 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 238 for result in results 239 ] 240 return results 241 242 243class User(ApiObject): 244 active: bool 245 admin: Any 246 allow_create_organization: Any 247 allow_git_hook: Any 248 allow_import_local: Any 249 avatar_url: str 250 created: str 251 description: str 252 email: str 253 emails: List[Any] 254 followers_count: int 255 following_count: int 256 full_name: str 257 html_url: str 258 id: int 259 is_admin: bool 260 language: str 261 last_login: str 262 location: str 263 login: str 264 login_name: str 265 max_repo_creation: Any 266 must_change_password: Any 267 password: Any 268 prohibit_login: bool 269 restricted: bool 270 source_id: int 271 starred_repos_count: int 272 username: str 273 visibility: str 274 website: str 275 276 API_OBJECT = """/users/{name}""" # <org> 277 USER_MAIL = """/user/emails?sudo=%s""" # <name> 278 USER_PATCH = """/admin/users/%s""" # <username> 279 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 280 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 281 USER_HEATMAP = """/users/%s/heatmap""" # <username> 282 283 def __init__(self, allspice_client): 284 super().__init__(allspice_client) 285 self._emails = [] 286 287 def __eq__(self, other): 288 if not isinstance(other, User): 289 return False 290 return self.allspice_client == other.allspice_client and self.id == other.id 291 292 def __hash__(self): 293 return hash(self.allspice_client) ^ hash(self.id) 294 295 @property 296 def emails(self): 297 self.__request_emails() 298 return self._emails 299 300 @classmethod 301 def request(cls, allspice_client, name: str) -> "User": 302 api_object = cls._request(allspice_client, {"name": name}) 303 return api_object 304 305 _patchable_fields: ClassVar[set[str]] = { 306 "active", 307 "admin", 308 "allow_create_organization", 309 "allow_git_hook", 310 "allow_import_local", 311 "email", 312 "full_name", 313 "location", 314 "login_name", 315 "max_repo_creation", 316 "must_change_password", 317 "password", 318 "prohibit_login", 319 "website", 320 } 321 322 def commit(self, login_name: str, source_id: int = 0): 323 """ 324 Unfortunately it is necessary to require the login name 325 as well as the login source (that is not supplied when getting a user) for 326 changing a user. 327 Usually source_id is 0 and the login_name is equal to the username. 328 """ 329 values = self.get_dirty_fields() 330 values.update( 331 # api-doc says that the "source_id" is necessary; works without though 332 {"login_name": login_name, "source_id": source_id} 333 ) 334 args = {"username": self.username} 335 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 336 self._dirty_fields = {} 337 338 def create_repo( 339 self, 340 repoName: str, 341 description: str = "", 342 private: bool = False, 343 autoInit=True, 344 gitignores: Optional[str] = None, 345 license: Optional[str] = None, 346 readme: str = "Default", 347 issue_labels: Optional[str] = None, 348 default_branch="master", 349 ): 350 """Create a user Repository 351 352 Throws: 353 AlreadyExistsException: If the Repository exists already. 354 Exception: If something else went wrong. 355 """ 356 result = self.allspice_client.requests_post( 357 "/user/repos", 358 data={ 359 "name": repoName, 360 "description": description, 361 "private": private, 362 "auto_init": autoInit, 363 "gitignores": gitignores, 364 "license": license, 365 "issue_labels": issue_labels, 366 "readme": readme, 367 "default_branch": default_branch, 368 }, 369 ) 370 if "id" in result: 371 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 372 else: 373 self.allspice_client.logger.error(result["message"]) 374 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 375 return Repository.parse_response(self.allspice_client, result) 376 377 def get_repositories(self) -> List["Repository"]: 378 """Get all Repositories owned by this User.""" 379 url = f"/users/{self.username}/repos" 380 results = self.allspice_client.requests_get_paginated(url) 381 return [Repository.parse_response(self.allspice_client, result) for result in results] 382 383 def get_orgs(self) -> List[Organization]: 384 """Get all Organizations this user is a member of.""" 385 url = f"/users/{self.username}/orgs" 386 results = self.allspice_client.requests_get_paginated(url) 387 return [Organization.parse_response(self.allspice_client, result) for result in results] 388 389 def get_teams(self) -> List["Team"]: 390 url = "/user/teams" 391 results = self.allspice_client.requests_get_paginated(url, sudo=self) 392 return [Team.parse_response(self.allspice_client, result) for result in results] 393 394 def get_accessible_repos(self) -> List["Repository"]: 395 """Get all Repositories accessible by the logged in User.""" 396 results = self.allspice_client.requests_get("/user/repos", sudo=self) 397 return [Repository.parse_response(self.allspice_client, result) for result in results] 398 399 def __request_emails(self): 400 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 401 # report if the adress changed by this 402 for mail in result: 403 self._emails.append(mail["email"]) 404 if mail["primary"]: 405 self._email = mail["email"] 406 407 def delete(self): 408 """Deletes this User. Also deletes all Repositories he owns.""" 409 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 410 self.deleted = True 411 412 def get_heatmap(self) -> List[Tuple[datetime, int]]: 413 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 414 results = [ 415 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 416 for result in results 417 ] 418 return results 419 420 421class Branch(ReadonlyApiObject): 422 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 423 effective_branch_protection_name: str 424 enable_status_check: bool 425 name: str 426 protected: bool 427 required_approvals: int 428 status_check_contexts: List[Any] 429 user_can_merge: bool 430 user_can_push: bool 431 432 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 433 434 def __init__(self, allspice_client): 435 super().__init__(allspice_client) 436 437 def __eq__(self, other): 438 if not isinstance(other, Branch): 439 return False 440 return self.commit == other.commit and self.name == other.name 441 442 def __hash__(self): 443 return hash(self.commit["id"]) ^ hash(self.name) 444 445 _fields_to_parsers: ClassVar[dict] = { 446 # This is not a commit object 447 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 448 } 449 450 @classmethod 451 def request(cls, allspice_client, owner: str, repo: str, branch: str): 452 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch}) 453 454 455class GitEntry(ReadonlyApiObject): 456 """ 457 An object representing a file or directory in the Git tree. 458 """ 459 460 mode: str 461 path: str 462 sha: str 463 size: int 464 type: str 465 url: str 466 467 def __init__(self, allspice_client): 468 super().__init__(allspice_client) 469 470 def __eq__(self, other) -> bool: 471 if not isinstance(other, GitEntry): 472 return False 473 return self.sha == other.sha 474 475 def __hash__(self) -> int: 476 return hash(self.sha) 477 478 479class Repository(ApiObject): 480 allow_fast_forward_only_merge: bool 481 allow_manual_merge: Any 482 allow_merge_commits: bool 483 allow_rebase: bool 484 allow_rebase_explicit: bool 485 allow_rebase_update: bool 486 allow_squash_merge: bool 487 archived: bool 488 archived_at: str 489 autodetect_manual_merge: Any 490 avatar_url: str 491 clone_url: str 492 created_at: str 493 default_allow_maintainer_edit: bool 494 default_branch: str 495 default_delete_branch_after_merge: bool 496 default_merge_style: str 497 description: str 498 empty: bool 499 enable_prune: Any 500 external_tracker: Any 501 external_wiki: Any 502 fork: bool 503 forks_count: int 504 full_name: str 505 has_actions: bool 506 has_issues: bool 507 has_packages: bool 508 has_projects: bool 509 has_pull_requests: bool 510 has_releases: bool 511 has_wiki: bool 512 html_url: str 513 id: int 514 ignore_whitespace_conflicts: bool 515 internal: bool 516 internal_tracker: Dict[str, bool] 517 language: str 518 languages_url: str 519 licenses: Any 520 link: str 521 mirror: bool 522 mirror_interval: str 523 mirror_updated: str 524 name: str 525 object_format_name: str 526 open_issues_count: int 527 open_pr_counter: int 528 original_url: str 529 owner: Union["User", "Organization"] 530 parent: Any 531 permissions: Dict[str, bool] 532 private: bool 533 projects_mode: str 534 release_counter: int 535 repo_transfer: Any 536 size: int 537 ssh_url: str 538 stars_count: int 539 template: bool 540 topics: List[Union[Any, str]] 541 updated_at: datetime 542 url: str 543 watchers_count: int 544 website: str 545 546 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 547 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 548 REPO_SEARCH = """/repos/search/""" 549 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 550 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 551 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 552 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 553 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 554 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 555 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 556 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 557 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 558 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 559 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 560 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 561 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 562 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 563 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 564 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 565 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 566 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 567 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 568 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 569 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 570 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 571 572 class ArchiveFormat(Enum): 573 """ 574 Archive formats for Repository.get_archive 575 """ 576 577 TAR = "tar.gz" 578 ZIP = "zip" 579 580 class CommitStatusSort(Enum): 581 """ 582 Sort order for Repository.get_commit_status 583 """ 584 585 OLDEST = "oldest" 586 RECENT_UPDATE = "recentupdate" 587 LEAST_UPDATE = "leastupdate" 588 LEAST_INDEX = "leastindex" 589 HIGHEST_INDEX = "highestindex" 590 591 def __init__(self, allspice_client): 592 super().__init__(allspice_client) 593 594 def __eq__(self, other): 595 if not isinstance(other, Repository): 596 return False 597 return self.owner == other.owner and self.name == other.name 598 599 def __hash__(self): 600 return hash(self.owner) ^ hash(self.name) 601 602 _fields_to_parsers: ClassVar[dict] = { 603 # dont know how to tell apart user and org as owner except form email being empty. 604 "owner": lambda allspice_client, r: ( 605 Organization.parse_response(allspice_client, r) 606 if r["email"] == "" 607 else User.parse_response(allspice_client, r) 608 ), 609 "updated_at": lambda _, t: Util.convert_time(t), 610 } 611 612 @classmethod 613 def request( 614 cls, 615 allspice_client, 616 owner: str, 617 name: str, 618 ) -> Repository: 619 return cls._request(allspice_client, {"owner": owner, "name": name}) 620 621 @classmethod 622 def search( 623 cls, 624 allspice_client, 625 query: Optional[str] = None, 626 topic: bool = False, 627 include_description: bool = False, 628 user: Optional[User] = None, 629 owner_to_prioritize: Union[User, Organization, None] = None, 630 ) -> list[Repository]: 631 """ 632 Search for repositories. 633 634 See https://hub.allspice.io/api/swagger#/repository/repoSearch 635 636 :param query: The query string to search for 637 :param topic: If true, the query string will only be matched against the 638 repository's topic. 639 :param include_description: If true, the query string will be matched 640 against the repository's description as well. 641 :param user: If specified, only repositories that this user owns or 642 contributes to will be searched. 643 :param owner_to_prioritize: If specified, repositories owned by the 644 given entity will be prioritized in the search. 645 :returns: All repositories matching the query. If there are many 646 repositories matching this query, this may take some time. 647 """ 648 649 params = {} 650 651 if query is not None: 652 params["q"] = query 653 if topic: 654 params["topic"] = topic 655 if include_description: 656 params["include_description"] = include_description 657 if user is not None: 658 params["user"] = user.id 659 if owner_to_prioritize is not None: 660 params["owner_to_prioritize"] = owner_to_prioritize.id 661 662 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 663 664 return [Repository.parse_response(allspice_client, response) for response in responses] 665 666 _patchable_fields: ClassVar[set[str]] = { 667 "allow_manual_merge", 668 "allow_merge_commits", 669 "allow_rebase", 670 "allow_rebase_explicit", 671 "allow_rebase_update", 672 "allow_squash_merge", 673 "archived", 674 "autodetect_manual_merge", 675 "default_branch", 676 "default_delete_branch_after_merge", 677 "default_merge_style", 678 "description", 679 "enable_prune", 680 "external_tracker", 681 "external_wiki", 682 "has_actions", 683 "has_issues", 684 "has_projects", 685 "has_pull_requests", 686 "has_wiki", 687 "ignore_whitespace_conflicts", 688 "internal_tracker", 689 "mirror_interval", 690 "name", 691 "private", 692 "template", 693 "website", 694 } 695 696 def commit(self): 697 args = {"owner": self.owner.username, "name": self.name} 698 self._commit(args) 699 700 def get_branches(self) -> List["Branch"]: 701 """Get all the Branches of this Repository.""" 702 703 results = self.allspice_client.requests_get_paginated( 704 Repository.REPO_BRANCHES % (self.owner.username, self.name) 705 ) 706 return [Branch.parse_response(self.allspice_client, result) for result in results] 707 708 def get_branch(self, name: str) -> "Branch": 709 """Get a specific Branch of this Repository.""" 710 result = self.allspice_client.requests_get( 711 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 712 ) 713 return Branch.parse_response(self.allspice_client, result) 714 715 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 716 """Add a branch to the repository""" 717 # Note: will only work with gitea 1.13 or higher! 718 719 ref_name = Util.data_params_for_ref(create_from) 720 if "ref" not in ref_name: 721 raise ValueError("create_from must be a Branch, Commit or string") 722 ref_name = ref_name["ref"] 723 724 data = {"new_branch_name": newname, "old_ref_name": ref_name} 725 result = self.allspice_client.requests_post( 726 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 727 ) 728 return Branch.parse_response(self.allspice_client, result) 729 730 def get_issues( 731 self, 732 state: Literal["open", "closed", "all"] = "all", 733 search_query: Optional[str] = None, 734 labels: Optional[List[str]] = None, 735 milestones: Optional[List[Union[Milestone, str]]] = None, 736 assignee: Optional[Union[User, str]] = None, 737 since: Optional[datetime] = None, 738 before: Optional[datetime] = None, 739 ) -> List["Issue"]: 740 """ 741 Get all Issues of this Repository (open and closed) 742 743 https://hub.allspice.io/api/swagger#/repository/repoListIssues 744 745 All params of this method are optional filters. If you don't specify a filter, it 746 will not be applied. 747 748 :param state: The state of the Issues to get. If None, all Issues are returned. 749 :param search_query: Filter issues by text. This is equivalent to searching for 750 `search_query` in the Issues on the web interface. 751 :param labels: Filter issues by labels. 752 :param milestones: Filter issues by milestones. 753 :param assignee: Filter issues by the assigned user. 754 :param since: Filter issues by the date they were created. 755 :param before: Filter issues by the date they were created. 756 :return: A list of Issues. 757 """ 758 759 data = { 760 "state": state, 761 } 762 if search_query: 763 data["q"] = search_query 764 if labels: 765 data["labels"] = ",".join(labels) 766 if milestones: 767 data["milestone"] = ",".join( 768 [ 769 milestone.name if isinstance(milestone, Milestone) else milestone 770 for milestone in milestones 771 ] 772 ) 773 if assignee: 774 if isinstance(assignee, User): 775 data["assignee"] = assignee.username 776 else: 777 data["assignee"] = assignee 778 if since: 779 data["since"] = Util.format_time(since) 780 if before: 781 data["before"] = Util.format_time(before) 782 783 results = self.allspice_client.requests_get_paginated( 784 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 785 params=data, 786 ) 787 788 issues = [] 789 for result in results: 790 issue = Issue.parse_response(self.allspice_client, result) 791 # See Issue.request 792 setattr(issue, "_repository", self) 793 # This is mostly for compatibility with an older implementation 794 Issue._add_read_property("repo", self, issue) 795 issues.append(issue) 796 797 return issues 798 799 def get_design_reviews( 800 self, 801 state: Literal["open", "closed", "all"] = "all", 802 milestone: Optional[Union[Milestone, str]] = None, 803 labels: Optional[List[str]] = None, 804 ) -> List["DesignReview"]: 805 """ 806 Get all Design Reviews of this Repository. 807 808 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 809 810 :param state: The state of the Design Reviews to get. If None, all Design Reviews 811 are returned. 812 :param milestone: The milestone of the Design Reviews to get. 813 :param labels: A list of label IDs to filter DRs by. 814 :return: A list of Design Reviews. 815 """ 816 817 params = { 818 "state": state, 819 } 820 if milestone: 821 if isinstance(milestone, Milestone): 822 params["milestone"] = milestone.name 823 else: 824 params["milestone"] = milestone 825 if labels: 826 params["labels"] = ",".join(labels) 827 828 results = self.allspice_client.requests_get_paginated( 829 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 830 params=params, 831 ) 832 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 833 834 def get_commits( 835 self, 836 sha: Optional[str] = None, 837 path: Optional[str] = None, 838 stat: bool = True, 839 ) -> List["Commit"]: 840 """ 841 Get all the Commits of this Repository. 842 843 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 844 845 :param sha: The SHA of the commit to start listing commits from. 846 :param path: filepath of a file/dir. 847 :param stat: Include the number of additions and deletions in the response. 848 Disable for speedup. 849 :return: A list of Commits. 850 """ 851 852 data = {} 853 if sha: 854 data["sha"] = sha 855 if path: 856 data["path"] = path 857 if not stat: 858 data["stat"] = False 859 860 try: 861 results = self.allspice_client.requests_get_paginated( 862 Repository.REPO_COMMITS % (self.owner.username, self.name), 863 params=data, 864 ) 865 except ConflictException as err: 866 logging.warning(err) 867 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 868 results = [] 869 return [Commit.parse_response(self.allspice_client, result) for result in results] 870 871 def get_issues_state(self, state) -> List["Issue"]: 872 """ 873 DEPRECATED: Use get_issues() instead. 874 875 Get issues of state Issue.open or Issue.closed of a repository. 876 """ 877 878 assert state in [Issue.OPENED, Issue.CLOSED] 879 issues = [] 880 data = {"state": state} 881 results = self.allspice_client.requests_get_paginated( 882 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 883 params=data, 884 ) 885 for result in results: 886 issue = Issue.parse_response(self.allspice_client, result) 887 # adding data not contained in the issue response 888 # See Issue.request() 889 setattr(issue, "_repository", self) 890 Issue._add_read_property("repo", self, issue) 891 Issue._add_read_property("owner", self.owner, issue) 892 issues.append(issue) 893 return issues 894 895 def get_times(self): 896 results = self.allspice_client.requests_get( 897 Repository.REPO_TIMES % (self.owner.username, self.name) 898 ) 899 return results 900 901 def get_user_time(self, username) -> float: 902 if isinstance(username, User): 903 username = username.username 904 results = self.allspice_client.requests_get( 905 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 906 ) 907 time = sum(r["time"] for r in results) 908 return time 909 910 def get_full_name(self) -> str: 911 return self.owner.username + "/" + self.name 912 913 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 914 data = { 915 "assignees": assignees, 916 "body": description, 917 "closed": False, 918 "title": title, 919 } 920 result = self.allspice_client.requests_post( 921 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 922 data=data, 923 ) 924 925 issue = Issue.parse_response(self.allspice_client, result) 926 setattr(issue, "_repository", self) 927 Issue._add_read_property("repo", self, issue) 928 return issue 929 930 def create_design_review( 931 self, 932 title: str, 933 head: Union[Branch, str], 934 base: Union[Branch, str], 935 assignees: Optional[Set[Union[User, str]]] = None, 936 body: Optional[str] = None, 937 due_date: Optional[datetime] = None, 938 milestone: Optional["Milestone"] = None, 939 ) -> "DesignReview": 940 """ 941 Create a new Design Review. 942 943 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 944 945 :param title: Title of the Design Review 946 :param head: Branch or name of the branch to merge into the base branch 947 :param base: Branch or name of the branch to merge into 948 :param assignees: Optional. A list of users to assign this review. List can be of 949 User objects or of usernames. 950 :param body: An Optional Description for the Design Review. 951 :param due_date: An Optional Due date for the Design Review. 952 :param milestone: An Optional Milestone for the Design Review 953 :return: The created Design Review 954 """ 955 956 data: dict[str, Any] = { 957 "title": title, 958 } 959 960 if isinstance(head, Branch): 961 data["head"] = head.name 962 else: 963 data["head"] = head 964 if isinstance(base, Branch): 965 data["base"] = base.name 966 else: 967 data["base"] = base 968 if assignees: 969 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 970 if body: 971 data["body"] = body 972 if due_date: 973 data["due_date"] = Util.format_time(due_date) 974 if milestone: 975 data["milestone"] = milestone.id 976 977 result = self.allspice_client.requests_post( 978 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 979 data=data, 980 ) 981 982 return DesignReview.parse_response(self.allspice_client, result) 983 984 def create_milestone( 985 self, 986 title: str, 987 description: str, 988 due_date: Optional[str] = None, 989 state: str = "open", 990 ) -> "Milestone": 991 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 992 data = {"title": title, "description": description, "state": state} 993 if due_date: 994 data["due_date"] = due_date 995 result = self.allspice_client.requests_post(url, data=data) 996 return Milestone.parse_response(self.allspice_client, result) 997 998 def create_gitea_hook(self, hook_url: str, events: List[str]): 999 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1000 data = { 1001 "type": "gitea", 1002 "config": {"content_type": "json", "url": hook_url}, 1003 "events": events, 1004 "active": True, 1005 } 1006 return self.allspice_client.requests_post(url, data=data) 1007 1008 def list_hooks(self): 1009 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1010 return self.allspice_client.requests_get(url) 1011 1012 def delete_hook(self, id: str): 1013 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1014 self.allspice_client.requests_delete(url) 1015 1016 def is_collaborator(self, username) -> bool: 1017 if isinstance(username, User): 1018 username = username.username 1019 try: 1020 # returns 204 if its ok, 404 if its not 1021 self.allspice_client.requests_get( 1022 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1023 ) 1024 return True 1025 except Exception: 1026 return False 1027 1028 def get_users_with_access(self) -> Sequence[User]: 1029 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1030 response = self.allspice_client.requests_get(url) 1031 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1032 if isinstance(self.owner, User): 1033 return [*collabs, self.owner] 1034 else: 1035 # owner must be org 1036 teams = self.owner.get_teams() 1037 for team in teams: 1038 team_repos = team.get_repos() 1039 if self.name in [n.name for n in team_repos]: 1040 collabs += team.get_members() 1041 return collabs 1042 1043 def remove_collaborator(self, user_name: str): 1044 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1045 self.allspice_client.requests_delete(url) 1046 1047 def transfer_ownership( 1048 self, 1049 new_owner: Union[User, Organization], 1050 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1051 ): 1052 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1053 data: dict[str, Any] = {"new_owner": new_owner.username} 1054 if isinstance(new_owner, Organization): 1055 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1056 data["team_ids"] = new_team_ids 1057 self.allspice_client.requests_post(url, data=data) 1058 # TODO: make sure this instance is either updated or discarded 1059 1060 def get_git_content( 1061 self, 1062 ref: Optional["Ref"] = None, 1063 commit: "Optional[Commit]" = None, 1064 ) -> List[Content]: 1065 """ 1066 Get the metadata for all files in the root directory. 1067 1068 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1069 1070 :param ref: branch or commit to get content from 1071 :param commit: commit to get content from (deprecated) 1072 """ 1073 url = f"/repos/{self.owner.username}/{self.name}/contents" 1074 data = Util.data_params_for_ref(ref or commit) 1075 1076 result = [ 1077 Content.parse_response(self.allspice_client, f) 1078 for f in self.allspice_client.requests_get(url, data) 1079 ] 1080 return result 1081 1082 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1083 """ 1084 Get the repository's tree on a given ref. 1085 1086 By default, this will only return the top-level entries in the tree. If you want 1087 to get the entire tree, set `recursive` to True. 1088 1089 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1090 :param recursive: Whether to get the entire tree or just the top-level entries. 1091 """ 1092 1093 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1094 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1095 params = {"recursive": recursive} 1096 results = self.allspice_client.requests_get_paginated(url, params=params) 1097 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1098 1099 def get_file_content( 1100 self, 1101 content: Content, 1102 ref: Optional[Ref] = None, 1103 commit: Optional[Commit] = None, 1104 ) -> Union[str, List["Content"]]: 1105 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1106 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1107 data = Util.data_params_for_ref(ref or commit) 1108 1109 if content.type == Content.FILE: 1110 return self.allspice_client.requests_get(url, data)["content"] 1111 else: 1112 return [ 1113 Content.parse_response(self.allspice_client, f) 1114 for f in self.allspice_client.requests_get(url, data) 1115 ] 1116 1117 def get_raw_file( 1118 self, 1119 file_path: str, 1120 ref: Optional[Ref] = None, 1121 ) -> bytes: 1122 """ 1123 Get the raw, binary data of a single file. 1124 1125 Note 1: if the file you are requesting is a text file, you might want to 1126 use .decode() on the result to get a string. For example: 1127 1128 content = repo.get_raw_file("file.txt").decode("utf-8") 1129 1130 Note 2: this method will store the entire file in memory. If you want 1131 to download a large file, you might want to use `download_to_file` 1132 instead. 1133 1134 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1135 1136 :param file_path: The path to the file to get. 1137 :param ref: The branch or commit to get the file from. If not provided, 1138 the default branch is used. 1139 """ 1140 1141 url = self.REPO_GET_RAW_FILE.format( 1142 owner=self.owner.username, 1143 repo=self.name, 1144 path=file_path, 1145 ) 1146 params = Util.data_params_for_ref(ref) 1147 return self.allspice_client.requests_get_raw(url, params=params) 1148 1149 def download_to_file( 1150 self, 1151 file_path: str, 1152 io: IO, 1153 ref: Optional[Ref] = None, 1154 ) -> None: 1155 """ 1156 Download the binary data of a file to a file-like object. 1157 1158 Example: 1159 1160 with open("schematic.DSN", "wb") as f: 1161 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1162 1163 :param file_path: The path to the file in the repository from the root 1164 of the repository. 1165 :param io: The file-like object to write the data to. 1166 """ 1167 1168 url = self.allspice_client._AllSpice__get_url( 1169 self.REPO_GET_RAW_FILE.format( 1170 owner=self.owner.username, 1171 repo=self.name, 1172 path=file_path, 1173 ) 1174 ) 1175 params = Util.data_params_for_ref(ref) 1176 response = self.allspice_client.requests.get( 1177 url, 1178 params=params, 1179 headers=self.allspice_client.headers, 1180 stream=True, 1181 ) 1182 1183 for chunk in response.iter_content(chunk_size=4096): 1184 if chunk: 1185 io.write(chunk) 1186 1187 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1188 """ 1189 Get the json blob for a cad file if it exists, otherwise enqueue 1190 a new job and return a 503 status. 1191 1192 WARNING: This is still experimental and not recommended for critical 1193 applications. The structure and content of the returned dictionary can 1194 change at any time. 1195 1196 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1197 """ 1198 1199 if isinstance(content, Content): 1200 content = content.path 1201 1202 url = self.REPO_GET_ALLSPICE_JSON.format( 1203 owner=self.owner.username, 1204 repo=self.name, 1205 content=content, 1206 ) 1207 data = Util.data_params_for_ref(ref) 1208 return self.allspice_client.requests_get(url, data) 1209 1210 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1211 """ 1212 Get the svg blob for a cad file if it exists, otherwise enqueue 1213 a new job and return a 503 status. 1214 1215 WARNING: This is still experimental and not yet recommended for 1216 critical applications. The content of the returned svg can change 1217 at any time. 1218 1219 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1220 """ 1221 1222 if isinstance(content, Content): 1223 content = content.path 1224 1225 url = self.REPO_GET_ALLSPICE_SVG.format( 1226 owner=self.owner.username, 1227 repo=self.name, 1228 content=content, 1229 ) 1230 data = Util.data_params_for_ref(ref) 1231 return self.allspice_client.requests_get_raw(url, data) 1232 1233 def get_generated_projectdata( 1234 self, content: Union[Content, str], ref: Optional[Ref] = None 1235 ) -> dict: 1236 """ 1237 Get the json project data based on the cad file provided 1238 1239 WARNING: This is still experimental and not yet recommended for 1240 critical applications. The content of the returned dictionary can change 1241 at any time. 1242 1243 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1244 """ 1245 if isinstance(content, Content): 1246 content = content.path 1247 1248 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1249 owner=self.owner.username, 1250 repo=self.name, 1251 content=content, 1252 ) 1253 data = Util.data_params_for_ref(ref) 1254 return self.allspice_client.requests_get(url, data) 1255 1256 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1257 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1258 if not data: 1259 data = {} 1260 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1261 data.update({"content": content}) 1262 return self.allspice_client.requests_post(url, data) 1263 1264 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1265 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1266 if not data: 1267 data = {} 1268 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1269 data.update({"sha": file_sha, "content": content}) 1270 return self.allspice_client.requests_put(url, data) 1271 1272 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1273 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1274 if not data: 1275 data = {} 1276 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1277 data.update({"sha": file_sha}) 1278 return self.allspice_client.requests_delete(url, data) 1279 1280 def get_archive( 1281 self, 1282 ref: Ref = "main", 1283 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1284 ) -> bytes: 1285 """ 1286 Download all the files in a specific ref of a repository as a zip or tarball 1287 archive. 1288 1289 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1290 1291 :param ref: branch or commit to get content from, defaults to the "main" branch 1292 :param archive_format: zip or tar, defaults to zip 1293 """ 1294 1295 ref_string = Util.data_params_for_ref(ref)["ref"] 1296 url = self.REPO_GET_ARCHIVE.format( 1297 owner=self.owner.username, 1298 repo=self.name, 1299 ref=ref_string, 1300 format=archive_format.value, 1301 ) 1302 return self.allspice_client.requests_get_raw(url) 1303 1304 def get_topics(self) -> list[str]: 1305 """ 1306 Gets the list of topics on this repository. 1307 1308 See http://localhost:3000/api/swagger#/repository/repoListTopics 1309 """ 1310 1311 url = self.REPO_GET_TOPICS.format( 1312 owner=self.owner.username, 1313 repo=self.name, 1314 ) 1315 return self.allspice_client.requests_get(url)["topics"] 1316 1317 def add_topic(self, topic: str): 1318 """ 1319 Adds a topic to the repository. 1320 1321 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1322 1323 :param topic: The topic to add. Topic names must consist only of 1324 lowercase letters, numnbers and dashes (-), and cannot start with 1325 dashes. Topic names also must be under 35 characters long. 1326 """ 1327 1328 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1329 self.allspice_client.requests_put(url) 1330 1331 def create_release( 1332 self, 1333 tag_name: str, 1334 name: Optional[str] = None, 1335 body: Optional[str] = None, 1336 draft: bool = False, 1337 ): 1338 """ 1339 Create a release for this repository. The release will be created for 1340 the tag with the given name. If there is no tag with this name, create 1341 the tag first. 1342 1343 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1344 """ 1345 1346 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1347 data = { 1348 "tag_name": tag_name, 1349 "draft": draft, 1350 } 1351 if name is not None: 1352 data["name"] = name 1353 if body is not None: 1354 data["body"] = body 1355 response = self.allspice_client.requests_post(url, data) 1356 return Release.parse_response(self.allspice_client, response, self) 1357 1358 def get_releases( 1359 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1360 ) -> List[Release]: 1361 """ 1362 Get the list of releases for this repository. 1363 1364 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1365 """ 1366 1367 data = {} 1368 1369 if draft is not None: 1370 data["draft"] = draft 1371 if pre_release is not None: 1372 data["pre-release"] = pre_release 1373 1374 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1375 responses = self.allspice_client.requests_get_paginated(url, params=data) 1376 1377 return [ 1378 Release.parse_response(self.allspice_client, response, self) for response in responses 1379 ] 1380 1381 def get_latest_release(self) -> Release: 1382 """ 1383 Get the latest release for this repository. 1384 1385 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1386 """ 1387 1388 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1389 response = self.allspice_client.requests_get(url) 1390 release = Release.parse_response(self.allspice_client, response, self) 1391 return release 1392 1393 def get_release_by_tag(self, tag: str) -> Release: 1394 """ 1395 Get a release by its tag. 1396 1397 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1398 """ 1399 1400 url = self.REPO_GET_RELEASE_BY_TAG.format( 1401 owner=self.owner.username, repo=self.name, tag=tag 1402 ) 1403 response = self.allspice_client.requests_get(url) 1404 release = Release.parse_response(self.allspice_client, response, self) 1405 return release 1406 1407 def get_commit_statuses( 1408 self, 1409 commit: Union[str, Commit], 1410 sort: Optional[CommitStatusSort] = None, 1411 state: Optional[CommitStatusState] = None, 1412 ) -> List[CommitStatus]: 1413 """ 1414 Get a list of statuses for a commit. 1415 1416 This is roughly equivalent to the Commit.get_statuses method, but this 1417 method allows you to sort and filter commits and is more convenient if 1418 you have a commit SHA and don't need to get the commit itself. 1419 1420 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1421 """ 1422 1423 if isinstance(commit, Commit): 1424 commit = commit.sha 1425 1426 params = {} 1427 if sort is not None: 1428 params["sort"] = sort.value 1429 if state is not None: 1430 params["state"] = state.value 1431 1432 url = self.REPO_GET_COMMIT_STATUS.format( 1433 owner=self.owner.username, repo=self.name, sha=commit 1434 ) 1435 response = self.allspice_client.requests_get_paginated(url, params=params) 1436 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1437 1438 def create_commit_status( 1439 self, 1440 commit: Union[str, Commit], 1441 context: Optional[str] = None, 1442 description: Optional[str] = None, 1443 state: Optional[CommitStatusState] = None, 1444 target_url: Optional[str] = None, 1445 ) -> CommitStatus: 1446 """ 1447 Create a status on a commit. 1448 1449 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1450 """ 1451 1452 if isinstance(commit, Commit): 1453 commit = commit.sha 1454 1455 data = {} 1456 if context is not None: 1457 data["context"] = context 1458 if description is not None: 1459 data["description"] = description 1460 if state is not None: 1461 data["state"] = state.value 1462 if target_url is not None: 1463 data["target_url"] = target_url 1464 1465 url = self.REPO_GET_COMMIT_STATUS.format( 1466 owner=self.owner.username, repo=self.name, sha=commit 1467 ) 1468 response = self.allspice_client.requests_post(url, data=data) 1469 return CommitStatus.parse_response(self.allspice_client, response) 1470 1471 def delete(self): 1472 self.allspice_client.requests_delete( 1473 Repository.REPO_DELETE % (self.owner.username, self.name) 1474 ) 1475 self.deleted = True 1476 1477 1478class Milestone(ApiObject): 1479 allow_merge_commits: Any 1480 allow_rebase: Any 1481 allow_rebase_explicit: Any 1482 allow_squash_merge: Any 1483 archived: Any 1484 closed_at: Any 1485 closed_issues: int 1486 created_at: str 1487 default_branch: Any 1488 description: str 1489 due_on: Any 1490 has_issues: Any 1491 has_pull_requests: Any 1492 has_wiki: Any 1493 id: int 1494 ignore_whitespace_conflicts: Any 1495 name: Any 1496 open_issues: int 1497 private: Any 1498 state: str 1499 title: str 1500 updated_at: str 1501 website: Any 1502 1503 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1504 1505 def __init__(self, allspice_client): 1506 super().__init__(allspice_client) 1507 1508 def __eq__(self, other): 1509 if not isinstance(other, Milestone): 1510 return False 1511 return self.allspice_client == other.allspice_client and self.id == other.id 1512 1513 def __hash__(self): 1514 return hash(self.allspice_client) ^ hash(self.id) 1515 1516 _fields_to_parsers: ClassVar[dict] = { 1517 "closed_at": lambda _, t: Util.convert_time(t), 1518 "due_on": lambda _, t: Util.convert_time(t), 1519 } 1520 1521 _patchable_fields: ClassVar[set[str]] = { 1522 "allow_merge_commits", 1523 "allow_rebase", 1524 "allow_rebase_explicit", 1525 "allow_squash_merge", 1526 "archived", 1527 "default_branch", 1528 "description", 1529 "has_issues", 1530 "has_pull_requests", 1531 "has_wiki", 1532 "ignore_whitespace_conflicts", 1533 "name", 1534 "private", 1535 "website", 1536 } 1537 1538 @classmethod 1539 def request(cls, allspice_client, owner: str, repo: str, number: str): 1540 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number}) 1541 1542 1543class Attachment(ReadonlyApiObject): 1544 """ 1545 An asset attached to a comment. 1546 1547 You cannot edit or delete the attachment from this object - see the instance methods 1548 Comment.edit_attachment and delete_attachment for that. 1549 """ 1550 1551 browser_download_url: str 1552 created_at: str 1553 download_count: int 1554 id: int 1555 name: str 1556 size: int 1557 uuid: str 1558 1559 def __init__(self, allspice_client): 1560 super().__init__(allspice_client) 1561 1562 def __eq__(self, other): 1563 if not isinstance(other, Attachment): 1564 return False 1565 1566 return self.uuid == other.uuid 1567 1568 def __hash__(self): 1569 return hash(self.uuid) 1570 1571 def download_to_file(self, io: IO): 1572 """ 1573 Download the raw, binary data of this Attachment to a file-like object. 1574 1575 Example: 1576 1577 with open("my_file.zip", "wb") as f: 1578 attachment.download_to_file(f) 1579 1580 :param io: The file-like object to write the data to. 1581 """ 1582 1583 response = self.allspice_client.requests.get( 1584 self.browser_download_url, 1585 headers=self.allspice_client.headers, 1586 stream=True, 1587 ) 1588 # 4kb chunks 1589 for chunk in response.iter_content(chunk_size=4096): 1590 if chunk: 1591 io.write(chunk) 1592 1593 1594class Comment(ApiObject): 1595 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1596 body: str 1597 created_at: datetime 1598 html_url: str 1599 id: int 1600 issue_url: str 1601 original_author: str 1602 original_author_id: int 1603 pull_request_url: str 1604 updated_at: datetime 1605 user: User 1606 1607 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1608 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1609 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1610 1611 def __init__(self, allspice_client): 1612 super().__init__(allspice_client) 1613 1614 def __eq__(self, other): 1615 if not isinstance(other, Comment): 1616 return False 1617 return self.repository == other.repository and self.id == other.id 1618 1619 def __hash__(self): 1620 return hash(self.repository) ^ hash(self.id) 1621 1622 @classmethod 1623 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1624 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1625 1626 _fields_to_parsers: ClassVar[dict] = { 1627 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1628 "created_at": lambda _, t: Util.convert_time(t), 1629 "updated_at": lambda _, t: Util.convert_time(t), 1630 } 1631 1632 _patchable_fields: ClassVar[set[str]] = {"body"} 1633 1634 @property 1635 def parent_url(self) -> str: 1636 """URL of the parent of this comment (the issue or the pull request)""" 1637 1638 if self.issue_url is not None and self.issue_url != "": 1639 return self.issue_url 1640 else: 1641 return self.pull_request_url 1642 1643 @cached_property 1644 def repository(self) -> Repository: 1645 """The repository this comment was posted on.""" 1646 1647 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1648 return Repository.request(self.allspice_client, owner_name, repo_name) 1649 1650 def __fields_for_path(self): 1651 return { 1652 "owner": self.repository.owner.username, 1653 "repo": self.repository.name, 1654 "id": self.id, 1655 } 1656 1657 def commit(self): 1658 self._commit(self.__fields_for_path()) 1659 1660 def delete(self): 1661 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1662 self.deleted = True 1663 1664 def get_attachments(self) -> List[Attachment]: 1665 """ 1666 Get all attachments on this comment. This returns Attachment objects, which 1667 contain a link to download the attachment. 1668 1669 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1670 """ 1671 1672 results = self.allspice_client.requests_get( 1673 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1674 ) 1675 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1676 1677 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1678 """ 1679 Create an attachment on this comment. 1680 1681 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1682 1683 :param file: The file to attach. This should be a file-like object. 1684 :param name: The name of the file. If not provided, the name of the file will be 1685 used. 1686 :return: The created attachment. 1687 """ 1688 1689 args: dict[str, Any] = { 1690 "files": {"attachment": file}, 1691 } 1692 if name is not None: 1693 args["params"] = {"name": name} 1694 1695 result = self.allspice_client.requests_post( 1696 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1697 **args, 1698 ) 1699 return Attachment.parse_response(self.allspice_client, result) 1700 1701 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1702 """ 1703 Edit an attachment. 1704 1705 The list of params that can be edited is available at 1706 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1707 1708 :param attachment: The attachment to be edited 1709 :param data: The data parameter should be a dictionary of the fields to edit. 1710 :return: The edited attachment 1711 """ 1712 1713 args = { 1714 **self.__fields_for_path(), 1715 "attachment_id": attachment.id, 1716 } 1717 result = self.allspice_client.requests_patch( 1718 self.ATTACHMENT_PATH.format(**args), 1719 data=data, 1720 ) 1721 return Attachment.parse_response(self.allspice_client, result) 1722 1723 def delete_attachment(self, attachment: Attachment): 1724 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1725 1726 args = { 1727 **self.__fields_for_path(), 1728 "attachment_id": attachment.id, 1729 } 1730 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1731 attachment.deleted = True 1732 1733 1734class Commit(ReadonlyApiObject): 1735 author: User 1736 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1737 committer: Dict[str, Union[int, str, bool]] 1738 created: str 1739 files: List[Dict[str, str]] 1740 html_url: str 1741 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1742 parents: List[Union[Dict[str, str], Any]] 1743 sha: str 1744 stats: Dict[str, int] 1745 url: str 1746 1747 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1748 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1749 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1750 1751 # Regex to extract owner and repo names from the url property 1752 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1753 1754 def __init__(self, allspice_client): 1755 super().__init__(allspice_client) 1756 1757 _fields_to_parsers: ClassVar[dict] = { 1758 # NOTE: api may return None for commiters that are no allspice users 1759 "author": lambda allspice_client, u: ( 1760 User.parse_response(allspice_client, u) if u else None 1761 ) 1762 } 1763 1764 def __eq__(self, other): 1765 if not isinstance(other, Commit): 1766 return False 1767 return self.sha == other.sha 1768 1769 def __hash__(self): 1770 return hash(self.sha) 1771 1772 @classmethod 1773 def parse_response(cls, allspice_client, result) -> "Commit": 1774 commit_cache = result["commit"] 1775 api_object = cls(allspice_client) 1776 cls._initialize(allspice_client, api_object, result) 1777 # inner_commit for legacy reasons 1778 Commit._add_read_property("inner_commit", commit_cache, api_object) 1779 return api_object 1780 1781 def get_status(self) -> CommitCombinedStatus: 1782 """ 1783 Get a combined status consisting of all statues on this commit. 1784 1785 Note that the returned object is a CommitCombinedStatus object, which 1786 also contains a list of all statuses on the commit. 1787 1788 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1789 """ 1790 1791 result = self.allspice_client.requests_get( 1792 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1793 ) 1794 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1795 1796 def get_statuses(self) -> List[CommitStatus]: 1797 """ 1798 Get a list of all statuses on this commit. 1799 1800 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1801 """ 1802 1803 results = self.allspice_client.requests_get( 1804 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1805 ) 1806 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1807 1808 @cached_property 1809 def _fields_for_path(self) -> dict[str, str]: 1810 matches = self.URL_REGEXP.search(self.url) 1811 if not matches: 1812 raise ValueError(f"Invalid commit URL: {self.url}") 1813 1814 return { 1815 "owner": matches.group(1), 1816 "repo": matches.group(2), 1817 "sha": self.sha, 1818 } 1819 1820 1821class CommitStatusState(Enum): 1822 PENDING = "pending" 1823 SUCCESS = "success" 1824 ERROR = "error" 1825 FAILURE = "failure" 1826 WARNING = "warning" 1827 1828 @classmethod 1829 def try_init(cls, value: str) -> CommitStatusState | str: 1830 """ 1831 Try converting a string to the enum, and if that fails, return the 1832 string itself. 1833 """ 1834 1835 try: 1836 return cls(value) 1837 except ValueError: 1838 return value 1839 1840 1841class CommitStatus(ReadonlyApiObject): 1842 context: str 1843 created_at: str 1844 creator: User 1845 description: str 1846 id: int 1847 status: CommitStatusState 1848 target_url: str 1849 updated_at: str 1850 url: str 1851 1852 def __init__(self, allspice_client): 1853 super().__init__(allspice_client) 1854 1855 _fields_to_parsers: ClassVar[dict] = { 1856 # Gitea/ASH doesn't actually validate that the status is a "valid" 1857 # status, so we can expect empty or unknown strings in the status field. 1858 "status": lambda _, s: CommitStatusState.try_init(s), 1859 "creator": lambda allspice_client, u: ( 1860 User.parse_response(allspice_client, u) if u else None 1861 ), 1862 } 1863 1864 def __eq__(self, other): 1865 if not isinstance(other, CommitStatus): 1866 return False 1867 return self.id == other.id 1868 1869 def __hash__(self): 1870 return hash(self.id) 1871 1872 1873class CommitCombinedStatus(ReadonlyApiObject): 1874 commit_url: str 1875 repository: Repository 1876 sha: str 1877 state: CommitStatusState 1878 statuses: List["CommitStatus"] 1879 total_count: int 1880 url: str 1881 1882 def __init__(self, allspice_client): 1883 super().__init__(allspice_client) 1884 1885 _fields_to_parsers: ClassVar[dict] = { 1886 # See CommitStatus 1887 "state": lambda _, s: CommitStatusState.try_init(s), 1888 "statuses": lambda allspice_client, statuses: [ 1889 CommitStatus.parse_response(allspice_client, status) for status in statuses 1890 ], 1891 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1892 } 1893 1894 def __eq__(self, other): 1895 if not isinstance(other, CommitCombinedStatus): 1896 return False 1897 return self.sha == other.sha 1898 1899 def __hash__(self): 1900 return hash(self.sha) 1901 1902 @classmethod 1903 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1904 api_object = cls(allspice_client) 1905 cls._initialize(allspice_client, api_object, result) 1906 return api_object 1907 1908 1909class Issue(ApiObject): 1910 """ 1911 An issue on a repository. 1912 1913 Note: `Issue.assets` may not have any entries even if the issue has 1914 attachments. This happens when an issue is fetched via a bulk method like 1915 `Repository.get_issues`. In most cases, prefer using 1916 `Issue.get_attachments` to get the attachments on an issue. 1917 """ 1918 1919 assets: List[Union[Any, "Attachment"]] 1920 assignee: Any 1921 assignees: Any 1922 body: str 1923 closed_at: Any 1924 comments: int 1925 created_at: str 1926 due_date: Any 1927 html_url: str 1928 id: int 1929 is_locked: bool 1930 labels: List[Any] 1931 milestone: Optional["Milestone"] 1932 number: int 1933 original_author: str 1934 original_author_id: int 1935 pin_order: int 1936 pull_request: Any 1937 ref: str 1938 repository: Dict[str, Union[int, str]] 1939 state: str 1940 title: str 1941 updated_at: str 1942 url: str 1943 user: User 1944 1945 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1946 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1947 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1948 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1949 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1950 1951 OPENED = "open" 1952 CLOSED = "closed" 1953 1954 def __init__(self, allspice_client): 1955 super().__init__(allspice_client) 1956 1957 def __eq__(self, other): 1958 if not isinstance(other, Issue): 1959 return False 1960 return self.repository == other.repository and self.id == other.id 1961 1962 def __hash__(self): 1963 return hash(self.repository) ^ hash(self.id) 1964 1965 _fields_to_parsers: ClassVar[dict] = { 1966 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1967 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1968 "assets": lambda allspice_client, assets: [ 1969 Attachment.parse_response(allspice_client, a) for a in assets 1970 ], 1971 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1972 "assignees": lambda allspice_client, us: [ 1973 User.parse_response(allspice_client, u) for u in us 1974 ], 1975 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1976 } 1977 1978 _parsers_to_fields: ClassVar[dict] = { 1979 "milestone": lambda m: m.id, 1980 } 1981 1982 _patchable_fields: ClassVar[set[str]] = { 1983 "assignee", 1984 "assignees", 1985 "body", 1986 "due_date", 1987 "milestone", 1988 "state", 1989 "title", 1990 } 1991 1992 def commit(self): 1993 args = { 1994 "owner": self.repository.owner.username, 1995 "repo": self.repository.name, 1996 "index": self.number, 1997 } 1998 self._commit(args) 1999 2000 @classmethod 2001 def request(cls, allspice_client, owner: str, repo: str, number: str): 2002 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2003 # The repository in the response is a RepositoryMeta object, so request 2004 # the full repository object and add it to the issue object. 2005 repository = Repository.request(allspice_client, owner, repo) 2006 setattr(api_object, "_repository", repository) 2007 # For legacy reasons 2008 cls._add_read_property("repo", repository, api_object) 2009 return api_object 2010 2011 @classmethod 2012 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2013 args = {"owner": repo.owner.username, "repo": repo.name} 2014 data = {"title": title, "body": body} 2015 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2016 issue = Issue.parse_response(allspice_client, result) 2017 setattr(issue, "_repository", repo) 2018 cls._add_read_property("repo", repo, issue) 2019 return issue 2020 2021 @property 2022 def owner(self) -> Organization | User: 2023 return self.repository.owner 2024 2025 def get_time_sum(self, user: User) -> int: 2026 results = self.allspice_client.requests_get( 2027 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2028 ) 2029 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2030 2031 def get_times(self) -> Optional[Dict]: 2032 return self.allspice_client.requests_get( 2033 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2034 ) 2035 2036 def delete_time(self, time_id: str): 2037 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2038 self.allspice_client.requests_delete(path) 2039 2040 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2041 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2042 self.allspice_client.requests_post( 2043 path, data={"created": created, "time": int(time), "user_name": user_name} 2044 ) 2045 2046 def get_comments(self) -> List[Comment]: 2047 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2048 2049 results = self.allspice_client.requests_get( 2050 self.GET_COMMENTS.format( 2051 owner=self.owner.username, repo=self.repository.name, index=self.number 2052 ) 2053 ) 2054 2055 return [Comment.parse_response(self.allspice_client, result) for result in results] 2056 2057 def create_comment(self, body: str) -> Comment: 2058 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2059 2060 path = self.GET_COMMENTS.format( 2061 owner=self.owner.username, repo=self.repository.name, index=self.number 2062 ) 2063 2064 response = self.allspice_client.requests_post(path, data={"body": body}) 2065 return Comment.parse_response(self.allspice_client, response) 2066 2067 def get_attachments(self) -> List[Attachment]: 2068 """ 2069 Fetch all attachments on this issue. 2070 2071 Unlike the assets field, this will always fetch all attachments from the 2072 API. 2073 2074 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2075 """ 2076 2077 path = self.GET_ATTACHMENTS.format( 2078 owner=self.owner.username, repo=self.repository.name, index=self.number 2079 ) 2080 response = self.allspice_client.requests_get(path) 2081 2082 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2083 2084 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2085 """ 2086 Create an attachment on this issue. 2087 2088 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2089 2090 :param file: The file to attach. This should be a file-like object. 2091 :param name: The name of the file. If not provided, the name of the file will be 2092 used. 2093 :return: The created attachment. 2094 """ 2095 2096 args: dict[str, Any] = { 2097 "files": {"attachment": file}, 2098 } 2099 if name is not None: 2100 args["params"] = {"name": name} 2101 2102 result = self.allspice_client.requests_post( 2103 self.GET_ATTACHMENTS.format( 2104 owner=self.owner.username, repo=self.repository.name, index=self.number 2105 ), 2106 **args, 2107 ) 2108 2109 return Attachment.parse_response(self.allspice_client, result) 2110 2111 2112class DesignReviewReviewComment(ApiObject): 2113 """ 2114 A comment on a Design Review Review. 2115 """ 2116 2117 body: str 2118 commit_id: str 2119 created_at: str 2120 diff_hunk: str 2121 html_url: str 2122 id: int 2123 original_commit_id: str 2124 original_position: int 2125 path: str 2126 position: int 2127 pull_request_review_id: int 2128 pull_request_url: str 2129 resolver: Any 2130 sub_path: str 2131 updated_at: str 2132 user: User 2133 2134 def __init__(self, allspice_client): 2135 super().__init__(allspice_client) 2136 2137 _fields_to_parsers: ClassVar[dict] = { 2138 "resolver": lambda allspice_client, r: User.parse_response(allspice_client, r), 2139 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2140 } 2141 2142 2143class DesignReviewReview(ReadonlyApiObject): 2144 """ 2145 A review on a Design Review. 2146 """ 2147 2148 body: str 2149 comments_count: int 2150 commit_id: str 2151 dismissed: bool 2152 html_url: str 2153 id: int 2154 official: bool 2155 pull_request_url: str 2156 stale: bool 2157 state: ReviewEvent 2158 submitted_at: str 2159 team: Any 2160 updated_at: str 2161 user: User 2162 2163 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}" 2164 GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments" 2165 2166 class ReviewEvent(Enum): 2167 APPROVED = "APPROVED" 2168 PENDING = "PENDING" 2169 COMMENT = "COMMENT" 2170 REQUEST_CHANGES = "REQUEST_CHANGES" 2171 REQUEST_REVIEW = "REQUEST_REVIEW" 2172 UNKNOWN = "" 2173 2174 @dataclass 2175 class ReviewComment: 2176 """ 2177 Data required to create a review comment on a design review. 2178 2179 :param body: The body of the comment. 2180 :param path: The path of the file to comment on. If you have a 2181 `Content` object, get the path using the `path` property. 2182 :param sub_path: The sub-path of the file to comment on. This is 2183 usually the page ID of the page in the multi-page document. 2184 :param new_position: The line number of the source code file after the 2185 change to add this comment on. Optional, leave unset if this is an ECAD 2186 file or the comment must be on the entire file. 2187 :param old_position: The line number of the source code file before the 2188 change to add this comment on. Optional, leave unset if this is an ECAD 2189 file or the comment must be on the entire file. 2190 """ 2191 2192 body: str 2193 path: str 2194 sub_path: Optional[str] = None 2195 new_position: Optional[int] = None 2196 old_position: Optional[int] = None 2197 2198 def __init__(self, allspice_client): 2199 super().__init__(allspice_client) 2200 2201 _fields_to_parsers: ClassVar[dict] = { 2202 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2203 "state": lambda _, s: DesignReviewReview.ReviewEvent(s), 2204 } 2205 2206 def _get_dr_properties(self) -> dict[str, str]: 2207 """ 2208 Get the owner, repo name and design review number from the URL of this 2209 review's DR. 2210 """ 2211 2212 parts = self.pull_request_url.strip("/").split("/") 2213 2214 try: 2215 index = parts[-1] 2216 assert parts[-2] == "pulls" or parts[-2] == "pull", ( 2217 "Expected the second last part of the URL to be 'pulls' or 'pull', " 2218 ) 2219 repo = parts[-3] 2220 owner = parts[-4] 2221 2222 return { 2223 "owner": owner, 2224 "repo": repo, 2225 "index": index, 2226 } 2227 except IndexError: 2228 raise ValueError("Malformed design review URL: {}".format(self.pull_request_url)) 2229 2230 @cached_property 2231 def owner_name(self) -> str: 2232 """ 2233 The owner of the repository this review is on. 2234 """ 2235 2236 return self._get_dr_properties()["owner"] 2237 2238 @cached_property 2239 def repository_name(self) -> str: 2240 """ 2241 The name of the repository this review is on. 2242 """ 2243 2244 return self._get_dr_properties()["repo"] 2245 2246 @cached_property 2247 def index(self) -> str: 2248 """ 2249 The index of the design review this review is on. 2250 """ 2251 2252 return self._get_dr_properties()["index"] 2253 2254 def delete(self): 2255 """ 2256 Delete this review. 2257 """ 2258 2259 self.allspice_client.requests_delete( 2260 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2261 ) 2262 self.deleted = True 2263 2264 def get_comments(self) -> List[DesignReviewReviewComment]: 2265 """ 2266 Get the comments on this review. 2267 """ 2268 2269 result = self.allspice_client.requests_get( 2270 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2271 ) 2272 2273 return [ 2274 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2275 for comment in result 2276 ] 2277 2278 2279class DesignReview(ApiObject): 2280 """ 2281 A Design Review. See 2282 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2283 2284 Note: The base and head fields are not `Branch` objects - they are plain strings 2285 referring to the branch names. This is because DRs can exist for branches that have 2286 been deleted, which don't have an associated `Branch` object from the API. You can use 2287 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2288 it exists. 2289 """ 2290 2291 additions: Optional[int] 2292 allow_maintainer_edit: bool 2293 allow_maintainer_edits: Any 2294 assignee: User 2295 assignees: List["User"] 2296 base: str 2297 body: str 2298 changed_files: Optional[int] 2299 closed_at: Optional[str] 2300 comments: int 2301 created_at: str 2302 deletions: Optional[int] 2303 diff_url: str 2304 draft: bool 2305 due_date: Optional[str] 2306 head: str 2307 html_url: str 2308 id: int 2309 is_locked: bool 2310 labels: List[Any] 2311 merge_base: str 2312 merge_commit_sha: Optional[str] 2313 mergeable: bool 2314 merged: bool 2315 merged_at: Optional[str] 2316 merged_by: Any 2317 milestone: Any 2318 number: int 2319 patch_url: str 2320 pin_order: int 2321 repository: Optional["Repository"] 2322 requested_reviewers: Any 2323 requested_reviewers_teams: Any 2324 review_comments: int 2325 state: str 2326 title: str 2327 updated_at: str 2328 url: str 2329 user: User 2330 2331 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2332 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2333 GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews" 2334 GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}" 2335 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2336 2337 OPEN = "open" 2338 CLOSED = "closed" 2339 2340 class MergeType(Enum): 2341 MERGE = "merge" 2342 REBASE = "rebase" 2343 REBASE_MERGE = "rebase-merge" 2344 SQUASH = "squash" 2345 MANUALLY_MERGED = "manually-merged" 2346 2347 def __init__(self, allspice_client): 2348 super().__init__(allspice_client) 2349 2350 def __eq__(self, other): 2351 if not isinstance(other, DesignReview): 2352 return False 2353 return self.repository == other.repository and self.id == other.id 2354 2355 def __hash__(self): 2356 return hash(self.repository) ^ hash(self.id) 2357 2358 @classmethod 2359 def parse_response(cls, allspice_client, result) -> "DesignReview": 2360 api_object = super().parse_response(allspice_client, result) 2361 cls._add_read_property( 2362 "repository", 2363 Repository.parse_response(allspice_client, result["base"]["repo"]), 2364 api_object, 2365 ) 2366 2367 return api_object 2368 2369 @classmethod 2370 def request(cls, allspice_client, owner: str, repo: str, number: str): 2371 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2372 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2373 2374 _fields_to_parsers: ClassVar[dict] = { 2375 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2376 "assignees": lambda allspice_client, us: [ 2377 User.parse_response(allspice_client, u) for u in us 2378 ], 2379 "base": lambda _, b: b["ref"], 2380 "head": lambda _, h: h["ref"], 2381 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2382 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2383 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2384 } 2385 2386 _patchable_fields: ClassVar[set[str]] = { 2387 "allow_maintainer_edits", 2388 "assignee", 2389 "assignees", 2390 "base", 2391 "body", 2392 "due_date", 2393 "milestone", 2394 "state", 2395 "title", 2396 } 2397 2398 _parsers_to_fields: ClassVar[dict] = { 2399 "assignee": lambda u: u.username, 2400 "assignees": lambda us: [u.username for u in us], 2401 "base": lambda b: b.name if isinstance(b, Branch) else b, 2402 "milestone": lambda m: m.id, 2403 } 2404 2405 def commit(self): 2406 data = self.get_dirty_fields() 2407 if "due_date" in data and data["due_date"] is None: 2408 data["unset_due_date"] = True 2409 2410 args = { 2411 "owner": self.repository.owner.username, 2412 "repo": self.repository.name, 2413 "index": self.number, 2414 } 2415 self._commit(args, data) 2416 2417 def merge(self, merge_type: MergeType): 2418 """ 2419 Merge the pull request. See 2420 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2421 2422 :param merge_type: The type of merge to perform. See the MergeType enum. 2423 """ 2424 2425 self.allspice_client.requests_post( 2426 self.MERGE_DESIGN_REVIEW.format( 2427 owner=self.repository.owner.username, 2428 repo=self.repository.name, 2429 index=self.number, 2430 ), 2431 data={"Do": merge_type.value}, 2432 ) 2433 2434 def get_comments(self) -> List[Comment]: 2435 """ 2436 Get the comments on this pull request, but not specifically on a review. 2437 2438 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2439 2440 :return: A list of comments on this pull request. 2441 """ 2442 2443 results = self.allspice_client.requests_get( 2444 self.GET_COMMENTS.format( 2445 owner=self.repository.owner.username, 2446 repo=self.repository.name, 2447 index=self.number, 2448 ) 2449 ) 2450 return [Comment.parse_response(self.allspice_client, result) for result in results] 2451 2452 def create_comment(self, body: str): 2453 """ 2454 Create a comment on this pull request. This uses the same endpoint as the 2455 comments on issues, and will not be associated with any reviews. 2456 2457 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2458 2459 :param body: The body of the comment. 2460 :return: The comment that was created. 2461 """ 2462 2463 result = self.allspice_client.requests_post( 2464 self.GET_COMMENTS.format( 2465 owner=self.repository.owner.username, 2466 repo=self.repository.name, 2467 index=self.number, 2468 ), 2469 data={"body": body}, 2470 ) 2471 return Comment.parse_response(self.allspice_client, result) 2472 2473 def create_review( 2474 self, 2475 *, 2476 body: Optional[str] = None, 2477 event: Optional[DesignReviewReview.ReviewEvent] = None, 2478 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2479 commit_id: Optional[str] = None, 2480 ) -> DesignReviewReview: 2481 """ 2482 Create a review on this design review. 2483 2484 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2485 2486 Note: in most cases, you should not set the body or event when creating 2487 a review. The event is automatically set to "PENDING" when the review 2488 is created. You should then use `submit_review` to submit the review 2489 with the desired event and body. 2490 2491 :param body: The body of the review. This is the top-level comment on 2492 the review. If not provided, the review will be created with no body. 2493 :param event: The event of the review. This is the overall status of the 2494 review. See the ReviewEvent enum. If not provided, the API will 2495 default to "PENDING". 2496 :param comments: A list of comments on the review. Each comment should 2497 be a ReviewComment object. If not provided, only the base comment 2498 will be created. 2499 :param commit_id: The commit SHA to associate with the review. This is 2500 optional. 2501 """ 2502 2503 data: dict[str, Any] = {} 2504 2505 if body is not None: 2506 data["body"] = body 2507 if event is not None: 2508 data["event"] = event.value 2509 if commit_id is not None: 2510 data["commit_id"] = commit_id 2511 if comments is not None: 2512 data["comments"] = [asdict(comment) for comment in comments] 2513 2514 result = self.allspice_client.requests_post( 2515 self.GET_REVIEWS.format( 2516 owner=self.repository.owner.username, 2517 repo=self.repository.name, 2518 index=self.number, 2519 ), 2520 data=data, 2521 ) 2522 2523 return DesignReviewReview.parse_response(self.allspice_client, result) 2524 2525 def get_reviews(self) -> List[DesignReviewReview]: 2526 """ 2527 Get all reviews on this design review. 2528 2529 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2530 """ 2531 2532 results = self.allspice_client.requests_get( 2533 self.GET_REVIEWS.format( 2534 owner=self.repository.owner.username, 2535 repo=self.repository.name, 2536 index=self.number, 2537 ) 2538 ) 2539 2540 return [ 2541 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2542 ] 2543 2544 def submit_review( 2545 self, 2546 review_id: int, 2547 event: DesignReviewReview.ReviewEvent, 2548 *, 2549 body: Optional[str] = None, 2550 ): 2551 """ 2552 Submit a review on this design review. 2553 2554 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2555 2556 :param review_id: The ID of the review to submit. 2557 :param event: The event to submit the review with. See the ReviewEvent 2558 enum for the possible values. 2559 :param body: Optional body text for the review submission. 2560 """ 2561 2562 data = { 2563 "event": event.value, 2564 } 2565 if body is not None: 2566 data["body"] = body 2567 2568 result = self.allspice_client.requests_post( 2569 self.GET_REVIEW.format( 2570 owner=self.repository.owner.username, 2571 repo=self.repository.name, 2572 index=self.number, 2573 review_id=review_id, 2574 ), 2575 data=data, 2576 ) 2577 2578 return result 2579 2580 2581class Team(ApiObject): 2582 can_create_org_repo: bool 2583 description: str 2584 id: int 2585 includes_all_repositories: bool 2586 name: str 2587 organization: Optional["Organization"] 2588 permission: str 2589 units: List[str] 2590 units_map: Dict[str, str] 2591 2592 API_OBJECT = """/teams/{id}""" # <id> 2593 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2594 TEAM_DELETE = """/teams/%s""" # <id> 2595 GET_MEMBERS = """/teams/%s/members""" # <id> 2596 GET_REPOS = """/teams/%s/repos""" # <id> 2597 2598 def __init__(self, allspice_client): 2599 super().__init__(allspice_client) 2600 2601 def __eq__(self, other): 2602 if not isinstance(other, Team): 2603 return False 2604 return self.organization == other.organization and self.id == other.id 2605 2606 def __hash__(self): 2607 return hash(self.organization) ^ hash(self.id) 2608 2609 _fields_to_parsers: ClassVar[dict] = { 2610 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2611 } 2612 2613 _patchable_fields: ClassVar[set[str]] = { 2614 "can_create_org_repo", 2615 "description", 2616 "includes_all_repositories", 2617 "name", 2618 "permission", 2619 "units", 2620 "units_map", 2621 } 2622 2623 @classmethod 2624 def request(cls, allspice_client, id: int): 2625 return cls._request(allspice_client, {"id": id}) 2626 2627 def commit(self): 2628 args = {"id": self.id} 2629 self._commit(args) 2630 2631 def add_user(self, user: User): 2632 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2633 url = f"/teams/{self.id}/members/{user.login}" 2634 self.allspice_client.requests_put(url) 2635 2636 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2637 if isinstance(repo, Repository): 2638 repo_name = repo.name 2639 else: 2640 repo_name = repo 2641 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2642 2643 def get_members(self): 2644 """Get all users assigned to the team.""" 2645 results = self.allspice_client.requests_get_paginated( 2646 Team.GET_MEMBERS % self.id, 2647 ) 2648 return [User.parse_response(self.allspice_client, result) for result in results] 2649 2650 def get_repos(self): 2651 """Get all repos of this Team.""" 2652 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2653 return [Repository.parse_response(self.allspice_client, result) for result in results] 2654 2655 def delete(self): 2656 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2657 self.deleted = True 2658 2659 def remove_team_member(self, user_name: str): 2660 url = f"/teams/{self.id}/members/{user_name}" 2661 self.allspice_client.requests_delete(url) 2662 2663 2664class Release(ApiObject): 2665 """ 2666 A release on a repo. 2667 """ 2668 2669 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2670 author: User 2671 body: str 2672 created_at: str 2673 draft: bool 2674 html_url: str 2675 id: int 2676 name: str 2677 prerelease: bool 2678 published_at: str 2679 repo: Optional["Repository"] 2680 repository: Optional["Repository"] 2681 tag_name: str 2682 tarball_url: str 2683 target_commitish: str 2684 upload_url: str 2685 url: str 2686 zipball_url: str 2687 2688 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2689 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2690 # Note that we don't strictly need the get_assets route, as the release 2691 # object already contains the assets. 2692 2693 def __init__(self, allspice_client): 2694 super().__init__(allspice_client) 2695 2696 def __eq__(self, other): 2697 if not isinstance(other, Release): 2698 return False 2699 return self.repo == other.repo and self.id == other.id 2700 2701 def __hash__(self): 2702 return hash(self.repo) ^ hash(self.id) 2703 2704 _fields_to_parsers: ClassVar[dict] = { 2705 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2706 } 2707 _patchable_fields: ClassVar[set[str]] = { 2708 "body", 2709 "draft", 2710 "name", 2711 "prerelease", 2712 "tag_name", 2713 "target_commitish", 2714 } 2715 2716 @classmethod 2717 def parse_response(cls, allspice_client, result, repo) -> Release: 2718 release = super().parse_response(allspice_client, result) 2719 Release._add_read_property("repository", repo, release) 2720 # For legacy reasons 2721 Release._add_read_property("repo", repo, release) 2722 setattr( 2723 release, 2724 "_assets", 2725 [ 2726 ReleaseAsset.parse_response(allspice_client, asset, release) 2727 for asset in result["assets"] 2728 ], 2729 ) 2730 return release 2731 2732 @classmethod 2733 def request( 2734 cls, 2735 allspice_client, 2736 owner: str, 2737 repo: str, 2738 id: Optional[int] = None, 2739 ) -> Release: 2740 args = {"owner": owner, "repo": repo, "id": id} 2741 release_response = cls._get_gitea_api_object(allspice_client, args) 2742 repository = Repository.request(allspice_client, owner, repo) 2743 release = cls.parse_response(allspice_client, release_response, repository) 2744 return release 2745 2746 def commit(self): 2747 if self.repo is None: 2748 raise ValueError("Cannot commit a release without a repository.") 2749 2750 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2751 self._commit(args) 2752 2753 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2754 """ 2755 Create an asset for this release. 2756 2757 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2758 2759 :param file: The file to upload. This should be a file-like object. 2760 :param name: The name of the file. 2761 :return: The created asset. 2762 """ 2763 2764 if self.repo is None: 2765 raise ValueError("Cannot commit a release without a repository.") 2766 2767 args: dict[str, Any] = {"files": {"attachment": file}} 2768 if name is not None: 2769 args["params"] = {"name": name} 2770 2771 result = self.allspice_client.requests_post( 2772 self.RELEASE_CREATE_ASSET.format( 2773 owner=self.repo.owner.username, 2774 repo=self.repo.name, 2775 id=self.id, 2776 ), 2777 **args, 2778 ) 2779 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2780 2781 def delete(self): 2782 if self.repo is None: 2783 raise ValueError("Cannot commit a release without a repository.") 2784 2785 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2786 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2787 self.deleted = True 2788 2789 2790class ReleaseAsset(ApiObject): 2791 browser_download_url: str 2792 created_at: str 2793 download_count: int 2794 id: int 2795 name: str 2796 release: Optional["Release"] 2797 size: int 2798 uuid: str 2799 2800 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2801 2802 def __init__(self, allspice_client): 2803 super().__init__(allspice_client) 2804 2805 def __eq__(self, other): 2806 if not isinstance(other, ReleaseAsset): 2807 return False 2808 return self.release == other.release and self.id == other.id 2809 2810 def __hash__(self): 2811 return hash(self.release) ^ hash(self.id) 2812 2813 _fields_to_parsers: ClassVar[dict] = {} 2814 _patchable_fields: ClassVar[set[str]] = { 2815 "name", 2816 } 2817 2818 @classmethod 2819 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2820 asset = super().parse_response(allspice_client, result) 2821 ReleaseAsset._add_read_property("release", release, asset) 2822 return asset 2823 2824 @classmethod 2825 def request( 2826 cls, 2827 allspice_client, 2828 owner: str, 2829 repo: str, 2830 release_id: int, 2831 id: int, 2832 ) -> ReleaseAsset: 2833 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2834 asset_response = cls._get_gitea_api_object(allspice_client, args) 2835 release = Release.request(allspice_client, owner, repo, release_id) 2836 asset = cls.parse_response(allspice_client, asset_response, release) 2837 return asset 2838 2839 def commit(self): 2840 if self.release is None or self.release.repo is None: 2841 raise ValueError("Cannot commit a release asset without a release or a repository.") 2842 2843 args = { 2844 "owner": self.release.repo.owner.username, 2845 "repo": self.release.repo.name, 2846 "release_id": self.release.id, 2847 "id": self.id, 2848 } 2849 self._commit(args) 2850 2851 def download(self) -> bytes: 2852 """ 2853 Download the raw, binary data of this asset. 2854 2855 Note 1: if the file you are requesting is a text file, you might want to 2856 use .decode() on the result to get a string. For example: 2857 2858 asset.download().decode("utf-8") 2859 2860 Note 2: this method will store the entire file in memory. If you are 2861 downloading a large file, you might want to use download_to_file instead. 2862 """ 2863 2864 return self.allspice_client.requests.get( 2865 self.browser_download_url, 2866 headers=self.allspice_client.headers, 2867 ).content 2868 2869 def download_to_file(self, io: IO): 2870 """ 2871 Download the raw, binary data of this asset to a file-like object. 2872 2873 Example: 2874 2875 with open("my_file.zip", "wb") as f: 2876 asset.download_to_file(f) 2877 2878 :param io: The file-like object to write the data to. 2879 """ 2880 2881 response = self.allspice_client.requests.get( 2882 self.browser_download_url, 2883 headers=self.allspice_client.headers, 2884 stream=True, 2885 ) 2886 # 4kb chunks 2887 for chunk in response.iter_content(chunk_size=4096): 2888 if chunk: 2889 io.write(chunk) 2890 2891 def delete(self): 2892 if self.release is None or self.release.repo is None: 2893 raise ValueError("Cannot commit a release asset without a release or a repository.") 2894 2895 args = { 2896 "owner": self.release.repo.owner.username, 2897 "repo": self.release.repo.name, 2898 "release_id": self.release.id, 2899 "id": self.id, 2900 } 2901 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2902 self.deleted = True 2903 2904 2905class Content(ReadonlyApiObject): 2906 content: Any 2907 download_url: str 2908 encoding: Any 2909 git_url: str 2910 html_url: str 2911 last_commit_sha: str 2912 name: str 2913 path: str 2914 sha: str 2915 size: int 2916 submodule_git_url: Any 2917 target: Any 2918 type: str 2919 url: str 2920 2921 FILE = "file" 2922 2923 def __init__(self, allspice_client): 2924 super().__init__(allspice_client) 2925 2926 def __eq__(self, other): 2927 if not isinstance(other, Content): 2928 return False 2929 2930 return self.sha == other.sha and self.name == other.name 2931 2932 def __hash__(self): 2933 return hash(self.sha) ^ hash(self.name) 2934 2935 2936Ref = Union[Branch, Commit, str] 2937 2938 2939class Util: 2940 @staticmethod 2941 def convert_time(time: str) -> datetime: 2942 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2943 try: 2944 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2945 except ValueError: 2946 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2947 2948 @staticmethod 2949 def format_time(time: datetime) -> str: 2950 """ 2951 Format a datetime object to Gitea's time format. 2952 2953 :param time: The time to format 2954 :return: Formatted time 2955 """ 2956 2957 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2958 2959 @staticmethod 2960 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2961 """ 2962 Given a "ref", returns a dict with the ref parameter for the API call. 2963 2964 If the ref is None, returns an empty dict. You can pass this to the API 2965 directly. 2966 """ 2967 2968 if isinstance(ref, Branch): 2969 return {"ref": ref.name} 2970 elif isinstance(ref, Commit): 2971 return {"ref": ref.sha} 2972 elif ref: 2973 return {"ref": ref} 2974 else: 2975 return {}
36class Organization(ApiObject): 37 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 38 39 active: Optional[bool] 40 avatar_url: str 41 created: Optional[str] 42 description: str 43 email: str 44 followers_count: Optional[int] 45 following_count: Optional[int] 46 full_name: str 47 html_url: Optional[str] 48 id: int 49 is_admin: Optional[bool] 50 language: Optional[str] 51 last_login: Optional[str] 52 location: str 53 login: Optional[str] 54 login_name: Optional[str] 55 name: Optional[str] 56 prohibit_login: Optional[bool] 57 repo_admin_change_team_access: Optional[bool] 58 restricted: Optional[bool] 59 source_id: Optional[int] 60 starred_repos_count: Optional[int] 61 username: str 62 visibility: str 63 website: str 64 65 API_OBJECT = """/orgs/{name}""" # <org> 66 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 67 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 68 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 69 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 70 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 71 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 72 73 def __init__(self, allspice_client): 74 super().__init__(allspice_client) 75 76 def __eq__(self, other): 77 if not isinstance(other, Organization): 78 return False 79 return self.allspice_client == other.allspice_client and self.name == other.name 80 81 def __hash__(self): 82 return hash(self.allspice_client) ^ hash(self.name) 83 84 @classmethod 85 def request(cls, allspice_client, name: str) -> Self: 86 return cls._request(allspice_client, {"name": name}) 87 88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object 96 97 _patchable_fields: ClassVar[set[str]] = { 98 "description", 99 "full_name", 100 "location", 101 "visibility", 102 "website", 103 } 104 105 def commit(self): 106 args = {"name": self.name} 107 self._commit(args) 108 109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result) 147 148 def get_repositories(self) -> List["Repository"]: 149 results = self.allspice_client.requests_get_paginated( 150 Organization.ORG_REPOS_REQUEST % self.username 151 ) 152 return [Repository.parse_response(self.allspice_client, result) for result in results] 153 154 def get_repository(self, name) -> "Repository": 155 repos = self.get_repositories() 156 for repo in repos: 157 if repo.name == name: 158 return repo 159 raise NotFoundException("Repository %s not existent in organization." % name) 160 161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams 168 169 def get_team(self, name) -> "Team": 170 teams = self.get_teams() 171 for team in teams: 172 if team.name == name: 173 return team 174 raise NotFoundException("Team not existent in organization.") 175 176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 ) 207 208 def get_members(self) -> List["User"]: 209 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 210 return [User.parse_response(self.allspice_client, result) for result in results] 211 212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False 223 224 def remove_member(self, user: "User"): 225 path = f"/orgs/{self.username}/members/{user.username}" 226 self.allspice_client.requests_delete(path) 227 228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True 234 235 def get_heatmap(self) -> List[Tuple[datetime, int]]: 236 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 237 results = [ 238 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 239 for result in results 240 ] 241 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object
109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams
176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 )
Alias for AllSpice#create_team
212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False
228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
244class User(ApiObject): 245 active: bool 246 admin: Any 247 allow_create_organization: Any 248 allow_git_hook: Any 249 allow_import_local: Any 250 avatar_url: str 251 created: str 252 description: str 253 email: str 254 emails: List[Any] 255 followers_count: int 256 following_count: int 257 full_name: str 258 html_url: str 259 id: int 260 is_admin: bool 261 language: str 262 last_login: str 263 location: str 264 login: str 265 login_name: str 266 max_repo_creation: Any 267 must_change_password: Any 268 password: Any 269 prohibit_login: bool 270 restricted: bool 271 source_id: int 272 starred_repos_count: int 273 username: str 274 visibility: str 275 website: str 276 277 API_OBJECT = """/users/{name}""" # <org> 278 USER_MAIL = """/user/emails?sudo=%s""" # <name> 279 USER_PATCH = """/admin/users/%s""" # <username> 280 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 281 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 282 USER_HEATMAP = """/users/%s/heatmap""" # <username> 283 284 def __init__(self, allspice_client): 285 super().__init__(allspice_client) 286 self._emails = [] 287 288 def __eq__(self, other): 289 if not isinstance(other, User): 290 return False 291 return self.allspice_client == other.allspice_client and self.id == other.id 292 293 def __hash__(self): 294 return hash(self.allspice_client) ^ hash(self.id) 295 296 @property 297 def emails(self): 298 self.__request_emails() 299 return self._emails 300 301 @classmethod 302 def request(cls, allspice_client, name: str) -> "User": 303 api_object = cls._request(allspice_client, {"name": name}) 304 return api_object 305 306 _patchable_fields: ClassVar[set[str]] = { 307 "active", 308 "admin", 309 "allow_create_organization", 310 "allow_git_hook", 311 "allow_import_local", 312 "email", 313 "full_name", 314 "location", 315 "login_name", 316 "max_repo_creation", 317 "must_change_password", 318 "password", 319 "prohibit_login", 320 "website", 321 } 322 323 def commit(self, login_name: str, source_id: int = 0): 324 """ 325 Unfortunately it is necessary to require the login name 326 as well as the login source (that is not supplied when getting a user) for 327 changing a user. 328 Usually source_id is 0 and the login_name is equal to the username. 329 """ 330 values = self.get_dirty_fields() 331 values.update( 332 # api-doc says that the "source_id" is necessary; works without though 333 {"login_name": login_name, "source_id": source_id} 334 ) 335 args = {"username": self.username} 336 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 337 self._dirty_fields = {} 338 339 def create_repo( 340 self, 341 repoName: str, 342 description: str = "", 343 private: bool = False, 344 autoInit=True, 345 gitignores: Optional[str] = None, 346 license: Optional[str] = None, 347 readme: str = "Default", 348 issue_labels: Optional[str] = None, 349 default_branch="master", 350 ): 351 """Create a user Repository 352 353 Throws: 354 AlreadyExistsException: If the Repository exists already. 355 Exception: If something else went wrong. 356 """ 357 result = self.allspice_client.requests_post( 358 "/user/repos", 359 data={ 360 "name": repoName, 361 "description": description, 362 "private": private, 363 "auto_init": autoInit, 364 "gitignores": gitignores, 365 "license": license, 366 "issue_labels": issue_labels, 367 "readme": readme, 368 "default_branch": default_branch, 369 }, 370 ) 371 if "id" in result: 372 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 373 else: 374 self.allspice_client.logger.error(result["message"]) 375 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 376 return Repository.parse_response(self.allspice_client, result) 377 378 def get_repositories(self) -> List["Repository"]: 379 """Get all Repositories owned by this User.""" 380 url = f"/users/{self.username}/repos" 381 results = self.allspice_client.requests_get_paginated(url) 382 return [Repository.parse_response(self.allspice_client, result) for result in results] 383 384 def get_orgs(self) -> List[Organization]: 385 """Get all Organizations this user is a member of.""" 386 url = f"/users/{self.username}/orgs" 387 results = self.allspice_client.requests_get_paginated(url) 388 return [Organization.parse_response(self.allspice_client, result) for result in results] 389 390 def get_teams(self) -> List["Team"]: 391 url = "/user/teams" 392 results = self.allspice_client.requests_get_paginated(url, sudo=self) 393 return [Team.parse_response(self.allspice_client, result) for result in results] 394 395 def get_accessible_repos(self) -> List["Repository"]: 396 """Get all Repositories accessible by the logged in User.""" 397 results = self.allspice_client.requests_get("/user/repos", sudo=self) 398 return [Repository.parse_response(self.allspice_client, result) for result in results] 399 400 def __request_emails(self): 401 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 402 # report if the adress changed by this 403 for mail in result: 404 self._emails.append(mail["email"]) 405 if mail["primary"]: 406 self._email = mail["email"] 407 408 def delete(self): 409 """Deletes this User. Also deletes all Repositories he owns.""" 410 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 411 self.deleted = True 412 413 def get_heatmap(self) -> List[Tuple[datetime, int]]: 414 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 415 results = [ 416 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 417 for result in results 418 ] 419 return results
323 def commit(self, login_name: str, source_id: int = 0): 324 """ 325 Unfortunately it is necessary to require the login name 326 as well as the login source (that is not supplied when getting a user) for 327 changing a user. 328 Usually source_id is 0 and the login_name is equal to the username. 329 """ 330 values = self.get_dirty_fields() 331 values.update( 332 # api-doc says that the "source_id" is necessary; works without though 333 {"login_name": login_name, "source_id": source_id} 334 ) 335 args = {"username": self.username} 336 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 337 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
339 def create_repo( 340 self, 341 repoName: str, 342 description: str = "", 343 private: bool = False, 344 autoInit=True, 345 gitignores: Optional[str] = None, 346 license: Optional[str] = None, 347 readme: str = "Default", 348 issue_labels: Optional[str] = None, 349 default_branch="master", 350 ): 351 """Create a user Repository 352 353 Throws: 354 AlreadyExistsException: If the Repository exists already. 355 Exception: If something else went wrong. 356 """ 357 result = self.allspice_client.requests_post( 358 "/user/repos", 359 data={ 360 "name": repoName, 361 "description": description, 362 "private": private, 363 "auto_init": autoInit, 364 "gitignores": gitignores, 365 "license": license, 366 "issue_labels": issue_labels, 367 "readme": readme, 368 "default_branch": default_branch, 369 }, 370 ) 371 if "id" in result: 372 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 373 else: 374 self.allspice_client.logger.error(result["message"]) 375 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 376 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
378 def get_repositories(self) -> List["Repository"]: 379 """Get all Repositories owned by this User.""" 380 url = f"/users/{self.username}/repos" 381 results = self.allspice_client.requests_get_paginated(url) 382 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
384 def get_orgs(self) -> List[Organization]: 385 """Get all Organizations this user is a member of.""" 386 url = f"/users/{self.username}/orgs" 387 results = self.allspice_client.requests_get_paginated(url) 388 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
395 def get_accessible_repos(self) -> List["Repository"]: 396 """Get all Repositories accessible by the logged in User.""" 397 results = self.allspice_client.requests_get("/user/repos", sudo=self) 398 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
408 def delete(self): 409 """Deletes this User. Also deletes all Repositories he owns.""" 410 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 411 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.
422class Branch(ReadonlyApiObject): 423 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 424 effective_branch_protection_name: str 425 enable_status_check: bool 426 name: str 427 protected: bool 428 required_approvals: int 429 status_check_contexts: List[Any] 430 user_can_merge: bool 431 user_can_push: bool 432 433 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 434 435 def __init__(self, allspice_client): 436 super().__init__(allspice_client) 437 438 def __eq__(self, other): 439 if not isinstance(other, Branch): 440 return False 441 return self.commit == other.commit and self.name == other.name 442 443 def __hash__(self): 444 return hash(self.commit["id"]) ^ hash(self.name) 445 446 _fields_to_parsers: ClassVar[dict] = { 447 # This is not a commit object 448 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 449 } 450 451 @classmethod 452 def request(cls, allspice_client, owner: str, repo: str, branch: str): 453 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
456class GitEntry(ReadonlyApiObject): 457 """ 458 An object representing a file or directory in the Git tree. 459 """ 460 461 mode: str 462 path: str 463 sha: str 464 size: int 465 type: str 466 url: str 467 468 def __init__(self, allspice_client): 469 super().__init__(allspice_client) 470 471 def __eq__(self, other) -> bool: 472 if not isinstance(other, GitEntry): 473 return False 474 return self.sha == other.sha 475 476 def __hash__(self) -> int: 477 return hash(self.sha)
An object representing a file or directory in the Git tree.
Inherited Members
480class Repository(ApiObject): 481 allow_fast_forward_only_merge: bool 482 allow_manual_merge: Any 483 allow_merge_commits: bool 484 allow_rebase: bool 485 allow_rebase_explicit: bool 486 allow_rebase_update: bool 487 allow_squash_merge: bool 488 archived: bool 489 archived_at: str 490 autodetect_manual_merge: Any 491 avatar_url: str 492 clone_url: str 493 created_at: str 494 default_allow_maintainer_edit: bool 495 default_branch: str 496 default_delete_branch_after_merge: bool 497 default_merge_style: str 498 description: str 499 empty: bool 500 enable_prune: Any 501 external_tracker: Any 502 external_wiki: Any 503 fork: bool 504 forks_count: int 505 full_name: str 506 has_actions: bool 507 has_issues: bool 508 has_packages: bool 509 has_projects: bool 510 has_pull_requests: bool 511 has_releases: bool 512 has_wiki: bool 513 html_url: str 514 id: int 515 ignore_whitespace_conflicts: bool 516 internal: bool 517 internal_tracker: Dict[str, bool] 518 language: str 519 languages_url: str 520 licenses: Any 521 link: str 522 mirror: bool 523 mirror_interval: str 524 mirror_updated: str 525 name: str 526 object_format_name: str 527 open_issues_count: int 528 open_pr_counter: int 529 original_url: str 530 owner: Union["User", "Organization"] 531 parent: Any 532 permissions: Dict[str, bool] 533 private: bool 534 projects_mode: str 535 release_counter: int 536 repo_transfer: Any 537 size: int 538 ssh_url: str 539 stars_count: int 540 template: bool 541 topics: List[Union[Any, str]] 542 updated_at: datetime 543 url: str 544 watchers_count: int 545 website: str 546 547 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 548 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 549 REPO_SEARCH = """/repos/search/""" 550 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 551 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 552 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 553 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 554 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 555 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 556 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 557 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 558 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 559 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 560 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 561 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 562 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 563 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 564 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 565 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 566 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 567 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 568 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 569 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 570 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 571 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 572 573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip" 580 581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex" 591 592 def __init__(self, allspice_client): 593 super().__init__(allspice_client) 594 595 def __eq__(self, other): 596 if not isinstance(other, Repository): 597 return False 598 return self.owner == other.owner and self.name == other.name 599 600 def __hash__(self): 601 return hash(self.owner) ^ hash(self.name) 602 603 _fields_to_parsers: ClassVar[dict] = { 604 # dont know how to tell apart user and org as owner except form email being empty. 605 "owner": lambda allspice_client, r: ( 606 Organization.parse_response(allspice_client, r) 607 if r["email"] == "" 608 else User.parse_response(allspice_client, r) 609 ), 610 "updated_at": lambda _, t: Util.convert_time(t), 611 } 612 613 @classmethod 614 def request( 615 cls, 616 allspice_client, 617 owner: str, 618 name: str, 619 ) -> Repository: 620 return cls._request(allspice_client, {"owner": owner, "name": name}) 621 622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses] 666 667 _patchable_fields: ClassVar[set[str]] = { 668 "allow_manual_merge", 669 "allow_merge_commits", 670 "allow_rebase", 671 "allow_rebase_explicit", 672 "allow_rebase_update", 673 "allow_squash_merge", 674 "archived", 675 "autodetect_manual_merge", 676 "default_branch", 677 "default_delete_branch_after_merge", 678 "default_merge_style", 679 "description", 680 "enable_prune", 681 "external_tracker", 682 "external_wiki", 683 "has_actions", 684 "has_issues", 685 "has_projects", 686 "has_pull_requests", 687 "has_wiki", 688 "ignore_whitespace_conflicts", 689 "internal_tracker", 690 "mirror_interval", 691 "name", 692 "private", 693 "template", 694 "website", 695 } 696 697 def commit(self): 698 args = {"owner": self.owner.username, "name": self.name} 699 self._commit(args) 700 701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results] 708 709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result) 715 716 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result) 730 731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues 799 800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 834 835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results] 871 872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues 895 896 def get_times(self): 897 results = self.allspice_client.requests_get( 898 Repository.REPO_TIMES % (self.owner.username, self.name) 899 ) 900 return results 901 902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time 910 911 def get_full_name(self) -> str: 912 return self.owner.username + "/" + self.name 913 914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue 930 931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result) 984 985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result) 998 999 def create_gitea_hook(self, hook_url: str, events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data) 1008 1009 def list_hooks(self): 1010 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1011 return self.allspice_client.requests_get(url) 1012 1013 def delete_hook(self, id: str): 1014 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1015 self.allspice_client.requests_delete(url) 1016 1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False 1028 1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs 1043 1044 def remove_collaborator(self, user_name: str): 1045 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1046 self.allspice_client.requests_delete(url) 1047 1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = {"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded 1060 1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result 1082 1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1099 1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ] 1117 1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 1140 """ 1141 1142 url = self.REPO_GET_RAW_FILE.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params) 1149 1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_RAW_FILE.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk) 1187 1188 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1189 """ 1190 Get the json blob for a cad file if it exists, otherwise enqueue 1191 a new job and return a 503 status. 1192 1193 WARNING: This is still experimental and not recommended for critical 1194 applications. The structure and content of the returned dictionary can 1195 change at any time. 1196 1197 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1198 """ 1199 1200 if isinstance(content, Content): 1201 content = content.path 1202 1203 url = self.REPO_GET_ALLSPICE_JSON.format( 1204 owner=self.owner.username, 1205 repo=self.name, 1206 content=content, 1207 ) 1208 data = Util.data_params_for_ref(ref) 1209 return self.allspice_client.requests_get(url, data) 1210 1211 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1212 """ 1213 Get the svg blob for a cad file if it exists, otherwise enqueue 1214 a new job and return a 503 status. 1215 1216 WARNING: This is still experimental and not yet recommended for 1217 critical applications. The content of the returned svg can change 1218 at any time. 1219 1220 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1221 """ 1222 1223 if isinstance(content, Content): 1224 content = content.path 1225 1226 url = self.REPO_GET_ALLSPICE_SVG.format( 1227 owner=self.owner.username, 1228 repo=self.name, 1229 content=content, 1230 ) 1231 data = Util.data_params_for_ref(ref) 1232 return self.allspice_client.requests_get_raw(url, data) 1233 1234 def get_generated_projectdata( 1235 self, content: Union[Content, str], ref: Optional[Ref] = None 1236 ) -> dict: 1237 """ 1238 Get the json project data based on the cad file provided 1239 1240 WARNING: This is still experimental and not yet recommended for 1241 critical applications. The content of the returned dictionary can change 1242 at any time. 1243 1244 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1245 """ 1246 if isinstance(content, Content): 1247 content = content.path 1248 1249 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1250 owner=self.owner.username, 1251 repo=self.name, 1252 content=content, 1253 ) 1254 data = Util.data_params_for_ref(ref) 1255 return self.allspice_client.requests_get(url, data) 1256 1257 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1258 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1259 if not data: 1260 data = {} 1261 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1262 data.update({"content": content}) 1263 return self.allspice_client.requests_post(url, data) 1264 1265 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1266 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1267 if not data: 1268 data = {} 1269 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1270 data.update({"sha": file_sha, "content": content}) 1271 return self.allspice_client.requests_put(url, data) 1272 1273 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1274 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1275 if not data: 1276 data = {} 1277 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1278 data.update({"sha": file_sha}) 1279 return self.allspice_client.requests_delete(url, data) 1280 1281 def get_archive( 1282 self, 1283 ref: Ref = "main", 1284 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1285 ) -> bytes: 1286 """ 1287 Download all the files in a specific ref of a repository as a zip or tarball 1288 archive. 1289 1290 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1291 1292 :param ref: branch or commit to get content from, defaults to the "main" branch 1293 :param archive_format: zip or tar, defaults to zip 1294 """ 1295 1296 ref_string = Util.data_params_for_ref(ref)["ref"] 1297 url = self.REPO_GET_ARCHIVE.format( 1298 owner=self.owner.username, 1299 repo=self.name, 1300 ref=ref_string, 1301 format=archive_format.value, 1302 ) 1303 return self.allspice_client.requests_get_raw(url) 1304 1305 def get_topics(self) -> list[str]: 1306 """ 1307 Gets the list of topics on this repository. 1308 1309 See http://localhost:3000/api/swagger#/repository/repoListTopics 1310 """ 1311 1312 url = self.REPO_GET_TOPICS.format( 1313 owner=self.owner.username, 1314 repo=self.name, 1315 ) 1316 return self.allspice_client.requests_get(url)["topics"] 1317 1318 def add_topic(self, topic: str): 1319 """ 1320 Adds a topic to the repository. 1321 1322 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1323 1324 :param topic: The topic to add. Topic names must consist only of 1325 lowercase letters, numnbers and dashes (-), and cannot start with 1326 dashes. Topic names also must be under 35 characters long. 1327 """ 1328 1329 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1330 self.allspice_client.requests_put(url) 1331 1332 def create_release( 1333 self, 1334 tag_name: str, 1335 name: Optional[str] = None, 1336 body: Optional[str] = None, 1337 draft: bool = False, 1338 ): 1339 """ 1340 Create a release for this repository. The release will be created for 1341 the tag with the given name. If there is no tag with this name, create 1342 the tag first. 1343 1344 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1345 """ 1346 1347 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1348 data = { 1349 "tag_name": tag_name, 1350 "draft": draft, 1351 } 1352 if name is not None: 1353 data["name"] = name 1354 if body is not None: 1355 data["body"] = body 1356 response = self.allspice_client.requests_post(url, data) 1357 return Release.parse_response(self.allspice_client, response, self) 1358 1359 def get_releases( 1360 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1361 ) -> List[Release]: 1362 """ 1363 Get the list of releases for this repository. 1364 1365 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1366 """ 1367 1368 data = {} 1369 1370 if draft is not None: 1371 data["draft"] = draft 1372 if pre_release is not None: 1373 data["pre-release"] = pre_release 1374 1375 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1376 responses = self.allspice_client.requests_get_paginated(url, params=data) 1377 1378 return [ 1379 Release.parse_response(self.allspice_client, response, self) for response in responses 1380 ] 1381 1382 def get_latest_release(self) -> Release: 1383 """ 1384 Get the latest release for this repository. 1385 1386 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1387 """ 1388 1389 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1390 response = self.allspice_client.requests_get(url) 1391 release = Release.parse_response(self.allspice_client, response, self) 1392 return release 1393 1394 def get_release_by_tag(self, tag: str) -> Release: 1395 """ 1396 Get a release by its tag. 1397 1398 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1399 """ 1400 1401 url = self.REPO_GET_RELEASE_BY_TAG.format( 1402 owner=self.owner.username, repo=self.name, tag=tag 1403 ) 1404 response = self.allspice_client.requests_get(url) 1405 release = Release.parse_response(self.allspice_client, response, self) 1406 return release 1407 1408 def get_commit_statuses( 1409 self, 1410 commit: Union[str, Commit], 1411 sort: Optional[CommitStatusSort] = None, 1412 state: Optional[CommitStatusState] = None, 1413 ) -> List[CommitStatus]: 1414 """ 1415 Get a list of statuses for a commit. 1416 1417 This is roughly equivalent to the Commit.get_statuses method, but this 1418 method allows you to sort and filter commits and is more convenient if 1419 you have a commit SHA and don't need to get the commit itself. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 params = {} 1428 if sort is not None: 1429 params["sort"] = sort.value 1430 if state is not None: 1431 params["state"] = state.value 1432 1433 url = self.REPO_GET_COMMIT_STATUS.format( 1434 owner=self.owner.username, repo=self.name, sha=commit 1435 ) 1436 response = self.allspice_client.requests_get_paginated(url, params=params) 1437 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1438 1439 def create_commit_status( 1440 self, 1441 commit: Union[str, Commit], 1442 context: Optional[str] = None, 1443 description: Optional[str] = None, 1444 state: Optional[CommitStatusState] = None, 1445 target_url: Optional[str] = None, 1446 ) -> CommitStatus: 1447 """ 1448 Create a status on a commit. 1449 1450 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1451 """ 1452 1453 if isinstance(commit, Commit): 1454 commit = commit.sha 1455 1456 data = {} 1457 if context is not None: 1458 data["context"] = context 1459 if description is not None: 1460 data["description"] = description 1461 if state is not None: 1462 data["state"] = state.value 1463 if target_url is not None: 1464 data["target_url"] = target_url 1465 1466 url = self.REPO_GET_COMMIT_STATUS.format( 1467 owner=self.owner.username, repo=self.name, sha=commit 1468 ) 1469 response = self.allspice_client.requests_post(url, data=data) 1470 return CommitStatus.parse_response(self.allspice_client, response) 1471 1472 def delete(self): 1473 self.allspice_client.requests_delete( 1474 Repository.REPO_DELETE % (self.owner.username, self.name) 1475 ) 1476 self.deleted = True
622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
716 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_query
in the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time
914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue
931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result)
999 def create_gitea_hook(self, hook_url: str, events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data)
1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False
1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs
1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = {"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded
1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 1140 """ 1141 1142 url = self.REPO_GET_RAW_FILE.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_RAW_FILE.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1188 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1189 """ 1190 Get the json blob for a cad file if it exists, otherwise enqueue 1191 a new job and return a 503 status. 1192 1193 WARNING: This is still experimental and not recommended for critical 1194 applications. The structure and content of the returned dictionary can 1195 change at any time. 1196 1197 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1198 """ 1199 1200 if isinstance(content, Content): 1201 content = content.path 1202 1203 url = self.REPO_GET_ALLSPICE_JSON.format( 1204 owner=self.owner.username, 1205 repo=self.name, 1206 content=content, 1207 ) 1208 data = Util.data_params_for_ref(ref) 1209 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1211 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1212 """ 1213 Get the svg blob for a cad file if it exists, otherwise enqueue 1214 a new job and return a 503 status. 1215 1216 WARNING: This is still experimental and not yet recommended for 1217 critical applications. The content of the returned svg can change 1218 at any time. 1219 1220 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1221 """ 1222 1223 if isinstance(content, Content): 1224 content = content.path 1225 1226 url = self.REPO_GET_ALLSPICE_SVG.format( 1227 owner=self.owner.username, 1228 repo=self.name, 1229 content=content, 1230 ) 1231 data = Util.data_params_for_ref(ref) 1232 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1234 def get_generated_projectdata( 1235 self, content: Union[Content, str], ref: Optional[Ref] = None 1236 ) -> dict: 1237 """ 1238 Get the json project data based on the cad file provided 1239 1240 WARNING: This is still experimental and not yet recommended for 1241 critical applications. The content of the returned dictionary can change 1242 at any time. 1243 1244 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1245 """ 1246 if isinstance(content, Content): 1247 content = content.path 1248 1249 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1250 owner=self.owner.username, 1251 repo=self.name, 1252 content=content, 1253 ) 1254 data = Util.data_params_for_ref(ref) 1255 return self.allspice_client.requests_get(url, data)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
1257 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1258 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1259 if not data: 1260 data = {} 1261 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1262 data.update({"content": content}) 1263 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1265 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1266 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1267 if not data: 1268 data = {} 1269 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1270 data.update({"sha": file_sha, "content": content}) 1271 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1273 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1274 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1275 if not data: 1276 data = {} 1277 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1278 data.update({"sha": file_sha}) 1279 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1281 def get_archive( 1282 self, 1283 ref: Ref = "main", 1284 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1285 ) -> bytes: 1286 """ 1287 Download all the files in a specific ref of a repository as a zip or tarball 1288 archive. 1289 1290 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1291 1292 :param ref: branch or commit to get content from, defaults to the "main" branch 1293 :param archive_format: zip or tar, defaults to zip 1294 """ 1295 1296 ref_string = Util.data_params_for_ref(ref)["ref"] 1297 url = self.REPO_GET_ARCHIVE.format( 1298 owner=self.owner.username, 1299 repo=self.name, 1300 ref=ref_string, 1301 format=archive_format.value, 1302 ) 1303 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1305 def get_topics(self) -> list[str]: 1306 """ 1307 Gets the list of topics on this repository. 1308 1309 See http://localhost:3000/api/swagger#/repository/repoListTopics 1310 """ 1311 1312 url = self.REPO_GET_TOPICS.format( 1313 owner=self.owner.username, 1314 repo=self.name, 1315 ) 1316 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1318 def add_topic(self, topic: str): 1319 """ 1320 Adds a topic to the repository. 1321 1322 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1323 1324 :param topic: The topic to add. Topic names must consist only of 1325 lowercase letters, numnbers and dashes (-), and cannot start with 1326 dashes. Topic names also must be under 35 characters long. 1327 """ 1328 1329 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1330 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1332 def create_release( 1333 self, 1334 tag_name: str, 1335 name: Optional[str] = None, 1336 body: Optional[str] = None, 1337 draft: bool = False, 1338 ): 1339 """ 1340 Create a release for this repository. The release will be created for 1341 the tag with the given name. If there is no tag with this name, create 1342 the tag first. 1343 1344 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1345 """ 1346 1347 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1348 data = { 1349 "tag_name": tag_name, 1350 "draft": draft, 1351 } 1352 if name is not None: 1353 data["name"] = name 1354 if body is not None: 1355 data["body"] = body 1356 response = self.allspice_client.requests_post(url, data) 1357 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1359 def get_releases( 1360 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1361 ) -> List[Release]: 1362 """ 1363 Get the list of releases for this repository. 1364 1365 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1366 """ 1367 1368 data = {} 1369 1370 if draft is not None: 1371 data["draft"] = draft 1372 if pre_release is not None: 1373 data["pre-release"] = pre_release 1374 1375 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1376 responses = self.allspice_client.requests_get_paginated(url, params=data) 1377 1378 return [ 1379 Release.parse_response(self.allspice_client, response, self) for response in responses 1380 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1382 def get_latest_release(self) -> Release: 1383 """ 1384 Get the latest release for this repository. 1385 1386 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1387 """ 1388 1389 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1390 response = self.allspice_client.requests_get(url) 1391 release = Release.parse_response(self.allspice_client, response, self) 1392 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1394 def get_release_by_tag(self, tag: str) -> Release: 1395 """ 1396 Get a release by its tag. 1397 1398 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1399 """ 1400 1401 url = self.REPO_GET_RELEASE_BY_TAG.format( 1402 owner=self.owner.username, repo=self.name, tag=tag 1403 ) 1404 response = self.allspice_client.requests_get(url) 1405 release = Release.parse_response(self.allspice_client, response, self) 1406 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1408 def get_commit_statuses( 1409 self, 1410 commit: Union[str, Commit], 1411 sort: Optional[CommitStatusSort] = None, 1412 state: Optional[CommitStatusState] = None, 1413 ) -> List[CommitStatus]: 1414 """ 1415 Get a list of statuses for a commit. 1416 1417 This is roughly equivalent to the Commit.get_statuses method, but this 1418 method allows you to sort and filter commits and is more convenient if 1419 you have a commit SHA and don't need to get the commit itself. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 params = {} 1428 if sort is not None: 1429 params["sort"] = sort.value 1430 if state is not None: 1431 params["state"] = state.value 1432 1433 url = self.REPO_GET_COMMIT_STATUS.format( 1434 owner=self.owner.username, repo=self.name, sha=commit 1435 ) 1436 response = self.allspice_client.requests_get_paginated(url, params=params) 1437 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1439 def create_commit_status( 1440 self, 1441 commit: Union[str, Commit], 1442 context: Optional[str] = None, 1443 description: Optional[str] = None, 1444 state: Optional[CommitStatusState] = None, 1445 target_url: Optional[str] = None, 1446 ) -> CommitStatus: 1447 """ 1448 Create a status on a commit. 1449 1450 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1451 """ 1452 1453 if isinstance(commit, Commit): 1454 commit = commit.sha 1455 1456 data = {} 1457 if context is not None: 1458 data["context"] = context 1459 if description is not None: 1460 data["description"] = description 1461 if state is not None: 1462 data["state"] = state.value 1463 if target_url is not None: 1464 data["target_url"] = target_url 1465 1466 url = self.REPO_GET_COMMIT_STATUS.format( 1467 owner=self.owner.username, repo=self.name, sha=commit 1468 ) 1469 response = self.allspice_client.requests_post(url, data=data) 1470 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip"
Archive formats for Repository.get_archive
581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
1479class Milestone(ApiObject): 1480 allow_merge_commits: Any 1481 allow_rebase: Any 1482 allow_rebase_explicit: Any 1483 allow_squash_merge: Any 1484 archived: Any 1485 closed_at: Any 1486 closed_issues: int 1487 created_at: str 1488 default_branch: Any 1489 description: str 1490 due_on: Any 1491 has_issues: Any 1492 has_pull_requests: Any 1493 has_wiki: Any 1494 id: int 1495 ignore_whitespace_conflicts: Any 1496 name: Any 1497 open_issues: int 1498 private: Any 1499 state: str 1500 title: str 1501 updated_at: str 1502 website: Any 1503 1504 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1505 1506 def __init__(self, allspice_client): 1507 super().__init__(allspice_client) 1508 1509 def __eq__(self, other): 1510 if not isinstance(other, Milestone): 1511 return False 1512 return self.allspice_client == other.allspice_client and self.id == other.id 1513 1514 def __hash__(self): 1515 return hash(self.allspice_client) ^ hash(self.id) 1516 1517 _fields_to_parsers: ClassVar[dict] = { 1518 "closed_at": lambda _, t: Util.convert_time(t), 1519 "due_on": lambda _, t: Util.convert_time(t), 1520 } 1521 1522 _patchable_fields: ClassVar[set[str]] = { 1523 "allow_merge_commits", 1524 "allow_rebase", 1525 "allow_rebase_explicit", 1526 "allow_squash_merge", 1527 "archived", 1528 "default_branch", 1529 "description", 1530 "has_issues", 1531 "has_pull_requests", 1532 "has_wiki", 1533 "ignore_whitespace_conflicts", 1534 "name", 1535 "private", 1536 "website", 1537 } 1538 1539 @classmethod 1540 def request(cls, allspice_client, owner: str, repo: str, number: str): 1541 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1544class Attachment(ReadonlyApiObject): 1545 """ 1546 An asset attached to a comment. 1547 1548 You cannot edit or delete the attachment from this object - see the instance methods 1549 Comment.edit_attachment and delete_attachment for that. 1550 """ 1551 1552 browser_download_url: str 1553 created_at: str 1554 download_count: int 1555 id: int 1556 name: str 1557 size: int 1558 uuid: str 1559 1560 def __init__(self, allspice_client): 1561 super().__init__(allspice_client) 1562 1563 def __eq__(self, other): 1564 if not isinstance(other, Attachment): 1565 return False 1566 1567 return self.uuid == other.uuid 1568 1569 def __hash__(self): 1570 return hash(self.uuid) 1571 1572 def download_to_file(self, io: IO): 1573 """ 1574 Download the raw, binary data of this Attachment to a file-like object. 1575 1576 Example: 1577 1578 with open("my_file.zip", "wb") as f: 1579 attachment.download_to_file(f) 1580 1581 :param io: The file-like object to write the data to. 1582 """ 1583 1584 response = self.allspice_client.requests.get( 1585 self.browser_download_url, 1586 headers=self.allspice_client.headers, 1587 stream=True, 1588 ) 1589 # 4kb chunks 1590 for chunk in response.iter_content(chunk_size=4096): 1591 if chunk: 1592 io.write(chunk)
An asset attached to a comment.
You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.
1572 def download_to_file(self, io: IO): 1573 """ 1574 Download the raw, binary data of this Attachment to a file-like object. 1575 1576 Example: 1577 1578 with open("my_file.zip", "wb") as f: 1579 attachment.download_to_file(f) 1580 1581 :param io: The file-like object to write the data to. 1582 """ 1583 1584 response = self.allspice_client.requests.get( 1585 self.browser_download_url, 1586 headers=self.allspice_client.headers, 1587 stream=True, 1588 ) 1589 # 4kb chunks 1590 for chunk in response.iter_content(chunk_size=4096): 1591 if chunk: 1592 io.write(chunk)
Download the raw, binary data of this Attachment to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
attachment.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
Inherited Members
1595class Comment(ApiObject): 1596 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1597 body: str 1598 created_at: datetime 1599 html_url: str 1600 id: int 1601 issue_url: str 1602 original_author: str 1603 original_author_id: int 1604 pull_request_url: str 1605 updated_at: datetime 1606 user: User 1607 1608 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1609 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1610 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1611 1612 def __init__(self, allspice_client): 1613 super().__init__(allspice_client) 1614 1615 def __eq__(self, other): 1616 if not isinstance(other, Comment): 1617 return False 1618 return self.repository == other.repository and self.id == other.id 1619 1620 def __hash__(self): 1621 return hash(self.repository) ^ hash(self.id) 1622 1623 @classmethod 1624 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1625 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1626 1627 _fields_to_parsers: ClassVar[dict] = { 1628 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1629 "created_at": lambda _, t: Util.convert_time(t), 1630 "updated_at": lambda _, t: Util.convert_time(t), 1631 } 1632 1633 _patchable_fields: ClassVar[set[str]] = {"body"} 1634 1635 @property 1636 def parent_url(self) -> str: 1637 """URL of the parent of this comment (the issue or the pull request)""" 1638 1639 if self.issue_url is not None and self.issue_url != "": 1640 return self.issue_url 1641 else: 1642 return self.pull_request_url 1643 1644 @cached_property 1645 def repository(self) -> Repository: 1646 """The repository this comment was posted on.""" 1647 1648 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1649 return Repository.request(self.allspice_client, owner_name, repo_name) 1650 1651 def __fields_for_path(self): 1652 return { 1653 "owner": self.repository.owner.username, 1654 "repo": self.repository.name, 1655 "id": self.id, 1656 } 1657 1658 def commit(self): 1659 self._commit(self.__fields_for_path()) 1660 1661 def delete(self): 1662 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1663 self.deleted = True 1664 1665 def get_attachments(self) -> List[Attachment]: 1666 """ 1667 Get all attachments on this comment. This returns Attachment objects, which 1668 contain a link to download the attachment. 1669 1670 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1671 """ 1672 1673 results = self.allspice_client.requests_get( 1674 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1675 ) 1676 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1677 1678 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1679 """ 1680 Create an attachment on this comment. 1681 1682 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1683 1684 :param file: The file to attach. This should be a file-like object. 1685 :param name: The name of the file. If not provided, the name of the file will be 1686 used. 1687 :return: The created attachment. 1688 """ 1689 1690 args: dict[str, Any] = { 1691 "files": {"attachment": file}, 1692 } 1693 if name is not None: 1694 args["params"] = {"name": name} 1695 1696 result = self.allspice_client.requests_post( 1697 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1698 **args, 1699 ) 1700 return Attachment.parse_response(self.allspice_client, result) 1701 1702 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1703 """ 1704 Edit an attachment. 1705 1706 The list of params that can be edited is available at 1707 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1708 1709 :param attachment: The attachment to be edited 1710 :param data: The data parameter should be a dictionary of the fields to edit. 1711 :return: The edited attachment 1712 """ 1713 1714 args = { 1715 **self.__fields_for_path(), 1716 "attachment_id": attachment.id, 1717 } 1718 result = self.allspice_client.requests_patch( 1719 self.ATTACHMENT_PATH.format(**args), 1720 data=data, 1721 ) 1722 return Attachment.parse_response(self.allspice_client, result) 1723 1724 def delete_attachment(self, attachment: Attachment): 1725 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1726 1727 args = { 1728 **self.__fields_for_path(), 1729 "attachment_id": attachment.id, 1730 } 1731 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1732 attachment.deleted = True
1635 @property 1636 def parent_url(self) -> str: 1637 """URL of the parent of this comment (the issue or the pull request)""" 1638 1639 if self.issue_url is not None and self.issue_url != "": 1640 return self.issue_url 1641 else: 1642 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1644 @cached_property 1645 def repository(self) -> Repository: 1646 """The repository this comment was posted on.""" 1647 1648 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1649 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1665 def get_attachments(self) -> List[Attachment]: 1666 """ 1667 Get all attachments on this comment. This returns Attachment objects, which 1668 contain a link to download the attachment. 1669 1670 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1671 """ 1672 1673 results = self.allspice_client.requests_get( 1674 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1675 ) 1676 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1678 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1679 """ 1680 Create an attachment on this comment. 1681 1682 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1683 1684 :param file: The file to attach. This should be a file-like object. 1685 :param name: The name of the file. If not provided, the name of the file will be 1686 used. 1687 :return: The created attachment. 1688 """ 1689 1690 args: dict[str, Any] = { 1691 "files": {"attachment": file}, 1692 } 1693 if name is not None: 1694 args["params"] = {"name": name} 1695 1696 result = self.allspice_client.requests_post( 1697 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1698 **args, 1699 ) 1700 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1702 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1703 """ 1704 Edit an attachment. 1705 1706 The list of params that can be edited is available at 1707 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1708 1709 :param attachment: The attachment to be edited 1710 :param data: The data parameter should be a dictionary of the fields to edit. 1711 :return: The edited attachment 1712 """ 1713 1714 args = { 1715 **self.__fields_for_path(), 1716 "attachment_id": attachment.id, 1717 } 1718 result = self.allspice_client.requests_patch( 1719 self.ATTACHMENT_PATH.format(**args), 1720 data=data, 1721 ) 1722 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1724 def delete_attachment(self, attachment: Attachment): 1725 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1726 1727 args = { 1728 **self.__fields_for_path(), 1729 "attachment_id": attachment.id, 1730 } 1731 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1732 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1735class Commit(ReadonlyApiObject): 1736 author: User 1737 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1738 committer: Dict[str, Union[int, str, bool]] 1739 created: str 1740 files: List[Dict[str, str]] 1741 html_url: str 1742 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1743 parents: List[Union[Dict[str, str], Any]] 1744 sha: str 1745 stats: Dict[str, int] 1746 url: str 1747 1748 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1749 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1750 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1751 1752 # Regex to extract owner and repo names from the url property 1753 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1754 1755 def __init__(self, allspice_client): 1756 super().__init__(allspice_client) 1757 1758 _fields_to_parsers: ClassVar[dict] = { 1759 # NOTE: api may return None for commiters that are no allspice users 1760 "author": lambda allspice_client, u: ( 1761 User.parse_response(allspice_client, u) if u else None 1762 ) 1763 } 1764 1765 def __eq__(self, other): 1766 if not isinstance(other, Commit): 1767 return False 1768 return self.sha == other.sha 1769 1770 def __hash__(self): 1771 return hash(self.sha) 1772 1773 @classmethod 1774 def parse_response(cls, allspice_client, result) -> "Commit": 1775 commit_cache = result["commit"] 1776 api_object = cls(allspice_client) 1777 cls._initialize(allspice_client, api_object, result) 1778 # inner_commit for legacy reasons 1779 Commit._add_read_property("inner_commit", commit_cache, api_object) 1780 return api_object 1781 1782 def get_status(self) -> CommitCombinedStatus: 1783 """ 1784 Get a combined status consisting of all statues on this commit. 1785 1786 Note that the returned object is a CommitCombinedStatus object, which 1787 also contains a list of all statuses on the commit. 1788 1789 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1790 """ 1791 1792 result = self.allspice_client.requests_get( 1793 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1794 ) 1795 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1796 1797 def get_statuses(self) -> List[CommitStatus]: 1798 """ 1799 Get a list of all statuses on this commit. 1800 1801 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1802 """ 1803 1804 results = self.allspice_client.requests_get( 1805 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1806 ) 1807 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1808 1809 @cached_property 1810 def _fields_for_path(self) -> dict[str, str]: 1811 matches = self.URL_REGEXP.search(self.url) 1812 if not matches: 1813 raise ValueError(f"Invalid commit URL: {self.url}") 1814 1815 return { 1816 "owner": matches.group(1), 1817 "repo": matches.group(2), 1818 "sha": self.sha, 1819 }
1773 @classmethod 1774 def parse_response(cls, allspice_client, result) -> "Commit": 1775 commit_cache = result["commit"] 1776 api_object = cls(allspice_client) 1777 cls._initialize(allspice_client, api_object, result) 1778 # inner_commit for legacy reasons 1779 Commit._add_read_property("inner_commit", commit_cache, api_object) 1780 return api_object
1782 def get_status(self) -> CommitCombinedStatus: 1783 """ 1784 Get a combined status consisting of all statues on this commit. 1785 1786 Note that the returned object is a CommitCombinedStatus object, which 1787 also contains a list of all statuses on the commit. 1788 1789 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1790 """ 1791 1792 result = self.allspice_client.requests_get( 1793 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1794 ) 1795 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1797 def get_statuses(self) -> List[CommitStatus]: 1798 """ 1799 Get a list of all statuses on this commit. 1800 1801 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1802 """ 1803 1804 results = self.allspice_client.requests_get( 1805 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1806 ) 1807 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
1822class CommitStatusState(Enum): 1823 PENDING = "pending" 1824 SUCCESS = "success" 1825 ERROR = "error" 1826 FAILURE = "failure" 1827 WARNING = "warning" 1828 1829 @classmethod 1830 def try_init(cls, value: str) -> CommitStatusState | str: 1831 """ 1832 Try converting a string to the enum, and if that fails, return the 1833 string itself. 1834 """ 1835 1836 try: 1837 return cls(value) 1838 except ValueError: 1839 return value
1829 @classmethod 1830 def try_init(cls, value: str) -> CommitStatusState | str: 1831 """ 1832 Try converting a string to the enum, and if that fails, return the 1833 string itself. 1834 """ 1835 1836 try: 1837 return cls(value) 1838 except ValueError: 1839 return value
Try converting a string to the enum, and if that fails, return the string itself.
1842class CommitStatus(ReadonlyApiObject): 1843 context: str 1844 created_at: str 1845 creator: User 1846 description: str 1847 id: int 1848 status: CommitStatusState 1849 target_url: str 1850 updated_at: str 1851 url: str 1852 1853 def __init__(self, allspice_client): 1854 super().__init__(allspice_client) 1855 1856 _fields_to_parsers: ClassVar[dict] = { 1857 # Gitea/ASH doesn't actually validate that the status is a "valid" 1858 # status, so we can expect empty or unknown strings in the status field. 1859 "status": lambda _, s: CommitStatusState.try_init(s), 1860 "creator": lambda allspice_client, u: ( 1861 User.parse_response(allspice_client, u) if u else None 1862 ), 1863 } 1864 1865 def __eq__(self, other): 1866 if not isinstance(other, CommitStatus): 1867 return False 1868 return self.id == other.id 1869 1870 def __hash__(self): 1871 return hash(self.id)
Inherited Members
1874class CommitCombinedStatus(ReadonlyApiObject): 1875 commit_url: str 1876 repository: Repository 1877 sha: str 1878 state: CommitStatusState 1879 statuses: List["CommitStatus"] 1880 total_count: int 1881 url: str 1882 1883 def __init__(self, allspice_client): 1884 super().__init__(allspice_client) 1885 1886 _fields_to_parsers: ClassVar[dict] = { 1887 # See CommitStatus 1888 "state": lambda _, s: CommitStatusState.try_init(s), 1889 "statuses": lambda allspice_client, statuses: [ 1890 CommitStatus.parse_response(allspice_client, status) for status in statuses 1891 ], 1892 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1893 } 1894 1895 def __eq__(self, other): 1896 if not isinstance(other, CommitCombinedStatus): 1897 return False 1898 return self.sha == other.sha 1899 1900 def __hash__(self): 1901 return hash(self.sha) 1902 1903 @classmethod 1904 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1905 api_object = cls(allspice_client) 1906 cls._initialize(allspice_client, api_object, result) 1907 return api_object
Inherited Members
1910class Issue(ApiObject): 1911 """ 1912 An issue on a repository. 1913 1914 Note: `Issue.assets` may not have any entries even if the issue has 1915 attachments. This happens when an issue is fetched via a bulk method like 1916 `Repository.get_issues`. In most cases, prefer using 1917 `Issue.get_attachments` to get the attachments on an issue. 1918 """ 1919 1920 assets: List[Union[Any, "Attachment"]] 1921 assignee: Any 1922 assignees: Any 1923 body: str 1924 closed_at: Any 1925 comments: int 1926 created_at: str 1927 due_date: Any 1928 html_url: str 1929 id: int 1930 is_locked: bool 1931 labels: List[Any] 1932 milestone: Optional["Milestone"] 1933 number: int 1934 original_author: str 1935 original_author_id: int 1936 pin_order: int 1937 pull_request: Any 1938 ref: str 1939 repository: Dict[str, Union[int, str]] 1940 state: str 1941 title: str 1942 updated_at: str 1943 url: str 1944 user: User 1945 1946 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1947 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1948 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1949 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1950 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1951 1952 OPENED = "open" 1953 CLOSED = "closed" 1954 1955 def __init__(self, allspice_client): 1956 super().__init__(allspice_client) 1957 1958 def __eq__(self, other): 1959 if not isinstance(other, Issue): 1960 return False 1961 return self.repository == other.repository and self.id == other.id 1962 1963 def __hash__(self): 1964 return hash(self.repository) ^ hash(self.id) 1965 1966 _fields_to_parsers: ClassVar[dict] = { 1967 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1968 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1969 "assets": lambda allspice_client, assets: [ 1970 Attachment.parse_response(allspice_client, a) for a in assets 1971 ], 1972 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1973 "assignees": lambda allspice_client, us: [ 1974 User.parse_response(allspice_client, u) for u in us 1975 ], 1976 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1977 } 1978 1979 _parsers_to_fields: ClassVar[dict] = { 1980 "milestone": lambda m: m.id, 1981 } 1982 1983 _patchable_fields: ClassVar[set[str]] = { 1984 "assignee", 1985 "assignees", 1986 "body", 1987 "due_date", 1988 "milestone", 1989 "state", 1990 "title", 1991 } 1992 1993 def commit(self): 1994 args = { 1995 "owner": self.repository.owner.username, 1996 "repo": self.repository.name, 1997 "index": self.number, 1998 } 1999 self._commit(args) 2000 2001 @classmethod 2002 def request(cls, allspice_client, owner: str, repo: str, number: str): 2003 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2004 # The repository in the response is a RepositoryMeta object, so request 2005 # the full repository object and add it to the issue object. 2006 repository = Repository.request(allspice_client, owner, repo) 2007 setattr(api_object, "_repository", repository) 2008 # For legacy reasons 2009 cls._add_read_property("repo", repository, api_object) 2010 return api_object 2011 2012 @classmethod 2013 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2014 args = {"owner": repo.owner.username, "repo": repo.name} 2015 data = {"title": title, "body": body} 2016 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2017 issue = Issue.parse_response(allspice_client, result) 2018 setattr(issue, "_repository", repo) 2019 cls._add_read_property("repo", repo, issue) 2020 return issue 2021 2022 @property 2023 def owner(self) -> Organization | User: 2024 return self.repository.owner 2025 2026 def get_time_sum(self, user: User) -> int: 2027 results = self.allspice_client.requests_get( 2028 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2029 ) 2030 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2031 2032 def get_times(self) -> Optional[Dict]: 2033 return self.allspice_client.requests_get( 2034 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2035 ) 2036 2037 def delete_time(self, time_id: str): 2038 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2039 self.allspice_client.requests_delete(path) 2040 2041 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2042 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2043 self.allspice_client.requests_post( 2044 path, data={"created": created, "time": int(time), "user_name": user_name} 2045 ) 2046 2047 def get_comments(self) -> List[Comment]: 2048 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2049 2050 results = self.allspice_client.requests_get( 2051 self.GET_COMMENTS.format( 2052 owner=self.owner.username, repo=self.repository.name, index=self.number 2053 ) 2054 ) 2055 2056 return [Comment.parse_response(self.allspice_client, result) for result in results] 2057 2058 def create_comment(self, body: str) -> Comment: 2059 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2060 2061 path = self.GET_COMMENTS.format( 2062 owner=self.owner.username, repo=self.repository.name, index=self.number 2063 ) 2064 2065 response = self.allspice_client.requests_post(path, data={"body": body}) 2066 return Comment.parse_response(self.allspice_client, response) 2067 2068 def get_attachments(self) -> List[Attachment]: 2069 """ 2070 Fetch all attachments on this issue. 2071 2072 Unlike the assets field, this will always fetch all attachments from the 2073 API. 2074 2075 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2076 """ 2077 2078 path = self.GET_ATTACHMENTS.format( 2079 owner=self.owner.username, repo=self.repository.name, index=self.number 2080 ) 2081 response = self.allspice_client.requests_get(path) 2082 2083 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2084 2085 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2086 """ 2087 Create an attachment on this issue. 2088 2089 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2090 2091 :param file: The file to attach. This should be a file-like object. 2092 :param name: The name of the file. If not provided, the name of the file will be 2093 used. 2094 :return: The created attachment. 2095 """ 2096 2097 args: dict[str, Any] = { 2098 "files": {"attachment": file}, 2099 } 2100 if name is not None: 2101 args["params"] = {"name": name} 2102 2103 result = self.allspice_client.requests_post( 2104 self.GET_ATTACHMENTS.format( 2105 owner=self.owner.username, repo=self.repository.name, index=self.number 2106 ), 2107 **args, 2108 ) 2109 2110 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: Issue.assets
may not have any entries even if the issue has
attachments. This happens when an issue is fetched via a bulk method like
Repository.get_issues
. In most cases, prefer using
Issue.get_attachments
to get the attachments on an issue.
2001 @classmethod 2002 def request(cls, allspice_client, owner: str, repo: str, number: str): 2003 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2004 # The repository in the response is a RepositoryMeta object, so request 2005 # the full repository object and add it to the issue object. 2006 repository = Repository.request(allspice_client, owner, repo) 2007 setattr(api_object, "_repository", repository) 2008 # For legacy reasons 2009 cls._add_read_property("repo", repository, api_object) 2010 return api_object
2012 @classmethod 2013 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2014 args = {"owner": repo.owner.username, "repo": repo.name} 2015 data = {"title": title, "body": body} 2016 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2017 issue = Issue.parse_response(allspice_client, result) 2018 setattr(issue, "_repository", repo) 2019 cls._add_read_property("repo", repo, issue) 2020 return issue
2041 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2042 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2043 self.allspice_client.requests_post( 2044 path, data={"created": created, "time": int(time), "user_name": user_name} 2045 )
2047 def get_comments(self) -> List[Comment]: 2048 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2049 2050 results = self.allspice_client.requests_get( 2051 self.GET_COMMENTS.format( 2052 owner=self.owner.username, repo=self.repository.name, index=self.number 2053 ) 2054 ) 2055 2056 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2058 def create_comment(self, body: str) -> Comment: 2059 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2060 2061 path = self.GET_COMMENTS.format( 2062 owner=self.owner.username, repo=self.repository.name, index=self.number 2063 ) 2064 2065 response = self.allspice_client.requests_post(path, data={"body": body}) 2066 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2068 def get_attachments(self) -> List[Attachment]: 2069 """ 2070 Fetch all attachments on this issue. 2071 2072 Unlike the assets field, this will always fetch all attachments from the 2073 API. 2074 2075 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2076 """ 2077 2078 path = self.GET_ATTACHMENTS.format( 2079 owner=self.owner.username, repo=self.repository.name, index=self.number 2080 ) 2081 response = self.allspice_client.requests_get(path) 2082 2083 return [Attachment.parse_response(self.allspice_client, result) for result in response]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments
2085 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2086 """ 2087 Create an attachment on this issue. 2088 2089 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2090 2091 :param file: The file to attach. This should be a file-like object. 2092 :param name: The name of the file. If not provided, the name of the file will be 2093 used. 2094 :return: The created attachment. 2095 """ 2096 2097 args: dict[str, Any] = { 2098 "files": {"attachment": file}, 2099 } 2100 if name is not None: 2101 args["params"] = {"name": name} 2102 2103 result = self.allspice_client.requests_post( 2104 self.GET_ATTACHMENTS.format( 2105 owner=self.owner.username, repo=self.repository.name, index=self.number 2106 ), 2107 **args, 2108 ) 2109 2110 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this issue.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
2113class DesignReviewReviewComment(ApiObject): 2114 """ 2115 A comment on a Design Review Review. 2116 """ 2117 2118 body: str 2119 commit_id: str 2120 created_at: str 2121 diff_hunk: str 2122 html_url: str 2123 id: int 2124 original_commit_id: str 2125 original_position: int 2126 path: str 2127 position: int 2128 pull_request_review_id: int 2129 pull_request_url: str 2130 resolver: Any 2131 sub_path: str 2132 updated_at: str 2133 user: User 2134 2135 def __init__(self, allspice_client): 2136 super().__init__(allspice_client) 2137 2138 _fields_to_parsers: ClassVar[dict] = { 2139 "resolver": lambda allspice_client, r: User.parse_response(allspice_client, r), 2140 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2141 }
A comment on a Design Review Review.
2144class DesignReviewReview(ReadonlyApiObject): 2145 """ 2146 A review on a Design Review. 2147 """ 2148 2149 body: str 2150 comments_count: int 2151 commit_id: str 2152 dismissed: bool 2153 html_url: str 2154 id: int 2155 official: bool 2156 pull_request_url: str 2157 stale: bool 2158 state: ReviewEvent 2159 submitted_at: str 2160 team: Any 2161 updated_at: str 2162 user: User 2163 2164 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}" 2165 GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments" 2166 2167 class ReviewEvent(Enum): 2168 APPROVED = "APPROVED" 2169 PENDING = "PENDING" 2170 COMMENT = "COMMENT" 2171 REQUEST_CHANGES = "REQUEST_CHANGES" 2172 REQUEST_REVIEW = "REQUEST_REVIEW" 2173 UNKNOWN = "" 2174 2175 @dataclass 2176 class ReviewComment: 2177 """ 2178 Data required to create a review comment on a design review. 2179 2180 :param body: The body of the comment. 2181 :param path: The path of the file to comment on. If you have a 2182 `Content` object, get the path using the `path` property. 2183 :param sub_path: The sub-path of the file to comment on. This is 2184 usually the page ID of the page in the multi-page document. 2185 :param new_position: The line number of the source code file after the 2186 change to add this comment on. Optional, leave unset if this is an ECAD 2187 file or the comment must be on the entire file. 2188 :param old_position: The line number of the source code file before the 2189 change to add this comment on. Optional, leave unset if this is an ECAD 2190 file or the comment must be on the entire file. 2191 """ 2192 2193 body: str 2194 path: str 2195 sub_path: Optional[str] = None 2196 new_position: Optional[int] = None 2197 old_position: Optional[int] = None 2198 2199 def __init__(self, allspice_client): 2200 super().__init__(allspice_client) 2201 2202 _fields_to_parsers: ClassVar[dict] = { 2203 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2204 "state": lambda _, s: DesignReviewReview.ReviewEvent(s), 2205 } 2206 2207 def _get_dr_properties(self) -> dict[str, str]: 2208 """ 2209 Get the owner, repo name and design review number from the URL of this 2210 review's DR. 2211 """ 2212 2213 parts = self.pull_request_url.strip("/").split("/") 2214 2215 try: 2216 index = parts[-1] 2217 assert parts[-2] == "pulls" or parts[-2] == "pull", ( 2218 "Expected the second last part of the URL to be 'pulls' or 'pull', " 2219 ) 2220 repo = parts[-3] 2221 owner = parts[-4] 2222 2223 return { 2224 "owner": owner, 2225 "repo": repo, 2226 "index": index, 2227 } 2228 except IndexError: 2229 raise ValueError("Malformed design review URL: {}".format(self.pull_request_url)) 2230 2231 @cached_property 2232 def owner_name(self) -> str: 2233 """ 2234 The owner of the repository this review is on. 2235 """ 2236 2237 return self._get_dr_properties()["owner"] 2238 2239 @cached_property 2240 def repository_name(self) -> str: 2241 """ 2242 The name of the repository this review is on. 2243 """ 2244 2245 return self._get_dr_properties()["repo"] 2246 2247 @cached_property 2248 def index(self) -> str: 2249 """ 2250 The index of the design review this review is on. 2251 """ 2252 2253 return self._get_dr_properties()["index"] 2254 2255 def delete(self): 2256 """ 2257 Delete this review. 2258 """ 2259 2260 self.allspice_client.requests_delete( 2261 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2262 ) 2263 self.deleted = True 2264 2265 def get_comments(self) -> List[DesignReviewReviewComment]: 2266 """ 2267 Get the comments on this review. 2268 """ 2269 2270 result = self.allspice_client.requests_get( 2271 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2272 ) 2273 2274 return [ 2275 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2276 for comment in result 2277 ]
A review on a Design Review.
2231 @cached_property 2232 def owner_name(self) -> str: 2233 """ 2234 The owner of the repository this review is on. 2235 """ 2236 2237 return self._get_dr_properties()["owner"]
The owner of the repository this review is on.
2239 @cached_property 2240 def repository_name(self) -> str: 2241 """ 2242 The name of the repository this review is on. 2243 """ 2244 2245 return self._get_dr_properties()["repo"]
The name of the repository this review is on.
2247 @cached_property 2248 def index(self) -> str: 2249 """ 2250 The index of the design review this review is on. 2251 """ 2252 2253 return self._get_dr_properties()["index"]
The index of the design review this review is on.
2255 def delete(self): 2256 """ 2257 Delete this review. 2258 """ 2259 2260 self.allspice_client.requests_delete( 2261 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2262 ) 2263 self.deleted = True
Delete this review.
2265 def get_comments(self) -> List[DesignReviewReviewComment]: 2266 """ 2267 Get the comments on this review. 2268 """ 2269 2270 result = self.allspice_client.requests_get( 2271 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2272 ) 2273 2274 return [ 2275 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2276 for comment in result 2277 ]
Get the comments on this review.
Inherited Members
2175 @dataclass 2176 class ReviewComment: 2177 """ 2178 Data required to create a review comment on a design review. 2179 2180 :param body: The body of the comment. 2181 :param path: The path of the file to comment on. If you have a 2182 `Content` object, get the path using the `path` property. 2183 :param sub_path: The sub-path of the file to comment on. This is 2184 usually the page ID of the page in the multi-page document. 2185 :param new_position: The line number of the source code file after the 2186 change to add this comment on. Optional, leave unset if this is an ECAD 2187 file or the comment must be on the entire file. 2188 :param old_position: The line number of the source code file before the 2189 change to add this comment on. Optional, leave unset if this is an ECAD 2190 file or the comment must be on the entire file. 2191 """ 2192 2193 body: str 2194 path: str 2195 sub_path: Optional[str] = None 2196 new_position: Optional[int] = None 2197 old_position: Optional[int] = None
Data required to create a review comment on a design review.
Parameters
- body: The body of the comment.
- path: The path of the file to comment on. If you have a
Content
object, get the path using thepath
property. - sub_path: The sub-path of the file to comment on. This is usually the page ID of the page in the multi-page document.
- new_position: The line number of the source code file after the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
- old_position: The line number of the source code file before the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
2280class DesignReview(ApiObject): 2281 """ 2282 A Design Review. See 2283 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2284 2285 Note: The base and head fields are not `Branch` objects - they are plain strings 2286 referring to the branch names. This is because DRs can exist for branches that have 2287 been deleted, which don't have an associated `Branch` object from the API. You can use 2288 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2289 it exists. 2290 """ 2291 2292 additions: Optional[int] 2293 allow_maintainer_edit: bool 2294 allow_maintainer_edits: Any 2295 assignee: User 2296 assignees: List["User"] 2297 base: str 2298 body: str 2299 changed_files: Optional[int] 2300 closed_at: Optional[str] 2301 comments: int 2302 created_at: str 2303 deletions: Optional[int] 2304 diff_url: str 2305 draft: bool 2306 due_date: Optional[str] 2307 head: str 2308 html_url: str 2309 id: int 2310 is_locked: bool 2311 labels: List[Any] 2312 merge_base: str 2313 merge_commit_sha: Optional[str] 2314 mergeable: bool 2315 merged: bool 2316 merged_at: Optional[str] 2317 merged_by: Any 2318 milestone: Any 2319 number: int 2320 patch_url: str 2321 pin_order: int 2322 repository: Optional["Repository"] 2323 requested_reviewers: Any 2324 requested_reviewers_teams: Any 2325 review_comments: int 2326 state: str 2327 title: str 2328 updated_at: str 2329 url: str 2330 user: User 2331 2332 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2333 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2334 GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews" 2335 GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}" 2336 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2337 2338 OPEN = "open" 2339 CLOSED = "closed" 2340 2341 class MergeType(Enum): 2342 MERGE = "merge" 2343 REBASE = "rebase" 2344 REBASE_MERGE = "rebase-merge" 2345 SQUASH = "squash" 2346 MANUALLY_MERGED = "manually-merged" 2347 2348 def __init__(self, allspice_client): 2349 super().__init__(allspice_client) 2350 2351 def __eq__(self, other): 2352 if not isinstance(other, DesignReview): 2353 return False 2354 return self.repository == other.repository and self.id == other.id 2355 2356 def __hash__(self): 2357 return hash(self.repository) ^ hash(self.id) 2358 2359 @classmethod 2360 def parse_response(cls, allspice_client, result) -> "DesignReview": 2361 api_object = super().parse_response(allspice_client, result) 2362 cls._add_read_property( 2363 "repository", 2364 Repository.parse_response(allspice_client, result["base"]["repo"]), 2365 api_object, 2366 ) 2367 2368 return api_object 2369 2370 @classmethod 2371 def request(cls, allspice_client, owner: str, repo: str, number: str): 2372 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2373 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2374 2375 _fields_to_parsers: ClassVar[dict] = { 2376 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2377 "assignees": lambda allspice_client, us: [ 2378 User.parse_response(allspice_client, u) for u in us 2379 ], 2380 "base": lambda _, b: b["ref"], 2381 "head": lambda _, h: h["ref"], 2382 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2383 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2384 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2385 } 2386 2387 _patchable_fields: ClassVar[set[str]] = { 2388 "allow_maintainer_edits", 2389 "assignee", 2390 "assignees", 2391 "base", 2392 "body", 2393 "due_date", 2394 "milestone", 2395 "state", 2396 "title", 2397 } 2398 2399 _parsers_to_fields: ClassVar[dict] = { 2400 "assignee": lambda u: u.username, 2401 "assignees": lambda us: [u.username for u in us], 2402 "base": lambda b: b.name if isinstance(b, Branch) else b, 2403 "milestone": lambda m: m.id, 2404 } 2405 2406 def commit(self): 2407 data = self.get_dirty_fields() 2408 if "due_date" in data and data["due_date"] is None: 2409 data["unset_due_date"] = True 2410 2411 args = { 2412 "owner": self.repository.owner.username, 2413 "repo": self.repository.name, 2414 "index": self.number, 2415 } 2416 self._commit(args, data) 2417 2418 def merge(self, merge_type: MergeType): 2419 """ 2420 Merge the pull request. See 2421 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2422 2423 :param merge_type: The type of merge to perform. See the MergeType enum. 2424 """ 2425 2426 self.allspice_client.requests_post( 2427 self.MERGE_DESIGN_REVIEW.format( 2428 owner=self.repository.owner.username, 2429 repo=self.repository.name, 2430 index=self.number, 2431 ), 2432 data={"Do": merge_type.value}, 2433 ) 2434 2435 def get_comments(self) -> List[Comment]: 2436 """ 2437 Get the comments on this pull request, but not specifically on a review. 2438 2439 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2440 2441 :return: A list of comments on this pull request. 2442 """ 2443 2444 results = self.allspice_client.requests_get( 2445 self.GET_COMMENTS.format( 2446 owner=self.repository.owner.username, 2447 repo=self.repository.name, 2448 index=self.number, 2449 ) 2450 ) 2451 return [Comment.parse_response(self.allspice_client, result) for result in results] 2452 2453 def create_comment(self, body: str): 2454 """ 2455 Create a comment on this pull request. This uses the same endpoint as the 2456 comments on issues, and will not be associated with any reviews. 2457 2458 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2459 2460 :param body: The body of the comment. 2461 :return: The comment that was created. 2462 """ 2463 2464 result = self.allspice_client.requests_post( 2465 self.GET_COMMENTS.format( 2466 owner=self.repository.owner.username, 2467 repo=self.repository.name, 2468 index=self.number, 2469 ), 2470 data={"body": body}, 2471 ) 2472 return Comment.parse_response(self.allspice_client, result) 2473 2474 def create_review( 2475 self, 2476 *, 2477 body: Optional[str] = None, 2478 event: Optional[DesignReviewReview.ReviewEvent] = None, 2479 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2480 commit_id: Optional[str] = None, 2481 ) -> DesignReviewReview: 2482 """ 2483 Create a review on this design review. 2484 2485 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2486 2487 Note: in most cases, you should not set the body or event when creating 2488 a review. The event is automatically set to "PENDING" when the review 2489 is created. You should then use `submit_review` to submit the review 2490 with the desired event and body. 2491 2492 :param body: The body of the review. This is the top-level comment on 2493 the review. If not provided, the review will be created with no body. 2494 :param event: The event of the review. This is the overall status of the 2495 review. See the ReviewEvent enum. If not provided, the API will 2496 default to "PENDING". 2497 :param comments: A list of comments on the review. Each comment should 2498 be a ReviewComment object. If not provided, only the base comment 2499 will be created. 2500 :param commit_id: The commit SHA to associate with the review. This is 2501 optional. 2502 """ 2503 2504 data: dict[str, Any] = {} 2505 2506 if body is not None: 2507 data["body"] = body 2508 if event is not None: 2509 data["event"] = event.value 2510 if commit_id is not None: 2511 data["commit_id"] = commit_id 2512 if comments is not None: 2513 data["comments"] = [asdict(comment) for comment in comments] 2514 2515 result = self.allspice_client.requests_post( 2516 self.GET_REVIEWS.format( 2517 owner=self.repository.owner.username, 2518 repo=self.repository.name, 2519 index=self.number, 2520 ), 2521 data=data, 2522 ) 2523 2524 return DesignReviewReview.parse_response(self.allspice_client, result) 2525 2526 def get_reviews(self) -> List[DesignReviewReview]: 2527 """ 2528 Get all reviews on this design review. 2529 2530 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2531 """ 2532 2533 results = self.allspice_client.requests_get( 2534 self.GET_REVIEWS.format( 2535 owner=self.repository.owner.username, 2536 repo=self.repository.name, 2537 index=self.number, 2538 ) 2539 ) 2540 2541 return [ 2542 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2543 ] 2544 2545 def submit_review( 2546 self, 2547 review_id: int, 2548 event: DesignReviewReview.ReviewEvent, 2549 *, 2550 body: Optional[str] = None, 2551 ): 2552 """ 2553 Submit a review on this design review. 2554 2555 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2556 2557 :param review_id: The ID of the review to submit. 2558 :param event: The event to submit the review with. See the ReviewEvent 2559 enum for the possible values. 2560 :param body: Optional body text for the review submission. 2561 """ 2562 2563 data = { 2564 "event": event.value, 2565 } 2566 if body is not None: 2567 data["body"] = body 2568 2569 result = self.allspice_client.requests_post( 2570 self.GET_REVIEW.format( 2571 owner=self.repository.owner.username, 2572 repo=self.repository.name, 2573 index=self.number, 2574 review_id=review_id, 2575 ), 2576 data=data, 2577 ) 2578 2579 return result
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2359 @classmethod 2360 def parse_response(cls, allspice_client, result) -> "DesignReview": 2361 api_object = super().parse_response(allspice_client, result) 2362 cls._add_read_property( 2363 "repository", 2364 Repository.parse_response(allspice_client, result["base"]["repo"]), 2365 api_object, 2366 ) 2367 2368 return api_object
2370 @classmethod 2371 def request(cls, allspice_client, owner: str, repo: str, number: str): 2372 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2373 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2406 def commit(self): 2407 data = self.get_dirty_fields() 2408 if "due_date" in data and data["due_date"] is None: 2409 data["unset_due_date"] = True 2410 2411 args = { 2412 "owner": self.repository.owner.username, 2413 "repo": self.repository.name, 2414 "index": self.number, 2415 } 2416 self._commit(args, data)
2418 def merge(self, merge_type: MergeType): 2419 """ 2420 Merge the pull request. See 2421 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2422 2423 :param merge_type: The type of merge to perform. See the MergeType enum. 2424 """ 2425 2426 self.allspice_client.requests_post( 2427 self.MERGE_DESIGN_REVIEW.format( 2428 owner=self.repository.owner.username, 2429 repo=self.repository.name, 2430 index=self.number, 2431 ), 2432 data={"Do": merge_type.value}, 2433 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2435 def get_comments(self) -> List[Comment]: 2436 """ 2437 Get the comments on this pull request, but not specifically on a review. 2438 2439 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2440 2441 :return: A list of comments on this pull request. 2442 """ 2443 2444 results = self.allspice_client.requests_get( 2445 self.GET_COMMENTS.format( 2446 owner=self.repository.owner.username, 2447 repo=self.repository.name, 2448 index=self.number, 2449 ) 2450 ) 2451 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2453 def create_comment(self, body: str): 2454 """ 2455 Create a comment on this pull request. This uses the same endpoint as the 2456 comments on issues, and will not be associated with any reviews. 2457 2458 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2459 2460 :param body: The body of the comment. 2461 :return: The comment that was created. 2462 """ 2463 2464 result = self.allspice_client.requests_post( 2465 self.GET_COMMENTS.format( 2466 owner=self.repository.owner.username, 2467 repo=self.repository.name, 2468 index=self.number, 2469 ), 2470 data={"body": body}, 2471 ) 2472 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2474 def create_review( 2475 self, 2476 *, 2477 body: Optional[str] = None, 2478 event: Optional[DesignReviewReview.ReviewEvent] = None, 2479 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2480 commit_id: Optional[str] = None, 2481 ) -> DesignReviewReview: 2482 """ 2483 Create a review on this design review. 2484 2485 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2486 2487 Note: in most cases, you should not set the body or event when creating 2488 a review. The event is automatically set to "PENDING" when the review 2489 is created. You should then use `submit_review` to submit the review 2490 with the desired event and body. 2491 2492 :param body: The body of the review. This is the top-level comment on 2493 the review. If not provided, the review will be created with no body. 2494 :param event: The event of the review. This is the overall status of the 2495 review. See the ReviewEvent enum. If not provided, the API will 2496 default to "PENDING". 2497 :param comments: A list of comments on the review. Each comment should 2498 be a ReviewComment object. If not provided, only the base comment 2499 will be created. 2500 :param commit_id: The commit SHA to associate with the review. This is 2501 optional. 2502 """ 2503 2504 data: dict[str, Any] = {} 2505 2506 if body is not None: 2507 data["body"] = body 2508 if event is not None: 2509 data["event"] = event.value 2510 if commit_id is not None: 2511 data["commit_id"] = commit_id 2512 if comments is not None: 2513 data["comments"] = [asdict(comment) for comment in comments] 2514 2515 result = self.allspice_client.requests_post( 2516 self.GET_REVIEWS.format( 2517 owner=self.repository.owner.username, 2518 repo=self.repository.name, 2519 index=self.number, 2520 ), 2521 data=data, 2522 ) 2523 2524 return DesignReviewReview.parse_response(self.allspice_client, result)
Create a review on this design review.
allspice.allspice.io/api/swagger#/repository/repoCreatePullRequestReview">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequestReview
Note: in most cases, you should not set the body or event when creating
a review. The event is automatically set to "PENDING" when the review
is created. You should then use submit_review
to submit the review
with the desired event and body.
Parameters
- body: The body of the review. This is the top-level comment on the review. If not provided, the review will be created with no body.
- event: The event of the review. This is the overall status of the review. See the ReviewEvent enum. If not provided, the API will default to "PENDING".
- comments: A list of comments on the review. Each comment should be a ReviewComment object. If not provided, only the base comment will be created.
- commit_id: The commit SHA to associate with the review. This is optional.
2526 def get_reviews(self) -> List[DesignReviewReview]: 2527 """ 2528 Get all reviews on this design review. 2529 2530 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2531 """ 2532 2533 results = self.allspice_client.requests_get( 2534 self.GET_REVIEWS.format( 2535 owner=self.repository.owner.username, 2536 repo=self.repository.name, 2537 index=self.number, 2538 ) 2539 ) 2540 2541 return [ 2542 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2543 ]
Get all reviews on this design review.
allspice.allspice.io/api/swagger#/repository/repoListPullReviews">https://huballspice.allspice.io/api/swagger#/repository/repoListPullReviews
2545 def submit_review( 2546 self, 2547 review_id: int, 2548 event: DesignReviewReview.ReviewEvent, 2549 *, 2550 body: Optional[str] = None, 2551 ): 2552 """ 2553 Submit a review on this design review. 2554 2555 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2556 2557 :param review_id: The ID of the review to submit. 2558 :param event: The event to submit the review with. See the ReviewEvent 2559 enum for the possible values. 2560 :param body: Optional body text for the review submission. 2561 """ 2562 2563 data = { 2564 "event": event.value, 2565 } 2566 if body is not None: 2567 data["body"] = body 2568 2569 result = self.allspice_client.requests_post( 2570 self.GET_REVIEW.format( 2571 owner=self.repository.owner.username, 2572 repo=self.repository.name, 2573 index=self.number, 2574 review_id=review_id, 2575 ), 2576 data=data, 2577 ) 2578 2579 return result
Submit a review on this design review.
allspice.allspice.io/api/swagger#/repository/repoSubmitPullReview">https://huballspice.allspice.io/api/swagger#/repository/repoSubmitPullReview
Parameters
- review_id: The ID of the review to submit.
- event: The event to submit the review with. See the ReviewEvent enum for the possible values.
- body: Optional body text for the review submission.
2582class Team(ApiObject): 2583 can_create_org_repo: bool 2584 description: str 2585 id: int 2586 includes_all_repositories: bool 2587 name: str 2588 organization: Optional["Organization"] 2589 permission: str 2590 units: List[str] 2591 units_map: Dict[str, str] 2592 2593 API_OBJECT = """/teams/{id}""" # <id> 2594 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2595 TEAM_DELETE = """/teams/%s""" # <id> 2596 GET_MEMBERS = """/teams/%s/members""" # <id> 2597 GET_REPOS = """/teams/%s/repos""" # <id> 2598 2599 def __init__(self, allspice_client): 2600 super().__init__(allspice_client) 2601 2602 def __eq__(self, other): 2603 if not isinstance(other, Team): 2604 return False 2605 return self.organization == other.organization and self.id == other.id 2606 2607 def __hash__(self): 2608 return hash(self.organization) ^ hash(self.id) 2609 2610 _fields_to_parsers: ClassVar[dict] = { 2611 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2612 } 2613 2614 _patchable_fields: ClassVar[set[str]] = { 2615 "can_create_org_repo", 2616 "description", 2617 "includes_all_repositories", 2618 "name", 2619 "permission", 2620 "units", 2621 "units_map", 2622 } 2623 2624 @classmethod 2625 def request(cls, allspice_client, id: int): 2626 return cls._request(allspice_client, {"id": id}) 2627 2628 def commit(self): 2629 args = {"id": self.id} 2630 self._commit(args) 2631 2632 def add_user(self, user: User): 2633 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2634 url = f"/teams/{self.id}/members/{user.login}" 2635 self.allspice_client.requests_put(url) 2636 2637 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2638 if isinstance(repo, Repository): 2639 repo_name = repo.name 2640 else: 2641 repo_name = repo 2642 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2643 2644 def get_members(self): 2645 """Get all users assigned to the team.""" 2646 results = self.allspice_client.requests_get_paginated( 2647 Team.GET_MEMBERS % self.id, 2648 ) 2649 return [User.parse_response(self.allspice_client, result) for result in results] 2650 2651 def get_repos(self): 2652 """Get all repos of this Team.""" 2653 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2654 return [Repository.parse_response(self.allspice_client, result) for result in results] 2655 2656 def delete(self): 2657 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2658 self.deleted = True 2659 2660 def remove_team_member(self, user_name: str): 2661 url = f"/teams/{self.id}/members/{user_name}" 2662 self.allspice_client.requests_delete(url)
2632 def add_user(self, user: User): 2633 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2634 url = f"/teams/{self.id}/members/{user.login}" 2635 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2644 def get_members(self): 2645 """Get all users assigned to the team.""" 2646 results = self.allspice_client.requests_get_paginated( 2647 Team.GET_MEMBERS % self.id, 2648 ) 2649 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2651 def get_repos(self): 2652 """Get all repos of this Team.""" 2653 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2654 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
2665class Release(ApiObject): 2666 """ 2667 A release on a repo. 2668 """ 2669 2670 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2671 author: User 2672 body: str 2673 created_at: str 2674 draft: bool 2675 html_url: str 2676 id: int 2677 name: str 2678 prerelease: bool 2679 published_at: str 2680 repo: Optional["Repository"] 2681 repository: Optional["Repository"] 2682 tag_name: str 2683 tarball_url: str 2684 target_commitish: str 2685 upload_url: str 2686 url: str 2687 zipball_url: str 2688 2689 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2690 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2691 # Note that we don't strictly need the get_assets route, as the release 2692 # object already contains the assets. 2693 2694 def __init__(self, allspice_client): 2695 super().__init__(allspice_client) 2696 2697 def __eq__(self, other): 2698 if not isinstance(other, Release): 2699 return False 2700 return self.repo == other.repo and self.id == other.id 2701 2702 def __hash__(self): 2703 return hash(self.repo) ^ hash(self.id) 2704 2705 _fields_to_parsers: ClassVar[dict] = { 2706 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2707 } 2708 _patchable_fields: ClassVar[set[str]] = { 2709 "body", 2710 "draft", 2711 "name", 2712 "prerelease", 2713 "tag_name", 2714 "target_commitish", 2715 } 2716 2717 @classmethod 2718 def parse_response(cls, allspice_client, result, repo) -> Release: 2719 release = super().parse_response(allspice_client, result) 2720 Release._add_read_property("repository", repo, release) 2721 # For legacy reasons 2722 Release._add_read_property("repo", repo, release) 2723 setattr( 2724 release, 2725 "_assets", 2726 [ 2727 ReleaseAsset.parse_response(allspice_client, asset, release) 2728 for asset in result["assets"] 2729 ], 2730 ) 2731 return release 2732 2733 @classmethod 2734 def request( 2735 cls, 2736 allspice_client, 2737 owner: str, 2738 repo: str, 2739 id: Optional[int] = None, 2740 ) -> Release: 2741 args = {"owner": owner, "repo": repo, "id": id} 2742 release_response = cls._get_gitea_api_object(allspice_client, args) 2743 repository = Repository.request(allspice_client, owner, repo) 2744 release = cls.parse_response(allspice_client, release_response, repository) 2745 return release 2746 2747 def commit(self): 2748 if self.repo is None: 2749 raise ValueError("Cannot commit a release without a repository.") 2750 2751 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2752 self._commit(args) 2753 2754 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2755 """ 2756 Create an asset for this release. 2757 2758 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2759 2760 :param file: The file to upload. This should be a file-like object. 2761 :param name: The name of the file. 2762 :return: The created asset. 2763 """ 2764 2765 if self.repo is None: 2766 raise ValueError("Cannot commit a release without a repository.") 2767 2768 args: dict[str, Any] = {"files": {"attachment": file}} 2769 if name is not None: 2770 args["params"] = {"name": name} 2771 2772 result = self.allspice_client.requests_post( 2773 self.RELEASE_CREATE_ASSET.format( 2774 owner=self.repo.owner.username, 2775 repo=self.repo.name, 2776 id=self.id, 2777 ), 2778 **args, 2779 ) 2780 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2781 2782 def delete(self): 2783 if self.repo is None: 2784 raise ValueError("Cannot commit a release without a repository.") 2785 2786 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2787 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2788 self.deleted = True
A release on a repo.
2717 @classmethod 2718 def parse_response(cls, allspice_client, result, repo) -> Release: 2719 release = super().parse_response(allspice_client, result) 2720 Release._add_read_property("repository", repo, release) 2721 # For legacy reasons 2722 Release._add_read_property("repo", repo, release) 2723 setattr( 2724 release, 2725 "_assets", 2726 [ 2727 ReleaseAsset.parse_response(allspice_client, asset, release) 2728 for asset in result["assets"] 2729 ], 2730 ) 2731 return release
2733 @classmethod 2734 def request( 2735 cls, 2736 allspice_client, 2737 owner: str, 2738 repo: str, 2739 id: Optional[int] = None, 2740 ) -> Release: 2741 args = {"owner": owner, "repo": repo, "id": id} 2742 release_response = cls._get_gitea_api_object(allspice_client, args) 2743 repository = Repository.request(allspice_client, owner, repo) 2744 release = cls.parse_response(allspice_client, release_response, repository) 2745 return release
2754 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2755 """ 2756 Create an asset for this release. 2757 2758 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2759 2760 :param file: The file to upload. This should be a file-like object. 2761 :param name: The name of the file. 2762 :return: The created asset. 2763 """ 2764 2765 if self.repo is None: 2766 raise ValueError("Cannot commit a release without a repository.") 2767 2768 args: dict[str, Any] = {"files": {"attachment": file}} 2769 if name is not None: 2770 args["params"] = {"name": name} 2771 2772 result = self.allspice_client.requests_post( 2773 self.RELEASE_CREATE_ASSET.format( 2774 owner=self.repo.owner.username, 2775 repo=self.repo.name, 2776 id=self.id, 2777 ), 2778 **args, 2779 ) 2780 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2782 def delete(self): 2783 if self.repo is None: 2784 raise ValueError("Cannot commit a release without a repository.") 2785 2786 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2787 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2788 self.deleted = True
2791class ReleaseAsset(ApiObject): 2792 browser_download_url: str 2793 created_at: str 2794 download_count: int 2795 id: int 2796 name: str 2797 release: Optional["Release"] 2798 size: int 2799 uuid: str 2800 2801 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2802 2803 def __init__(self, allspice_client): 2804 super().__init__(allspice_client) 2805 2806 def __eq__(self, other): 2807 if not isinstance(other, ReleaseAsset): 2808 return False 2809 return self.release == other.release and self.id == other.id 2810 2811 def __hash__(self): 2812 return hash(self.release) ^ hash(self.id) 2813 2814 _fields_to_parsers: ClassVar[dict] = {} 2815 _patchable_fields: ClassVar[set[str]] = { 2816 "name", 2817 } 2818 2819 @classmethod 2820 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2821 asset = super().parse_response(allspice_client, result) 2822 ReleaseAsset._add_read_property("release", release, asset) 2823 return asset 2824 2825 @classmethod 2826 def request( 2827 cls, 2828 allspice_client, 2829 owner: str, 2830 repo: str, 2831 release_id: int, 2832 id: int, 2833 ) -> ReleaseAsset: 2834 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2835 asset_response = cls._get_gitea_api_object(allspice_client, args) 2836 release = Release.request(allspice_client, owner, repo, release_id) 2837 asset = cls.parse_response(allspice_client, asset_response, release) 2838 return asset 2839 2840 def commit(self): 2841 if self.release is None or self.release.repo is None: 2842 raise ValueError("Cannot commit a release asset without a release or a repository.") 2843 2844 args = { 2845 "owner": self.release.repo.owner.username, 2846 "repo": self.release.repo.name, 2847 "release_id": self.release.id, 2848 "id": self.id, 2849 } 2850 self._commit(args) 2851 2852 def download(self) -> bytes: 2853 """ 2854 Download the raw, binary data of this asset. 2855 2856 Note 1: if the file you are requesting is a text file, you might want to 2857 use .decode() on the result to get a string. For example: 2858 2859 asset.download().decode("utf-8") 2860 2861 Note 2: this method will store the entire file in memory. If you are 2862 downloading a large file, you might want to use download_to_file instead. 2863 """ 2864 2865 return self.allspice_client.requests.get( 2866 self.browser_download_url, 2867 headers=self.allspice_client.headers, 2868 ).content 2869 2870 def download_to_file(self, io: IO): 2871 """ 2872 Download the raw, binary data of this asset to a file-like object. 2873 2874 Example: 2875 2876 with open("my_file.zip", "wb") as f: 2877 asset.download_to_file(f) 2878 2879 :param io: The file-like object to write the data to. 2880 """ 2881 2882 response = self.allspice_client.requests.get( 2883 self.browser_download_url, 2884 headers=self.allspice_client.headers, 2885 stream=True, 2886 ) 2887 # 4kb chunks 2888 for chunk in response.iter_content(chunk_size=4096): 2889 if chunk: 2890 io.write(chunk) 2891 2892 def delete(self): 2893 if self.release is None or self.release.repo is None: 2894 raise ValueError("Cannot commit a release asset without a release or a repository.") 2895 2896 args = { 2897 "owner": self.release.repo.owner.username, 2898 "repo": self.release.repo.name, 2899 "release_id": self.release.id, 2900 "id": self.id, 2901 } 2902 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2903 self.deleted = True
2825 @classmethod 2826 def request( 2827 cls, 2828 allspice_client, 2829 owner: str, 2830 repo: str, 2831 release_id: int, 2832 id: int, 2833 ) -> ReleaseAsset: 2834 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2835 asset_response = cls._get_gitea_api_object(allspice_client, args) 2836 release = Release.request(allspice_client, owner, repo, release_id) 2837 asset = cls.parse_response(allspice_client, asset_response, release) 2838 return asset
2840 def commit(self): 2841 if self.release is None or self.release.repo is None: 2842 raise ValueError("Cannot commit a release asset without a release or a repository.") 2843 2844 args = { 2845 "owner": self.release.repo.owner.username, 2846 "repo": self.release.repo.name, 2847 "release_id": self.release.id, 2848 "id": self.id, 2849 } 2850 self._commit(args)
2852 def download(self) -> bytes: 2853 """ 2854 Download the raw, binary data of this asset. 2855 2856 Note 1: if the file you are requesting is a text file, you might want to 2857 use .decode() on the result to get a string. For example: 2858 2859 asset.download().decode("utf-8") 2860 2861 Note 2: this method will store the entire file in memory. If you are 2862 downloading a large file, you might want to use download_to_file instead. 2863 """ 2864 2865 return self.allspice_client.requests.get( 2866 self.browser_download_url, 2867 headers=self.allspice_client.headers, 2868 ).content
Download the raw, binary data of this asset.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
asset.download().decode("utf-8")
Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.
2870 def download_to_file(self, io: IO): 2871 """ 2872 Download the raw, binary data of this asset to a file-like object. 2873 2874 Example: 2875 2876 with open("my_file.zip", "wb") as f: 2877 asset.download_to_file(f) 2878 2879 :param io: The file-like object to write the data to. 2880 """ 2881 2882 response = self.allspice_client.requests.get( 2883 self.browser_download_url, 2884 headers=self.allspice_client.headers, 2885 stream=True, 2886 ) 2887 # 4kb chunks 2888 for chunk in response.iter_content(chunk_size=4096): 2889 if chunk: 2890 io.write(chunk)
Download the raw, binary data of this asset to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
asset.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
2892 def delete(self): 2893 if self.release is None or self.release.repo is None: 2894 raise ValueError("Cannot commit a release asset without a release or a repository.") 2895 2896 args = { 2897 "owner": self.release.repo.owner.username, 2898 "repo": self.release.repo.name, 2899 "release_id": self.release.id, 2900 "id": self.id, 2901 } 2902 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2903 self.deleted = True
2906class Content(ReadonlyApiObject): 2907 content: Any 2908 download_url: str 2909 encoding: Any 2910 git_url: str 2911 html_url: str 2912 last_commit_sha: str 2913 name: str 2914 path: str 2915 sha: str 2916 size: int 2917 submodule_git_url: Any 2918 target: Any 2919 type: str 2920 url: str 2921 2922 FILE = "file" 2923 2924 def __init__(self, allspice_client): 2925 super().__init__(allspice_client) 2926 2927 def __eq__(self, other): 2928 if not isinstance(other, Content): 2929 return False 2930 2931 return self.sha == other.sha and self.name == other.name 2932 2933 def __hash__(self): 2934 return hash(self.sha) ^ hash(self.name)
Inherited Members
2940class Util: 2941 @staticmethod 2942 def convert_time(time: str) -> datetime: 2943 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2944 try: 2945 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2946 except ValueError: 2947 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2948 2949 @staticmethod 2950 def format_time(time: datetime) -> str: 2951 """ 2952 Format a datetime object to Gitea's time format. 2953 2954 :param time: The time to format 2955 :return: Formatted time 2956 """ 2957 2958 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2959 2960 @staticmethod 2961 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2962 """ 2963 Given a "ref", returns a dict with the ref parameter for the API call. 2964 2965 If the ref is None, returns an empty dict. You can pass this to the API 2966 directly. 2967 """ 2968 2969 if isinstance(ref, Branch): 2970 return {"ref": ref.name} 2971 elif isinstance(ref, Commit): 2972 return {"ref": ref.sha} 2973 elif ref: 2974 return {"ref": ref} 2975 else: 2976 return {}
2941 @staticmethod 2942 def convert_time(time: str) -> datetime: 2943 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2944 try: 2945 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2946 except ValueError: 2947 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)
2949 @staticmethod 2950 def format_time(time: datetime) -> str: 2951 """ 2952 Format a datetime object to Gitea's time format. 2953 2954 :param time: The time to format 2955 :return: Formatted time 2956 """ 2957 2958 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
Format a datetime object to Gitea's time format.
Parameters
- time: The time to format
Returns
Formatted time
2960 @staticmethod 2961 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2962 """ 2963 Given a "ref", returns a dict with the ref parameter for the API call. 2964 2965 If the ref is None, returns an empty dict. You can pass this to the API 2966 directly. 2967 """ 2968 2969 if isinstance(ref, Branch): 2970 return {"ref": ref.name} 2971 elif isinstance(ref, Commit): 2972 return {"ref": ref.sha} 2973 elif ref: 2974 return {"ref": ref} 2975 else: 2976 return {}
Given a "ref", returns a dict with the ref parameter for the API call.
If the ref is None, returns an empty dict. You can pass this to the API directly.