allspice.apiobject
1from __future__ import annotations 2 3import logging 4import re 5from datetime import datetime, timezone 6from enum import Enum 7from functools import cached_property 8from typing import ( 9 IO, 10 Any, 11 ClassVar, 12 Dict, 13 FrozenSet, 14 List, 15 Literal, 16 Optional, 17 Sequence, 18 Set, 19 Tuple, 20 Union, 21) 22 23try: 24 from typing_extensions import Self 25except ImportError: 26 from typing import Self 27 28from .baseapiobject import ApiObject, ReadonlyApiObject 29from .exceptions import ConflictException, NotFoundException 30 31 32class Organization(ApiObject): 33 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 34 35 active: Optional[bool] 36 avatar_url: str 37 created: Optional[str] 38 description: str 39 email: str 40 followers_count: Optional[int] 41 following_count: Optional[int] 42 full_name: str 43 html_url: Optional[str] 44 id: int 45 is_admin: Optional[bool] 46 language: Optional[str] 47 last_login: Optional[str] 48 location: str 49 login: Optional[str] 50 login_name: Optional[str] 51 name: Optional[str] 52 prohibit_login: Optional[bool] 53 repo_admin_change_team_access: Optional[bool] 54 restricted: Optional[bool] 55 source_id: Optional[int] 56 starred_repos_count: Optional[int] 57 username: str 58 visibility: str 59 website: str 60 61 API_OBJECT = """/orgs/{name}""" # <org> 62 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 63 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 64 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 65 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 66 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 67 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 68 69 def __init__(self, allspice_client): 70 super().__init__(allspice_client) 71 72 def __eq__(self, other): 73 if not isinstance(other, Organization): 74 return False 75 return self.allspice_client == other.allspice_client and self.name == other.name 76 77 def __hash__(self): 78 return hash(self.allspice_client) ^ hash(self.name) 79 80 @classmethod 81 def request(cls, allspice_client, name: str) -> Self: 82 return cls._request(allspice_client, {"name": name}) 83 84 @classmethod 85 def parse_response(cls, allspice_client, result) -> "Organization": 86 api_object = super().parse_response(allspice_client, result) 87 # add "name" field to make this behave similar to users for gitea < 1.18 88 # also necessary for repository-owner when org is repo owner 89 if not hasattr(api_object, "name"): 90 Organization._add_read_property("name", result["username"], api_object) 91 return api_object 92 93 _patchable_fields: ClassVar[set[str]] = { 94 "description", 95 "full_name", 96 "location", 97 "visibility", 98 "website", 99 } 100 101 def commit(self): 102 args = {"name": self.name} 103 self._commit(args) 104 105 def create_repo( 106 self, 107 repoName: str, 108 description: str = "", 109 private: bool = False, 110 autoInit=True, 111 gitignores: Optional[str] = None, 112 license: Optional[str] = None, 113 readme: str = "Default", 114 issue_labels: Optional[str] = None, 115 default_branch="master", 116 ): 117 """Create an organization Repository 118 119 Throws: 120 AlreadyExistsException: If the Repository exists already. 121 Exception: If something else went wrong. 122 """ 123 result = self.allspice_client.requests_post( 124 f"/orgs/{self.name}/repos", 125 data={ 126 "name": repoName, 127 "description": description, 128 "private": private, 129 "auto_init": autoInit, 130 "gitignores": gitignores, 131 "license": license, 132 "issue_labels": issue_labels, 133 "readme": readme, 134 "default_branch": default_branch, 135 }, 136 ) 137 if "id" in result: 138 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 139 else: 140 self.allspice_client.logger.error(result["message"]) 141 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 142 return Repository.parse_response(self.allspice_client, result) 143 144 def get_repositories(self) -> List["Repository"]: 145 results = self.allspice_client.requests_get_paginated( 146 Organization.ORG_REPOS_REQUEST % self.username 147 ) 148 return [Repository.parse_response(self.allspice_client, result) for result in results] 149 150 def get_repository(self, name) -> "Repository": 151 repos = self.get_repositories() 152 for repo in repos: 153 if repo.name == name: 154 return repo 155 raise NotFoundException("Repository %s not existent in organization." % name) 156 157 def get_teams(self) -> List["Team"]: 158 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 159 teams = [Team.parse_response(self.allspice_client, result) for result in results] 160 # organisation seems to be missing using this request, so we add org manually 161 for t in teams: 162 setattr(t, "_organization", self) 163 return teams 164 165 def get_team(self, name) -> "Team": 166 teams = self.get_teams() 167 for team in teams: 168 if team.name == name: 169 return team 170 raise NotFoundException("Team not existent in organization.") 171 172 def create_team( 173 self, 174 name: str, 175 description: str = "", 176 permission: str = "read", 177 can_create_org_repo: bool = False, 178 includes_all_repositories: bool = False, 179 units=( 180 "repo.code", 181 "repo.issues", 182 "repo.ext_issues", 183 "repo.wiki", 184 "repo.pulls", 185 "repo.releases", 186 "repo.ext_wiki", 187 ), 188 units_map={}, 189 ) -> "Team": 190 """Alias for AllSpice#create_team""" 191 # TODO: Move AllSpice#create_team to Organization#create_team and 192 # deprecate AllSpice#create_team. 193 return self.allspice_client.create_team( 194 org=self, 195 name=name, 196 description=description, 197 permission=permission, 198 can_create_org_repo=can_create_org_repo, 199 includes_all_repositories=includes_all_repositories, 200 units=units, 201 units_map=units_map, 202 ) 203 204 def get_members(self) -> List["User"]: 205 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 206 return [User.parse_response(self.allspice_client, result) for result in results] 207 208 def is_member(self, username) -> bool: 209 if isinstance(username, User): 210 username = username.username 211 try: 212 # returns 204 if its ok, 404 if its not 213 self.allspice_client.requests_get( 214 Organization.ORG_IS_MEMBER % (self.username, username) 215 ) 216 return True 217 except Exception: 218 return False 219 220 def remove_member(self, user: "User"): 221 path = f"/orgs/{self.username}/members/{user.username}" 222 self.allspice_client.requests_delete(path) 223 224 def delete(self): 225 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 226 for repo in self.get_repositories(): 227 repo.delete() 228 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 229 self.deleted = True 230 231 def get_heatmap(self) -> List[Tuple[datetime, int]]: 232 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 233 results = [ 234 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 235 for result in results 236 ] 237 return results 238 239 240class User(ApiObject): 241 active: bool 242 admin: Any 243 allow_create_organization: Any 244 allow_git_hook: Any 245 allow_import_local: Any 246 avatar_url: str 247 created: str 248 description: str 249 email: str 250 emails: List[Any] 251 followers_count: int 252 following_count: int 253 full_name: str 254 html_url: str 255 id: int 256 is_admin: bool 257 language: str 258 last_login: str 259 location: str 260 login: str 261 login_name: str 262 max_repo_creation: Any 263 must_change_password: Any 264 password: Any 265 prohibit_login: bool 266 restricted: bool 267 source_id: int 268 starred_repos_count: int 269 username: str 270 visibility: str 271 website: str 272 273 API_OBJECT = """/users/{name}""" # <org> 274 USER_MAIL = """/user/emails?sudo=%s""" # <name> 275 USER_PATCH = """/admin/users/%s""" # <username> 276 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 277 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 278 USER_HEATMAP = """/users/%s/heatmap""" # <username> 279 280 def __init__(self, allspice_client): 281 super().__init__(allspice_client) 282 self._emails = [] 283 284 def __eq__(self, other): 285 if not isinstance(other, User): 286 return False 287 return self.allspice_client == other.allspice_client and self.id == other.id 288 289 def __hash__(self): 290 return hash(self.allspice_client) ^ hash(self.id) 291 292 @property 293 def emails(self): 294 self.__request_emails() 295 return self._emails 296 297 @classmethod 298 def request(cls, allspice_client, name: str) -> "User": 299 api_object = cls._request(allspice_client, {"name": name}) 300 return api_object 301 302 _patchable_fields: ClassVar[set[str]] = { 303 "active", 304 "admin", 305 "allow_create_organization", 306 "allow_git_hook", 307 "allow_import_local", 308 "email", 309 "full_name", 310 "location", 311 "login_name", 312 "max_repo_creation", 313 "must_change_password", 314 "password", 315 "prohibit_login", 316 "website", 317 } 318 319 def commit(self, login_name: str, source_id: int = 0): 320 """ 321 Unfortunately it is necessary to require the login name 322 as well as the login source (that is not supplied when getting a user) for 323 changing a user. 324 Usually source_id is 0 and the login_name is equal to the username. 325 """ 326 values = self.get_dirty_fields() 327 values.update( 328 # api-doc says that the "source_id" is necessary; works without though 329 {"login_name": login_name, "source_id": source_id} 330 ) 331 args = {"username": self.username} 332 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 333 self._dirty_fields = {} 334 335 def create_repo( 336 self, 337 repoName: str, 338 description: str = "", 339 private: bool = False, 340 autoInit=True, 341 gitignores: Optional[str] = None, 342 license: Optional[str] = None, 343 readme: str = "Default", 344 issue_labels: Optional[str] = None, 345 default_branch="master", 346 ): 347 """Create a user Repository 348 349 Throws: 350 AlreadyExistsException: If the Repository exists already. 351 Exception: If something else went wrong. 352 """ 353 result = self.allspice_client.requests_post( 354 "/user/repos", 355 data={ 356 "name": repoName, 357 "description": description, 358 "private": private, 359 "auto_init": autoInit, 360 "gitignores": gitignores, 361 "license": license, 362 "issue_labels": issue_labels, 363 "readme": readme, 364 "default_branch": default_branch, 365 }, 366 ) 367 if "id" in result: 368 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 369 else: 370 self.allspice_client.logger.error(result["message"]) 371 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 372 return Repository.parse_response(self.allspice_client, result) 373 374 def get_repositories(self) -> List["Repository"]: 375 """Get all Repositories owned by this User.""" 376 url = f"/users/{self.username}/repos" 377 results = self.allspice_client.requests_get_paginated(url) 378 return [Repository.parse_response(self.allspice_client, result) for result in results] 379 380 def get_orgs(self) -> List[Organization]: 381 """Get all Organizations this user is a member of.""" 382 url = f"/users/{self.username}/orgs" 383 results = self.allspice_client.requests_get_paginated(url) 384 return [Organization.parse_response(self.allspice_client, result) for result in results] 385 386 def get_teams(self) -> List["Team"]: 387 url = "/user/teams" 388 results = self.allspice_client.requests_get_paginated(url, sudo=self) 389 return [Team.parse_response(self.allspice_client, result) for result in results] 390 391 def get_accessible_repos(self) -> List["Repository"]: 392 """Get all Repositories accessible by the logged in User.""" 393 results = self.allspice_client.requests_get("/user/repos", sudo=self) 394 return [Repository.parse_response(self.allspice_client, result) for result in results] 395 396 def __request_emails(self): 397 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 398 # report if the adress changed by this 399 for mail in result: 400 self._emails.append(mail["email"]) 401 if mail["primary"]: 402 self._email = mail["email"] 403 404 def delete(self): 405 """Deletes this User. Also deletes all Repositories he owns.""" 406 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 407 self.deleted = True 408 409 def get_heatmap(self) -> List[Tuple[datetime, int]]: 410 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 411 results = [ 412 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 413 for result in results 414 ] 415 return results 416 417 418class Branch(ReadonlyApiObject): 419 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 420 effective_branch_protection_name: str 421 enable_status_check: bool 422 name: str 423 protected: bool 424 required_approvals: int 425 status_check_contexts: List[Any] 426 user_can_merge: bool 427 user_can_push: bool 428 429 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 430 431 def __init__(self, allspice_client): 432 super().__init__(allspice_client) 433 434 def __eq__(self, other): 435 if not isinstance(other, Branch): 436 return False 437 return self.commit == other.commit and self.name == other.name 438 439 def __hash__(self): 440 return hash(self.commit["id"]) ^ hash(self.name) 441 442 _fields_to_parsers: ClassVar[dict] = { 443 # This is not a commit object 444 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 445 } 446 447 @classmethod 448 def request(cls, allspice_client, owner: str, repo: str, branch: str): 449 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch}) 450 451 452class GitEntry(ReadonlyApiObject): 453 """ 454 An object representing a file or directory in the Git tree. 455 """ 456 457 mode: str 458 path: str 459 sha: str 460 size: int 461 type: str 462 url: str 463 464 def __init__(self, allspice_client): 465 super().__init__(allspice_client) 466 467 def __eq__(self, other) -> bool: 468 if not isinstance(other, GitEntry): 469 return False 470 return self.sha == other.sha 471 472 def __hash__(self) -> int: 473 return hash(self.sha) 474 475 476class Repository(ApiObject): 477 allow_fast_forward_only_merge: bool 478 allow_manual_merge: Any 479 allow_merge_commits: bool 480 allow_rebase: bool 481 allow_rebase_explicit: bool 482 allow_rebase_update: bool 483 allow_squash_merge: bool 484 archived: bool 485 archived_at: str 486 autodetect_manual_merge: Any 487 avatar_url: str 488 clone_url: str 489 created_at: str 490 default_allow_maintainer_edit: bool 491 default_branch: str 492 default_delete_branch_after_merge: bool 493 default_merge_style: str 494 description: str 495 empty: bool 496 enable_prune: Any 497 external_tracker: Any 498 external_wiki: Any 499 fork: bool 500 forks_count: int 501 full_name: str 502 has_actions: bool 503 has_issues: bool 504 has_packages: bool 505 has_projects: bool 506 has_pull_requests: bool 507 has_releases: bool 508 has_wiki: bool 509 html_url: str 510 id: int 511 ignore_whitespace_conflicts: bool 512 internal: bool 513 internal_tracker: Dict[str, bool] 514 language: str 515 languages_url: str 516 link: str 517 mirror: bool 518 mirror_interval: str 519 mirror_updated: str 520 name: str 521 object_format_name: str 522 open_issues_count: int 523 open_pr_counter: int 524 original_url: str 525 owner: Union["User", "Organization"] 526 parent: Any 527 permissions: Dict[str, bool] 528 private: bool 529 projects_mode: str 530 release_counter: int 531 repo_transfer: Any 532 size: int 533 ssh_url: str 534 stars_count: int 535 template: bool 536 updated_at: datetime 537 url: str 538 watchers_count: int 539 website: str 540 541 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 542 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 543 REPO_SEARCH = """/repos/search/""" 544 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 545 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 546 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 547 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 548 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 549 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 550 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 551 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 552 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 553 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 554 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 555 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 556 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 557 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 558 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 559 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 560 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 561 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 562 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 563 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 564 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 565 566 class ArchiveFormat(Enum): 567 """ 568 Archive formats for Repository.get_archive 569 """ 570 571 TAR = "tar.gz" 572 ZIP = "zip" 573 574 class CommitStatusSort(Enum): 575 """ 576 Sort order for Repository.get_commit_status 577 """ 578 579 OLDEST = "oldest" 580 RECENT_UPDATE = "recentupdate" 581 LEAST_UPDATE = "leastupdate" 582 LEAST_INDEX = "leastindex" 583 HIGHEST_INDEX = "highestindex" 584 585 def __init__(self, allspice_client): 586 super().__init__(allspice_client) 587 588 def __eq__(self, other): 589 if not isinstance(other, Repository): 590 return False 591 return self.owner == other.owner and self.name == other.name 592 593 def __hash__(self): 594 return hash(self.owner) ^ hash(self.name) 595 596 _fields_to_parsers: ClassVar[dict] = { 597 # dont know how to tell apart user and org as owner except form email being empty. 598 "owner": lambda allspice_client, r: ( 599 Organization.parse_response(allspice_client, r) 600 if r["email"] == "" 601 else User.parse_response(allspice_client, r) 602 ), 603 "updated_at": lambda _, t: Util.convert_time(t), 604 } 605 606 @classmethod 607 def request( 608 cls, 609 allspice_client, 610 owner: str, 611 name: str, 612 ) -> Repository: 613 return cls._request(allspice_client, {"owner": owner, "name": name}) 614 615 @classmethod 616 def search( 617 cls, 618 allspice_client, 619 query: Optional[str] = None, 620 topic: bool = False, 621 include_description: bool = False, 622 user: Optional[User] = None, 623 owner_to_prioritize: Union[User, Organization, None] = None, 624 ) -> list[Repository]: 625 """ 626 Search for repositories. 627 628 See https://hub.allspice.io/api/swagger#/repository/repoSearch 629 630 :param query: The query string to search for 631 :param topic: If true, the query string will only be matched against the 632 repository's topic. 633 :param include_description: If true, the query string will be matched 634 against the repository's description as well. 635 :param user: If specified, only repositories that this user owns or 636 contributes to will be searched. 637 :param owner_to_prioritize: If specified, repositories owned by the 638 given entity will be prioritized in the search. 639 :returns: All repositories matching the query. If there are many 640 repositories matching this query, this may take some time. 641 """ 642 643 params = {} 644 645 if query is not None: 646 params["q"] = query 647 if topic: 648 params["topic"] = topic 649 if include_description: 650 params["include_description"] = include_description 651 if user is not None: 652 params["user"] = user.id 653 if owner_to_prioritize is not None: 654 params["owner_to_prioritize"] = owner_to_prioritize.id 655 656 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 657 658 return [Repository.parse_response(allspice_client, response) for response in responses] 659 660 _patchable_fields: ClassVar[set[str]] = { 661 "allow_manual_merge", 662 "allow_merge_commits", 663 "allow_rebase", 664 "allow_rebase_explicit", 665 "allow_rebase_update", 666 "allow_squash_merge", 667 "archived", 668 "autodetect_manual_merge", 669 "default_branch", 670 "default_delete_branch_after_merge", 671 "default_merge_style", 672 "description", 673 "enable_prune", 674 "external_tracker", 675 "external_wiki", 676 "has_issues", 677 "has_projects", 678 "has_pull_requests", 679 "has_wiki", 680 "ignore_whitespace_conflicts", 681 "internal_tracker", 682 "mirror_interval", 683 "name", 684 "private", 685 "template", 686 "website", 687 } 688 689 def commit(self): 690 args = {"owner": self.owner.username, "name": self.name} 691 self._commit(args) 692 693 def get_branches(self) -> List["Branch"]: 694 """Get all the Branches of this Repository.""" 695 696 results = self.allspice_client.requests_get_paginated( 697 Repository.REPO_BRANCHES % (self.owner.username, self.name) 698 ) 699 return [Branch.parse_response(self.allspice_client, result) for result in results] 700 701 def get_branch(self, name: str) -> "Branch": 702 """Get a specific Branch of this Repository.""" 703 result = self.allspice_client.requests_get( 704 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 705 ) 706 return Branch.parse_response(self.allspice_client, result) 707 708 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 709 """Add a branch to the repository""" 710 # Note: will only work with gitea 1.13 or higher! 711 712 ref_name = Util.data_params_for_ref(create_from) 713 if "ref" not in ref_name: 714 raise ValueError("create_from must be a Branch, Commit or string") 715 ref_name = ref_name["ref"] 716 717 data = {"new_branch_name": newname, "old_ref_name": ref_name} 718 result = self.allspice_client.requests_post( 719 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 720 ) 721 return Branch.parse_response(self.allspice_client, result) 722 723 def get_issues( 724 self, 725 state: Literal["open", "closed", "all"] = "all", 726 search_query: Optional[str] = None, 727 labels: Optional[List[str]] = None, 728 milestones: Optional[List[Union[Milestone, str]]] = None, 729 assignee: Optional[Union[User, str]] = None, 730 since: Optional[datetime] = None, 731 before: Optional[datetime] = None, 732 ) -> List["Issue"]: 733 """ 734 Get all Issues of this Repository (open and closed) 735 736 https://hub.allspice.io/api/swagger#/repository/repoListIssues 737 738 All params of this method are optional filters. If you don't specify a filter, it 739 will not be applied. 740 741 :param state: The state of the Issues to get. If None, all Issues are returned. 742 :param search_query: Filter issues by text. This is equivalent to searching for 743 `search_query` in the Issues on the web interface. 744 :param labels: Filter issues by labels. 745 :param milestones: Filter issues by milestones. 746 :param assignee: Filter issues by the assigned user. 747 :param since: Filter issues by the date they were created. 748 :param before: Filter issues by the date they were created. 749 :return: A list of Issues. 750 """ 751 752 data = { 753 "state": state, 754 } 755 if search_query: 756 data["q"] = search_query 757 if labels: 758 data["labels"] = ",".join(labels) 759 if milestones: 760 data["milestone"] = ",".join( 761 [ 762 milestone.name if isinstance(milestone, Milestone) else milestone 763 for milestone in milestones 764 ] 765 ) 766 if assignee: 767 if isinstance(assignee, User): 768 data["assignee"] = assignee.username 769 else: 770 data["assignee"] = assignee 771 if since: 772 data["since"] = Util.format_time(since) 773 if before: 774 data["before"] = Util.format_time(before) 775 776 results = self.allspice_client.requests_get_paginated( 777 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 778 params=data, 779 ) 780 781 issues = [] 782 for result in results: 783 issue = Issue.parse_response(self.allspice_client, result) 784 # See Issue.request 785 setattr(issue, "_repository", self) 786 # This is mostly for compatibility with an older implementation 787 Issue._add_read_property("repo", self, issue) 788 issues.append(issue) 789 790 return issues 791 792 def get_design_reviews( 793 self, 794 state: Literal["open", "closed", "all"] = "all", 795 milestone: Optional[Union[Milestone, str]] = None, 796 labels: Optional[List[str]] = None, 797 ) -> List["DesignReview"]: 798 """ 799 Get all Design Reviews of this Repository. 800 801 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 802 803 :param state: The state of the Design Reviews to get. If None, all Design Reviews 804 are returned. 805 :param milestone: The milestone of the Design Reviews to get. 806 :param labels: A list of label IDs to filter DRs by. 807 :return: A list of Design Reviews. 808 """ 809 810 params = { 811 "state": state, 812 } 813 if milestone: 814 if isinstance(milestone, Milestone): 815 params["milestone"] = milestone.name 816 else: 817 params["milestone"] = milestone 818 if labels: 819 params["labels"] = ",".join(labels) 820 821 results = self.allspice_client.requests_get_paginated( 822 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 823 params=params, 824 ) 825 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 826 827 def get_commits( 828 self, 829 sha: Optional[str] = None, 830 path: Optional[str] = None, 831 stat: bool = True, 832 ) -> List["Commit"]: 833 """ 834 Get all the Commits of this Repository. 835 836 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 837 838 :param sha: The SHA of the commit to start listing commits from. 839 :param path: filepath of a file/dir. 840 :param stat: Include the number of additions and deletions in the response. 841 Disable for speedup. 842 :return: A list of Commits. 843 """ 844 845 data = {} 846 if sha: 847 data["sha"] = sha 848 if path: 849 data["path"] = path 850 if not stat: 851 data["stat"] = False 852 853 try: 854 results = self.allspice_client.requests_get_paginated( 855 Repository.REPO_COMMITS % (self.owner.username, self.name), 856 params=data, 857 ) 858 except ConflictException as err: 859 logging.warning(err) 860 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 861 results = [] 862 return [Commit.parse_response(self.allspice_client, result) for result in results] 863 864 def get_issues_state(self, state) -> List["Issue"]: 865 """ 866 DEPRECATED: Use get_issues() instead. 867 868 Get issues of state Issue.open or Issue.closed of a repository. 869 """ 870 871 assert state in [Issue.OPENED, Issue.CLOSED] 872 issues = [] 873 data = {"state": state} 874 results = self.allspice_client.requests_get_paginated( 875 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 876 params=data, 877 ) 878 for result in results: 879 issue = Issue.parse_response(self.allspice_client, result) 880 # adding data not contained in the issue response 881 # See Issue.request() 882 setattr(issue, "_repository", self) 883 Issue._add_read_property("repo", self, issue) 884 Issue._add_read_property("owner", self.owner, issue) 885 issues.append(issue) 886 return issues 887 888 def get_times(self): 889 results = self.allspice_client.requests_get( 890 Repository.REPO_TIMES % (self.owner.username, self.name) 891 ) 892 return results 893 894 def get_user_time(self, username) -> float: 895 if isinstance(username, User): 896 username = username.username 897 results = self.allspice_client.requests_get( 898 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 899 ) 900 time = sum(r["time"] for r in results) 901 return time 902 903 def get_full_name(self) -> str: 904 return self.owner.username + "/" + self.name 905 906 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 907 data = { 908 "assignees": assignees, 909 "body": description, 910 "closed": False, 911 "title": title, 912 } 913 result = self.allspice_client.requests_post( 914 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 915 data=data, 916 ) 917 918 issue = Issue.parse_response(self.allspice_client, result) 919 setattr(issue, "_repository", self) 920 Issue._add_read_property("repo", self, issue) 921 return issue 922 923 def create_design_review( 924 self, 925 title: str, 926 head: Union[Branch, str], 927 base: Union[Branch, str], 928 assignees: Optional[Set[Union[User, str]]] = None, 929 body: Optional[str] = None, 930 due_date: Optional[datetime] = None, 931 milestone: Optional["Milestone"] = None, 932 ) -> "DesignReview": 933 """ 934 Create a new Design Review. 935 936 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 937 938 :param title: Title of the Design Review 939 :param head: Branch or name of the branch to merge into the base branch 940 :param base: Branch or name of the branch to merge into 941 :param assignees: Optional. A list of users to assign this review. List can be of 942 User objects or of usernames. 943 :param body: An Optional Description for the Design Review. 944 :param due_date: An Optional Due date for the Design Review. 945 :param milestone: An Optional Milestone for the Design Review 946 :return: The created Design Review 947 """ 948 949 data: dict[str, Any] = { 950 "title": title, 951 } 952 953 if isinstance(head, Branch): 954 data["head"] = head.name 955 else: 956 data["head"] = head 957 if isinstance(base, Branch): 958 data["base"] = base.name 959 else: 960 data["base"] = base 961 if assignees: 962 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 963 if body: 964 data["body"] = body 965 if due_date: 966 data["due_date"] = Util.format_time(due_date) 967 if milestone: 968 data["milestone"] = milestone.id 969 970 result = self.allspice_client.requests_post( 971 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 972 data=data, 973 ) 974 975 return DesignReview.parse_response(self.allspice_client, result) 976 977 def create_milestone( 978 self, 979 title: str, 980 description: str, 981 due_date: Optional[str] = None, 982 state: str = "open", 983 ) -> "Milestone": 984 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 985 data = {"title": title, "description": description, "state": state} 986 if due_date: 987 data["due_date"] = due_date 988 result = self.allspice_client.requests_post(url, data=data) 989 return Milestone.parse_response(self.allspice_client, result) 990 991 def create_gitea_hook(self, hook_url: str, events: List[str]): 992 url = f"/repos/{self.owner.username}/{self.name}/hooks" 993 data = { 994 "type": "gitea", 995 "config": {"content_type": "json", "url": hook_url}, 996 "events": events, 997 "active": True, 998 } 999 return self.allspice_client.requests_post(url, data=data) 1000 1001 def list_hooks(self): 1002 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1003 return self.allspice_client.requests_get(url) 1004 1005 def delete_hook(self, id: str): 1006 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1007 self.allspice_client.requests_delete(url) 1008 1009 def is_collaborator(self, username) -> bool: 1010 if isinstance(username, User): 1011 username = username.username 1012 try: 1013 # returns 204 if its ok, 404 if its not 1014 self.allspice_client.requests_get( 1015 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1016 ) 1017 return True 1018 except Exception: 1019 return False 1020 1021 def get_users_with_access(self) -> Sequence[User]: 1022 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1023 response = self.allspice_client.requests_get(url) 1024 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1025 if isinstance(self.owner, User): 1026 return [*collabs, self.owner] 1027 else: 1028 # owner must be org 1029 teams = self.owner.get_teams() 1030 for team in teams: 1031 team_repos = team.get_repos() 1032 if self.name in [n.name for n in team_repos]: 1033 collabs += team.get_members() 1034 return collabs 1035 1036 def remove_collaborator(self, user_name: str): 1037 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1038 self.allspice_client.requests_delete(url) 1039 1040 def transfer_ownership( 1041 self, 1042 new_owner: Union[User, Organization], 1043 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1044 ): 1045 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1046 data: dict[str, Any] = {"new_owner": new_owner.username} 1047 if isinstance(new_owner, Organization): 1048 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1049 data["team_ids"] = new_team_ids 1050 self.allspice_client.requests_post(url, data=data) 1051 # TODO: make sure this instance is either updated or discarded 1052 1053 def get_git_content( 1054 self, 1055 ref: Optional["Ref"] = None, 1056 commit: "Optional[Commit]" = None, 1057 ) -> List[Content]: 1058 """ 1059 Get the metadata for all files in the root directory. 1060 1061 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1062 1063 :param ref: branch or commit to get content from 1064 :param commit: commit to get content from (deprecated) 1065 """ 1066 url = f"/repos/{self.owner.username}/{self.name}/contents" 1067 data = Util.data_params_for_ref(ref or commit) 1068 1069 result = [ 1070 Content.parse_response(self.allspice_client, f) 1071 for f in self.allspice_client.requests_get(url, data) 1072 ] 1073 return result 1074 1075 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1076 """ 1077 Get the repository's tree on a given ref. 1078 1079 By default, this will only return the top-level entries in the tree. If you want 1080 to get the entire tree, set `recursive` to True. 1081 1082 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1083 :param recursive: Whether to get the entire tree or just the top-level entries. 1084 """ 1085 1086 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1087 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1088 params = {"recursive": recursive} 1089 results = self.allspice_client.requests_get_paginated(url, params=params) 1090 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1091 1092 def get_file_content( 1093 self, 1094 content: Content, 1095 ref: Optional[Ref] = None, 1096 commit: Optional[Commit] = None, 1097 ) -> Union[str, List["Content"]]: 1098 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1099 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1100 data = Util.data_params_for_ref(ref or commit) 1101 1102 if content.type == Content.FILE: 1103 return self.allspice_client.requests_get(url, data)["content"] 1104 else: 1105 return [ 1106 Content.parse_response(self.allspice_client, f) 1107 for f in self.allspice_client.requests_get(url, data) 1108 ] 1109 1110 def get_raw_file( 1111 self, 1112 file_path: str, 1113 ref: Optional[Ref] = None, 1114 ) -> bytes: 1115 """ 1116 Get the raw, binary data of a single file. 1117 1118 Note 1: if the file you are requesting is a text file, you might want to 1119 use .decode() on the result to get a string. For example: 1120 1121 content = repo.get_raw_file("file.txt").decode("utf-8") 1122 1123 Note 2: this method will store the entire file in memory. If you want 1124 to download a large file, you might want to use `download_to_file` 1125 instead. 1126 1127 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1128 1129 :param file_path: The path to the file to get. 1130 :param ref: The branch or commit to get the file from. If not provided, 1131 the default branch is used. 1132 """ 1133 1134 url = self.REPO_GET_RAW_FILE.format( 1135 owner=self.owner.username, 1136 repo=self.name, 1137 path=file_path, 1138 ) 1139 params = Util.data_params_for_ref(ref) 1140 return self.allspice_client.requests_get_raw(url, params=params) 1141 1142 def download_to_file( 1143 self, 1144 file_path: str, 1145 io: IO, 1146 ref: Optional[Ref] = None, 1147 ) -> None: 1148 """ 1149 Download the binary data of a file to a file-like object. 1150 1151 Example: 1152 1153 with open("schematic.DSN", "wb") as f: 1154 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1155 1156 :param file_path: The path to the file in the repository from the root 1157 of the repository. 1158 :param io: The file-like object to write the data to. 1159 """ 1160 1161 url = self.allspice_client._AllSpice__get_url( 1162 self.REPO_GET_RAW_FILE.format( 1163 owner=self.owner.username, 1164 repo=self.name, 1165 path=file_path, 1166 ) 1167 ) 1168 params = Util.data_params_for_ref(ref) 1169 response = self.allspice_client.requests.get( 1170 url, 1171 params=params, 1172 headers=self.allspice_client.headers, 1173 stream=True, 1174 ) 1175 1176 for chunk in response.iter_content(chunk_size=4096): 1177 if chunk: 1178 io.write(chunk) 1179 1180 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1181 """ 1182 Get the json blob for a cad file if it exists, otherwise enqueue 1183 a new job and return a 503 status. 1184 1185 WARNING: This is still experimental and not recommended for critical 1186 applications. The structure and content of the returned dictionary can 1187 change at any time. 1188 1189 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1190 """ 1191 1192 if isinstance(content, Content): 1193 content = content.path 1194 1195 url = self.REPO_GET_ALLSPICE_JSON.format( 1196 owner=self.owner.username, 1197 repo=self.name, 1198 content=content, 1199 ) 1200 data = Util.data_params_for_ref(ref) 1201 return self.allspice_client.requests_get(url, data) 1202 1203 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1204 """ 1205 Get the svg blob for a cad file if it exists, otherwise enqueue 1206 a new job and return a 503 status. 1207 1208 WARNING: This is still experimental and not yet recommended for 1209 critical applications. The content of the returned svg can change 1210 at any time. 1211 1212 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1213 """ 1214 1215 if isinstance(content, Content): 1216 content = content.path 1217 1218 url = self.REPO_GET_ALLSPICE_SVG.format( 1219 owner=self.owner.username, 1220 repo=self.name, 1221 content=content, 1222 ) 1223 data = Util.data_params_for_ref(ref) 1224 return self.allspice_client.requests_get_raw(url, data) 1225 1226 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1227 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1228 if not data: 1229 data = {} 1230 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1231 data.update({"content": content}) 1232 return self.allspice_client.requests_post(url, data) 1233 1234 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1235 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1236 if not data: 1237 data = {} 1238 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1239 data.update({"sha": file_sha, "content": content}) 1240 return self.allspice_client.requests_put(url, data) 1241 1242 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1243 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1244 if not data: 1245 data = {} 1246 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1247 data.update({"sha": file_sha}) 1248 return self.allspice_client.requests_delete(url, data) 1249 1250 def get_archive( 1251 self, 1252 ref: Ref = "main", 1253 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1254 ) -> bytes: 1255 """ 1256 Download all the files in a specific ref of a repository as a zip or tarball 1257 archive. 1258 1259 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1260 1261 :param ref: branch or commit to get content from, defaults to the "main" branch 1262 :param archive_format: zip or tar, defaults to zip 1263 """ 1264 1265 ref_string = Util.data_params_for_ref(ref)["ref"] 1266 url = self.REPO_GET_ARCHIVE.format( 1267 owner=self.owner.username, 1268 repo=self.name, 1269 ref=ref_string, 1270 format=archive_format.value, 1271 ) 1272 return self.allspice_client.requests_get_raw(url) 1273 1274 def get_topics(self) -> list[str]: 1275 """ 1276 Gets the list of topics on this repository. 1277 1278 See http://localhost:3000/api/swagger#/repository/repoListTopics 1279 """ 1280 1281 url = self.REPO_GET_TOPICS.format( 1282 owner=self.owner.username, 1283 repo=self.name, 1284 ) 1285 return self.allspice_client.requests_get(url)["topics"] 1286 1287 def add_topic(self, topic: str): 1288 """ 1289 Adds a topic to the repository. 1290 1291 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1292 1293 :param topic: The topic to add. Topic names must consist only of 1294 lowercase letters, numnbers and dashes (-), and cannot start with 1295 dashes. Topic names also must be under 35 characters long. 1296 """ 1297 1298 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1299 self.allspice_client.requests_put(url) 1300 1301 def create_release( 1302 self, 1303 tag_name: str, 1304 name: Optional[str] = None, 1305 body: Optional[str] = None, 1306 draft: bool = False, 1307 ): 1308 """ 1309 Create a release for this repository. The release will be created for 1310 the tag with the given name. If there is no tag with this name, create 1311 the tag first. 1312 1313 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1314 """ 1315 1316 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1317 data = { 1318 "tag_name": tag_name, 1319 "draft": draft, 1320 } 1321 if name is not None: 1322 data["name"] = name 1323 if body is not None: 1324 data["body"] = body 1325 response = self.allspice_client.requests_post(url, data) 1326 return Release.parse_response(self.allspice_client, response, self) 1327 1328 def get_releases( 1329 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1330 ) -> List[Release]: 1331 """ 1332 Get the list of releases for this repository. 1333 1334 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1335 """ 1336 1337 data = {} 1338 1339 if draft is not None: 1340 data["draft"] = draft 1341 if pre_release is not None: 1342 data["pre-release"] = pre_release 1343 1344 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1345 responses = self.allspice_client.requests_get_paginated(url, params=data) 1346 1347 return [ 1348 Release.parse_response(self.allspice_client, response, self) for response in responses 1349 ] 1350 1351 def get_latest_release(self) -> Release: 1352 """ 1353 Get the latest release for this repository. 1354 1355 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1356 """ 1357 1358 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1359 response = self.allspice_client.requests_get(url) 1360 release = Release.parse_response(self.allspice_client, response, self) 1361 return release 1362 1363 def get_release_by_tag(self, tag: str) -> Release: 1364 """ 1365 Get a release by its tag. 1366 1367 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1368 """ 1369 1370 url = self.REPO_GET_RELEASE_BY_TAG.format( 1371 owner=self.owner.username, repo=self.name, tag=tag 1372 ) 1373 response = self.allspice_client.requests_get(url) 1374 release = Release.parse_response(self.allspice_client, response, self) 1375 return release 1376 1377 def get_commit_statuses( 1378 self, 1379 commit: Union[str, Commit], 1380 sort: Optional[CommitStatusSort] = None, 1381 state: Optional[CommitStatusState] = None, 1382 ) -> List[CommitStatus]: 1383 """ 1384 Get a list of statuses for a commit. 1385 1386 This is roughly equivalent to the Commit.get_statuses method, but this 1387 method allows you to sort and filter commits and is more convenient if 1388 you have a commit SHA and don't need to get the commit itself. 1389 1390 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1391 """ 1392 1393 if isinstance(commit, Commit): 1394 commit = commit.sha 1395 1396 params = {} 1397 if sort is not None: 1398 params["sort"] = sort.value 1399 if state is not None: 1400 params["state"] = state.value 1401 1402 url = self.REPO_GET_COMMIT_STATUS.format( 1403 owner=self.owner.username, repo=self.name, sha=commit 1404 ) 1405 response = self.allspice_client.requests_get_paginated(url, params=params) 1406 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1407 1408 def create_commit_status( 1409 self, 1410 commit: Union[str, Commit], 1411 context: Optional[str] = None, 1412 description: Optional[str] = None, 1413 state: Optional[CommitStatusState] = None, 1414 target_url: Optional[str] = None, 1415 ) -> CommitStatus: 1416 """ 1417 Create a status on a commit. 1418 1419 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1420 """ 1421 1422 if isinstance(commit, Commit): 1423 commit = commit.sha 1424 1425 data = {} 1426 if context is not None: 1427 data["context"] = context 1428 if description is not None: 1429 data["description"] = description 1430 if state is not None: 1431 data["state"] = state.value 1432 if target_url is not None: 1433 data["target_url"] = target_url 1434 1435 url = self.REPO_GET_COMMIT_STATUS.format( 1436 owner=self.owner.username, repo=self.name, sha=commit 1437 ) 1438 response = self.allspice_client.requests_post(url, data=data) 1439 return CommitStatus.parse_response(self.allspice_client, response) 1440 1441 def delete(self): 1442 self.allspice_client.requests_delete( 1443 Repository.REPO_DELETE % (self.owner.username, self.name) 1444 ) 1445 self.deleted = True 1446 1447 1448class Milestone(ApiObject): 1449 allow_merge_commits: Any 1450 allow_rebase: Any 1451 allow_rebase_explicit: Any 1452 allow_squash_merge: Any 1453 archived: Any 1454 closed_at: Any 1455 closed_issues: int 1456 created_at: str 1457 default_branch: Any 1458 description: str 1459 due_on: Any 1460 has_issues: Any 1461 has_pull_requests: Any 1462 has_wiki: Any 1463 id: int 1464 ignore_whitespace_conflicts: Any 1465 name: Any 1466 open_issues: int 1467 private: Any 1468 state: str 1469 title: str 1470 updated_at: str 1471 website: Any 1472 1473 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1474 1475 def __init__(self, allspice_client): 1476 super().__init__(allspice_client) 1477 1478 def __eq__(self, other): 1479 if not isinstance(other, Milestone): 1480 return False 1481 return self.allspice_client == other.allspice_client and self.id == other.id 1482 1483 def __hash__(self): 1484 return hash(self.allspice_client) ^ hash(self.id) 1485 1486 _fields_to_parsers: ClassVar[dict] = { 1487 "closed_at": lambda _, t: Util.convert_time(t), 1488 "due_on": lambda _, t: Util.convert_time(t), 1489 } 1490 1491 _patchable_fields: ClassVar[set[str]] = { 1492 "allow_merge_commits", 1493 "allow_rebase", 1494 "allow_rebase_explicit", 1495 "allow_squash_merge", 1496 "archived", 1497 "default_branch", 1498 "description", 1499 "has_issues", 1500 "has_pull_requests", 1501 "has_wiki", 1502 "ignore_whitespace_conflicts", 1503 "name", 1504 "private", 1505 "website", 1506 } 1507 1508 @classmethod 1509 def request(cls, allspice_client, owner: str, repo: str, number: str): 1510 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number}) 1511 1512 1513class Attachment(ReadonlyApiObject): 1514 """ 1515 An asset attached to a comment. 1516 1517 You cannot edit or delete the attachment from this object - see the instance methods 1518 Comment.edit_attachment and delete_attachment for that. 1519 """ 1520 1521 browser_download_url: str 1522 created_at: str 1523 download_count: int 1524 id: int 1525 name: str 1526 size: int 1527 uuid: str 1528 1529 def __init__(self, allspice_client): 1530 super().__init__(allspice_client) 1531 1532 def __eq__(self, other): 1533 if not isinstance(other, Attachment): 1534 return False 1535 1536 return self.uuid == other.uuid 1537 1538 def __hash__(self): 1539 return hash(self.uuid) 1540 1541 def download_to_file(self, io: IO): 1542 """ 1543 Download the raw, binary data of this Attachment to a file-like object. 1544 1545 Example: 1546 1547 with open("my_file.zip", "wb") as f: 1548 attachment.download_to_file(f) 1549 1550 :param io: The file-like object to write the data to. 1551 """ 1552 1553 response = self.allspice_client.requests.get( 1554 self.browser_download_url, 1555 headers=self.allspice_client.headers, 1556 stream=True, 1557 ) 1558 # 4kb chunks 1559 for chunk in response.iter_content(chunk_size=4096): 1560 if chunk: 1561 io.write(chunk) 1562 1563 1564class Comment(ApiObject): 1565 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1566 body: str 1567 created_at: datetime 1568 html_url: str 1569 id: int 1570 issue_url: str 1571 original_author: str 1572 original_author_id: int 1573 pull_request_url: str 1574 updated_at: datetime 1575 user: User 1576 1577 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1578 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1579 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1580 1581 def __init__(self, allspice_client): 1582 super().__init__(allspice_client) 1583 1584 def __eq__(self, other): 1585 if not isinstance(other, Comment): 1586 return False 1587 return self.repository == other.repository and self.id == other.id 1588 1589 def __hash__(self): 1590 return hash(self.repository) ^ hash(self.id) 1591 1592 @classmethod 1593 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1594 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1595 1596 _fields_to_parsers: ClassVar[dict] = { 1597 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1598 "created_at": lambda _, t: Util.convert_time(t), 1599 "updated_at": lambda _, t: Util.convert_time(t), 1600 } 1601 1602 _patchable_fields: ClassVar[set[str]] = {"body"} 1603 1604 @property 1605 def parent_url(self) -> str: 1606 """URL of the parent of this comment (the issue or the pull request)""" 1607 1608 if self.issue_url is not None and self.issue_url != "": 1609 return self.issue_url 1610 else: 1611 return self.pull_request_url 1612 1613 @cached_property 1614 def repository(self) -> Repository: 1615 """The repository this comment was posted on.""" 1616 1617 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1618 return Repository.request(self.allspice_client, owner_name, repo_name) 1619 1620 def __fields_for_path(self): 1621 return { 1622 "owner": self.repository.owner.username, 1623 "repo": self.repository.name, 1624 "id": self.id, 1625 } 1626 1627 def commit(self): 1628 self._commit(self.__fields_for_path()) 1629 1630 def delete(self): 1631 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1632 self.deleted = True 1633 1634 def get_attachments(self) -> List[Attachment]: 1635 """ 1636 Get all attachments on this comment. This returns Attachment objects, which 1637 contain a link to download the attachment. 1638 1639 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1640 """ 1641 1642 results = self.allspice_client.requests_get( 1643 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1644 ) 1645 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1646 1647 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1648 """ 1649 Create an attachment on this comment. 1650 1651 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1652 1653 :param file: The file to attach. This should be a file-like object. 1654 :param name: The name of the file. If not provided, the name of the file will be 1655 used. 1656 :return: The created attachment. 1657 """ 1658 1659 args: dict[str, Any] = { 1660 "files": {"attachment": file}, 1661 } 1662 if name is not None: 1663 args["params"] = {"name": name} 1664 1665 result = self.allspice_client.requests_post( 1666 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1667 **args, 1668 ) 1669 return Attachment.parse_response(self.allspice_client, result) 1670 1671 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1672 """ 1673 Edit an attachment. 1674 1675 The list of params that can be edited is available at 1676 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1677 1678 :param attachment: The attachment to be edited 1679 :param data: The data parameter should be a dictionary of the fields to edit. 1680 :return: The edited attachment 1681 """ 1682 1683 args = { 1684 **self.__fields_for_path(), 1685 "attachment_id": attachment.id, 1686 } 1687 result = self.allspice_client.requests_patch( 1688 self.ATTACHMENT_PATH.format(**args), 1689 data=data, 1690 ) 1691 return Attachment.parse_response(self.allspice_client, result) 1692 1693 def delete_attachment(self, attachment: Attachment): 1694 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1695 1696 args = { 1697 **self.__fields_for_path(), 1698 "attachment_id": attachment.id, 1699 } 1700 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1701 attachment.deleted = True 1702 1703 1704class Commit(ReadonlyApiObject): 1705 author: User 1706 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1707 committer: Dict[str, Union[int, str, bool]] 1708 created: str 1709 files: List[Dict[str, str]] 1710 html_url: str 1711 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1712 parents: List[Union[Dict[str, str], Any]] 1713 sha: str 1714 stats: Dict[str, int] 1715 url: str 1716 1717 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1718 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1719 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1720 1721 # Regex to extract owner and repo names from the url property 1722 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1723 1724 def __init__(self, allspice_client): 1725 super().__init__(allspice_client) 1726 1727 _fields_to_parsers: ClassVar[dict] = { 1728 # NOTE: api may return None for commiters that are no allspice users 1729 "author": lambda allspice_client, u: ( 1730 User.parse_response(allspice_client, u) if u else None 1731 ) 1732 } 1733 1734 def __eq__(self, other): 1735 if not isinstance(other, Commit): 1736 return False 1737 return self.sha == other.sha 1738 1739 def __hash__(self): 1740 return hash(self.sha) 1741 1742 @classmethod 1743 def parse_response(cls, allspice_client, result) -> "Commit": 1744 commit_cache = result["commit"] 1745 api_object = cls(allspice_client) 1746 cls._initialize(allspice_client, api_object, result) 1747 # inner_commit for legacy reasons 1748 Commit._add_read_property("inner_commit", commit_cache, api_object) 1749 return api_object 1750 1751 def get_status(self) -> CommitCombinedStatus: 1752 """ 1753 Get a combined status consisting of all statues on this commit. 1754 1755 Note that the returned object is a CommitCombinedStatus object, which 1756 also contains a list of all statuses on the commit. 1757 1758 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1759 """ 1760 1761 result = self.allspice_client.requests_get( 1762 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1763 ) 1764 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1765 1766 def get_statuses(self) -> List[CommitStatus]: 1767 """ 1768 Get a list of all statuses on this commit. 1769 1770 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1771 """ 1772 1773 results = self.allspice_client.requests_get( 1774 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1775 ) 1776 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1777 1778 @cached_property 1779 def _fields_for_path(self) -> dict[str, str]: 1780 matches = self.URL_REGEXP.search(self.url) 1781 if not matches: 1782 raise ValueError(f"Invalid commit URL: {self.url}") 1783 1784 return { 1785 "owner": matches.group(1), 1786 "repo": matches.group(2), 1787 "sha": self.sha, 1788 } 1789 1790 1791class CommitStatusState(Enum): 1792 PENDING = "pending" 1793 SUCCESS = "success" 1794 ERROR = "error" 1795 FAILURE = "failure" 1796 WARNING = "warning" 1797 1798 @classmethod 1799 def try_init(cls, value: str) -> CommitStatusState | str: 1800 """ 1801 Try converting a string to the enum, and if that fails, return the 1802 string itself. 1803 """ 1804 1805 try: 1806 return cls(value) 1807 except ValueError: 1808 return value 1809 1810 1811class CommitStatus(ReadonlyApiObject): 1812 context: str 1813 created_at: str 1814 creator: User 1815 description: str 1816 id: int 1817 status: CommitStatusState 1818 target_url: str 1819 updated_at: str 1820 url: str 1821 1822 def __init__(self, allspice_client): 1823 super().__init__(allspice_client) 1824 1825 _fields_to_parsers: ClassVar[dict] = { 1826 # Gitea/ASH doesn't actually validate that the status is a "valid" 1827 # status, so we can expect empty or unknown strings in the status field. 1828 "status": lambda _, s: CommitStatusState.try_init(s), 1829 "creator": lambda allspice_client, u: ( 1830 User.parse_response(allspice_client, u) if u else None 1831 ), 1832 } 1833 1834 def __eq__(self, other): 1835 if not isinstance(other, CommitStatus): 1836 return False 1837 return self.id == other.id 1838 1839 def __hash__(self): 1840 return hash(self.id) 1841 1842 1843class CommitCombinedStatus(ReadonlyApiObject): 1844 commit_url: str 1845 repository: Repository 1846 sha: str 1847 state: CommitStatusState 1848 statuses: List["CommitStatus"] 1849 total_count: int 1850 url: str 1851 1852 def __init__(self, allspice_client): 1853 super().__init__(allspice_client) 1854 1855 _fields_to_parsers: ClassVar[dict] = { 1856 # See CommitStatus 1857 "state": lambda _, s: CommitStatusState.try_init(s), 1858 "statuses": lambda allspice_client, statuses: [ 1859 CommitStatus.parse_response(allspice_client, status) for status in statuses 1860 ], 1861 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1862 } 1863 1864 def __eq__(self, other): 1865 if not isinstance(other, CommitCombinedStatus): 1866 return False 1867 return self.sha == other.sha 1868 1869 def __hash__(self): 1870 return hash(self.sha) 1871 1872 @classmethod 1873 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1874 api_object = cls(allspice_client) 1875 cls._initialize(allspice_client, api_object, result) 1876 return api_object 1877 1878 1879class Issue(ApiObject): 1880 assets: List[Any] 1881 assignee: Any 1882 assignees: Any 1883 body: str 1884 closed_at: Any 1885 comments: int 1886 created_at: str 1887 due_date: Any 1888 html_url: str 1889 id: int 1890 is_locked: bool 1891 labels: List[Any] 1892 milestone: Optional["Milestone"] 1893 number: int 1894 original_author: str 1895 original_author_id: int 1896 pin_order: int 1897 pull_request: Any 1898 ref: str 1899 repository: Dict[str, Union[int, str]] 1900 state: str 1901 title: str 1902 updated_at: str 1903 url: str 1904 user: User 1905 1906 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1907 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1908 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1909 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1910 1911 OPENED = "open" 1912 CLOSED = "closed" 1913 1914 def __init__(self, allspice_client): 1915 super().__init__(allspice_client) 1916 1917 def __eq__(self, other): 1918 if not isinstance(other, Issue): 1919 return False 1920 return self.repository == other.repository and self.id == other.id 1921 1922 def __hash__(self): 1923 return hash(self.repository) ^ hash(self.id) 1924 1925 _fields_to_parsers: ClassVar[dict] = { 1926 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1927 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1928 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1929 "assignees": lambda allspice_client, us: [ 1930 User.parse_response(allspice_client, u) for u in us 1931 ], 1932 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1933 } 1934 1935 _parsers_to_fields: ClassVar[dict] = { 1936 "milestone": lambda m: m.id, 1937 } 1938 1939 _patchable_fields: ClassVar[set[str]] = { 1940 "assignee", 1941 "assignees", 1942 "body", 1943 "due_date", 1944 "milestone", 1945 "state", 1946 "title", 1947 } 1948 1949 def commit(self): 1950 args = { 1951 "owner": self.repository.owner.username, 1952 "repo": self.repository.name, 1953 "index": self.number, 1954 } 1955 self._commit(args) 1956 1957 @classmethod 1958 def request(cls, allspice_client, owner: str, repo: str, number: str): 1959 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1960 # The repository in the response is a RepositoryMeta object, so request 1961 # the full repository object and add it to the issue object. 1962 repository = Repository.request(allspice_client, owner, repo) 1963 setattr(api_object, "_repository", repository) 1964 # For legacy reasons 1965 cls._add_read_property("repo", repository, api_object) 1966 return api_object 1967 1968 @classmethod 1969 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1970 args = {"owner": repo.owner.username, "repo": repo.name} 1971 data = {"title": title, "body": body} 1972 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1973 issue = Issue.parse_response(allspice_client, result) 1974 setattr(issue, "_repository", repo) 1975 cls._add_read_property("repo", repo, issue) 1976 return issue 1977 1978 @property 1979 def owner(self) -> Organization | User: 1980 return self.repository.owner 1981 1982 def get_time_sum(self, user: User) -> int: 1983 results = self.allspice_client.requests_get( 1984 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1985 ) 1986 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 1987 1988 def get_times(self) -> Optional[Dict]: 1989 return self.allspice_client.requests_get( 1990 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1991 ) 1992 1993 def delete_time(self, time_id: str): 1994 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 1995 self.allspice_client.requests_delete(path) 1996 1997 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1998 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 1999 self.allspice_client.requests_post( 2000 path, data={"created": created, "time": int(time), "user_name": user_name} 2001 ) 2002 2003 def get_comments(self) -> List[Comment]: 2004 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2005 2006 results = self.allspice_client.requests_get( 2007 self.GET_COMMENTS.format( 2008 owner=self.owner.username, repo=self.repository.name, index=self.number 2009 ) 2010 ) 2011 2012 return [Comment.parse_response(self.allspice_client, result) for result in results] 2013 2014 def create_comment(self, body: str) -> Comment: 2015 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2016 2017 path = self.GET_COMMENTS.format( 2018 owner=self.owner.username, repo=self.repository.name, index=self.number 2019 ) 2020 2021 response = self.allspice_client.requests_post(path, data={"body": body}) 2022 return Comment.parse_response(self.allspice_client, response) 2023 2024 2025class DesignReview(ApiObject): 2026 """ 2027 A Design Review. See 2028 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2029 2030 Note: The base and head fields are not `Branch` objects - they are plain strings 2031 referring to the branch names. This is because DRs can exist for branches that have 2032 been deleted, which don't have an associated `Branch` object from the API. You can use 2033 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2034 it exists. 2035 """ 2036 2037 additions: int 2038 allow_maintainer_edit: bool 2039 allow_maintainer_edits: Any 2040 assignee: User 2041 assignees: List["User"] 2042 base: str 2043 body: str 2044 changed_files: int 2045 closed_at: Any 2046 comments: int 2047 created_at: str 2048 deletions: int 2049 diff_url: str 2050 draft: bool 2051 due_date: Optional[str] 2052 head: str 2053 html_url: str 2054 id: int 2055 is_locked: bool 2056 labels: List[Any] 2057 merge_base: str 2058 merge_commit_sha: Any 2059 mergeable: bool 2060 merged: bool 2061 merged_at: Any 2062 merged_by: Any 2063 milestone: Any 2064 number: int 2065 patch_url: str 2066 pin_order: int 2067 repository: Optional["Repository"] 2068 requested_reviewers: Any 2069 review_comments: int 2070 state: str 2071 title: str 2072 updated_at: str 2073 url: str 2074 user: User 2075 2076 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2077 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2078 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2079 2080 OPEN = "open" 2081 CLOSED = "closed" 2082 2083 class MergeType(Enum): 2084 MERGE = "merge" 2085 REBASE = "rebase" 2086 REBASE_MERGE = "rebase-merge" 2087 SQUASH = "squash" 2088 MANUALLY_MERGED = "manually-merged" 2089 2090 def __init__(self, allspice_client): 2091 super().__init__(allspice_client) 2092 2093 def __eq__(self, other): 2094 if not isinstance(other, DesignReview): 2095 return False 2096 return self.repository == other.repository and self.id == other.id 2097 2098 def __hash__(self): 2099 return hash(self.repository) ^ hash(self.id) 2100 2101 @classmethod 2102 def parse_response(cls, allspice_client, result) -> "DesignReview": 2103 api_object = super().parse_response(allspice_client, result) 2104 cls._add_read_property( 2105 "repository", 2106 Repository.parse_response(allspice_client, result["base"]["repo"]), 2107 api_object, 2108 ) 2109 2110 return api_object 2111 2112 @classmethod 2113 def request(cls, allspice_client, owner: str, repo: str, number: str): 2114 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2115 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2116 2117 _fields_to_parsers: ClassVar[dict] = { 2118 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2119 "assignees": lambda allspice_client, us: [ 2120 User.parse_response(allspice_client, u) for u in us 2121 ], 2122 "base": lambda _, b: b["ref"], 2123 "head": lambda _, h: h["ref"], 2124 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2125 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2126 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2127 } 2128 2129 _patchable_fields: ClassVar[set[str]] = { 2130 "allow_maintainer_edits", 2131 "assignee", 2132 "assignees", 2133 "base", 2134 "body", 2135 "due_date", 2136 "milestone", 2137 "state", 2138 "title", 2139 } 2140 2141 _parsers_to_fields: ClassVar[dict] = { 2142 "assignee": lambda u: u.username, 2143 "assignees": lambda us: [u.username for u in us], 2144 "base": lambda b: b.name if isinstance(b, Branch) else b, 2145 "milestone": lambda m: m.id, 2146 } 2147 2148 def commit(self): 2149 data = self.get_dirty_fields() 2150 if "due_date" in data and data["due_date"] is None: 2151 data["unset_due_date"] = True 2152 2153 args = { 2154 "owner": self.repository.owner.username, 2155 "repo": self.repository.name, 2156 "index": self.number, 2157 } 2158 self._commit(args, data) 2159 2160 def merge(self, merge_type: MergeType): 2161 """ 2162 Merge the pull request. See 2163 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2164 2165 :param merge_type: The type of merge to perform. See the MergeType enum. 2166 """ 2167 2168 self.allspice_client.requests_put( 2169 self.MERGE_DESIGN_REVIEW.format( 2170 owner=self.repository.owner.username, 2171 repo=self.repository.name, 2172 index=self.number, 2173 ), 2174 data={"Do": merge_type.value}, 2175 ) 2176 2177 def get_comments(self) -> List[Comment]: 2178 """ 2179 Get the comments on this pull request, but not specifically on a review. 2180 2181 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2182 2183 :return: A list of comments on this pull request. 2184 """ 2185 2186 results = self.allspice_client.requests_get( 2187 self.GET_COMMENTS.format( 2188 owner=self.repository.owner.username, 2189 repo=self.repository.name, 2190 index=self.number, 2191 ) 2192 ) 2193 return [Comment.parse_response(self.allspice_client, result) for result in results] 2194 2195 def create_comment(self, body: str): 2196 """ 2197 Create a comment on this pull request. This uses the same endpoint as the 2198 comments on issues, and will not be associated with any reviews. 2199 2200 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2201 2202 :param body: The body of the comment. 2203 :return: The comment that was created. 2204 """ 2205 2206 result = self.allspice_client.requests_post( 2207 self.GET_COMMENTS.format( 2208 owner=self.repository.owner.username, 2209 repo=self.repository.name, 2210 index=self.number, 2211 ), 2212 data={"body": body}, 2213 ) 2214 return Comment.parse_response(self.allspice_client, result) 2215 2216 2217class Team(ApiObject): 2218 can_create_org_repo: bool 2219 description: str 2220 id: int 2221 includes_all_repositories: bool 2222 name: str 2223 organization: Optional["Organization"] 2224 permission: str 2225 units: List[str] 2226 units_map: Dict[str, str] 2227 2228 API_OBJECT = """/teams/{id}""" # <id> 2229 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2230 TEAM_DELETE = """/teams/%s""" # <id> 2231 GET_MEMBERS = """/teams/%s/members""" # <id> 2232 GET_REPOS = """/teams/%s/repos""" # <id> 2233 2234 def __init__(self, allspice_client): 2235 super().__init__(allspice_client) 2236 2237 def __eq__(self, other): 2238 if not isinstance(other, Team): 2239 return False 2240 return self.organization == other.organization and self.id == other.id 2241 2242 def __hash__(self): 2243 return hash(self.organization) ^ hash(self.id) 2244 2245 _fields_to_parsers: ClassVar[dict] = { 2246 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2247 } 2248 2249 _patchable_fields: ClassVar[set[str]] = { 2250 "can_create_org_repo", 2251 "description", 2252 "includes_all_repositories", 2253 "name", 2254 "permission", 2255 "units", 2256 "units_map", 2257 } 2258 2259 @classmethod 2260 def request(cls, allspice_client, id: int): 2261 return cls._request(allspice_client, {"id": id}) 2262 2263 def commit(self): 2264 args = {"id": self.id} 2265 self._commit(args) 2266 2267 def add_user(self, user: User): 2268 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2269 url = f"/teams/{self.id}/members/{user.login}" 2270 self.allspice_client.requests_put(url) 2271 2272 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2273 if isinstance(repo, Repository): 2274 repo_name = repo.name 2275 else: 2276 repo_name = repo 2277 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2278 2279 def get_members(self): 2280 """Get all users assigned to the team.""" 2281 results = self.allspice_client.requests_get_paginated( 2282 Team.GET_MEMBERS % self.id, 2283 ) 2284 return [User.parse_response(self.allspice_client, result) for result in results] 2285 2286 def get_repos(self): 2287 """Get all repos of this Team.""" 2288 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2289 return [Repository.parse_response(self.allspice_client, result) for result in results] 2290 2291 def delete(self): 2292 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2293 self.deleted = True 2294 2295 def remove_team_member(self, user_name: str): 2296 url = f"/teams/{self.id}/members/{user_name}" 2297 self.allspice_client.requests_delete(url) 2298 2299 2300class Release(ApiObject): 2301 """ 2302 A release on a repo. 2303 """ 2304 2305 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2306 author: User 2307 body: str 2308 created_at: str 2309 draft: bool 2310 html_url: str 2311 id: int 2312 name: str 2313 prerelease: bool 2314 published_at: str 2315 repo: Optional["Repository"] 2316 repository: Optional["Repository"] 2317 tag_name: str 2318 tarball_url: str 2319 target_commitish: str 2320 upload_url: str 2321 url: str 2322 zipball_url: str 2323 2324 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2325 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2326 # Note that we don't strictly need the get_assets route, as the release 2327 # object already contains the assets. 2328 2329 def __init__(self, allspice_client): 2330 super().__init__(allspice_client) 2331 2332 def __eq__(self, other): 2333 if not isinstance(other, Release): 2334 return False 2335 return self.repo == other.repo and self.id == other.id 2336 2337 def __hash__(self): 2338 return hash(self.repo) ^ hash(self.id) 2339 2340 _fields_to_parsers: ClassVar[dict] = { 2341 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2342 } 2343 _patchable_fields: ClassVar[set[str]] = { 2344 "body", 2345 "draft", 2346 "name", 2347 "prerelease", 2348 "tag_name", 2349 "target_commitish", 2350 } 2351 2352 @classmethod 2353 def parse_response(cls, allspice_client, result, repo) -> Release: 2354 release = super().parse_response(allspice_client, result) 2355 Release._add_read_property("repository", repo, release) 2356 # For legacy reasons 2357 Release._add_read_property("repo", repo, release) 2358 setattr( 2359 release, 2360 "_assets", 2361 [ 2362 ReleaseAsset.parse_response(allspice_client, asset, release) 2363 for asset in result["assets"] 2364 ], 2365 ) 2366 return release 2367 2368 @classmethod 2369 def request( 2370 cls, 2371 allspice_client, 2372 owner: str, 2373 repo: str, 2374 id: Optional[int] = None, 2375 ) -> Release: 2376 args = {"owner": owner, "repo": repo, "id": id} 2377 release_response = cls._get_gitea_api_object(allspice_client, args) 2378 repository = Repository.request(allspice_client, owner, repo) 2379 release = cls.parse_response(allspice_client, release_response, repository) 2380 return release 2381 2382 def commit(self): 2383 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2384 self._commit(args) 2385 2386 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2387 """ 2388 Create an asset for this release. 2389 2390 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2391 2392 :param file: The file to upload. This should be a file-like object. 2393 :param name: The name of the file. 2394 :return: The created asset. 2395 """ 2396 2397 args: dict[str, Any] = {"files": {"attachment": file}} 2398 if name is not None: 2399 args["params"] = {"name": name} 2400 2401 result = self.allspice_client.requests_post( 2402 self.RELEASE_CREATE_ASSET.format( 2403 owner=self.repo.owner.username, 2404 repo=self.repo.name, 2405 id=self.id, 2406 ), 2407 **args, 2408 ) 2409 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2410 2411 def delete(self): 2412 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2413 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2414 self.deleted = True 2415 2416 2417class ReleaseAsset(ApiObject): 2418 browser_download_url: str 2419 created_at: str 2420 download_count: int 2421 id: int 2422 name: str 2423 release: Optional["Release"] 2424 size: int 2425 uuid: str 2426 2427 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2428 2429 def __init__(self, allspice_client): 2430 super().__init__(allspice_client) 2431 2432 def __eq__(self, other): 2433 if not isinstance(other, ReleaseAsset): 2434 return False 2435 return self.release == other.release and self.id == other.id 2436 2437 def __hash__(self): 2438 return hash(self.release) ^ hash(self.id) 2439 2440 _fields_to_parsers: ClassVar[dict] = {} 2441 _patchable_fields: ClassVar[set[str]] = { 2442 "name", 2443 } 2444 2445 @classmethod 2446 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2447 asset = super().parse_response(allspice_client, result) 2448 ReleaseAsset._add_read_property("release", release, asset) 2449 return asset 2450 2451 @classmethod 2452 def request( 2453 cls, 2454 allspice_client, 2455 owner: str, 2456 repo: str, 2457 release_id: int, 2458 id: int, 2459 ) -> ReleaseAsset: 2460 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2461 asset_response = cls._get_gitea_api_object(allspice_client, args) 2462 release = Release.request(allspice_client, owner, repo, release_id) 2463 asset = cls.parse_response(allspice_client, asset_response, release) 2464 return asset 2465 2466 def commit(self): 2467 args = { 2468 "owner": self.release.repo.owner, 2469 "repo": self.release.repo.name, 2470 "release_id": self.release.id, 2471 "id": self.id, 2472 } 2473 self._commit(args) 2474 2475 def download(self) -> bytes: 2476 """ 2477 Download the raw, binary data of this asset. 2478 2479 Note 1: if the file you are requesting is a text file, you might want to 2480 use .decode() on the result to get a string. For example: 2481 2482 asset.download().decode("utf-8") 2483 2484 Note 2: this method will store the entire file in memory. If you are 2485 downloading a large file, you might want to use download_to_file instead. 2486 """ 2487 2488 return self.allspice_client.requests.get( 2489 self.browser_download_url, 2490 headers=self.allspice_client.headers, 2491 ).content 2492 2493 def download_to_file(self, io: IO): 2494 """ 2495 Download the raw, binary data of this asset to a file-like object. 2496 2497 Example: 2498 2499 with open("my_file.zip", "wb") as f: 2500 asset.download_to_file(f) 2501 2502 :param io: The file-like object to write the data to. 2503 """ 2504 2505 response = self.allspice_client.requests.get( 2506 self.browser_download_url, 2507 headers=self.allspice_client.headers, 2508 stream=True, 2509 ) 2510 # 4kb chunks 2511 for chunk in response.iter_content(chunk_size=4096): 2512 if chunk: 2513 io.write(chunk) 2514 2515 def delete(self): 2516 args = { 2517 "owner": self.release.repo.owner.name, 2518 "repo": self.release.repo.name, 2519 "release_id": self.release.id, 2520 "id": self.id, 2521 } 2522 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2523 self.deleted = True 2524 2525 2526class Content(ReadonlyApiObject): 2527 content: Any 2528 download_url: str 2529 encoding: Any 2530 git_url: str 2531 html_url: str 2532 last_commit_sha: str 2533 name: str 2534 path: str 2535 sha: str 2536 size: int 2537 submodule_git_url: Any 2538 target: Any 2539 type: str 2540 url: str 2541 2542 FILE = "file" 2543 2544 def __init__(self, allspice_client): 2545 super().__init__(allspice_client) 2546 2547 def __eq__(self, other): 2548 if not isinstance(other, Content): 2549 return False 2550 2551 return self.sha == other.sha and self.name == other.name 2552 2553 def __hash__(self): 2554 return hash(self.sha) ^ hash(self.name) 2555 2556 2557Ref = Union[Branch, Commit, str] 2558 2559 2560class Util: 2561 @staticmethod 2562 def convert_time(time: str) -> datetime: 2563 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2564 try: 2565 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2566 except ValueError: 2567 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2568 2569 @staticmethod 2570 def format_time(time: datetime) -> str: 2571 """ 2572 Format a datetime object to Gitea's time format. 2573 2574 :param time: The time to format 2575 :return: Formatted time 2576 """ 2577 2578 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2579 2580 @staticmethod 2581 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2582 """ 2583 Given a "ref", returns a dict with the ref parameter for the API call. 2584 2585 If the ref is None, returns an empty dict. You can pass this to the API 2586 directly. 2587 """ 2588 2589 if isinstance(ref, Branch): 2590 return {"ref": ref.name} 2591 elif isinstance(ref, Commit): 2592 return {"ref": ref.sha} 2593 elif ref: 2594 return {"ref": ref} 2595 else: 2596 return {}
33class Organization(ApiObject): 34 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 35 36 active: Optional[bool] 37 avatar_url: str 38 created: Optional[str] 39 description: str 40 email: str 41 followers_count: Optional[int] 42 following_count: Optional[int] 43 full_name: str 44 html_url: Optional[str] 45 id: int 46 is_admin: Optional[bool] 47 language: Optional[str] 48 last_login: Optional[str] 49 location: str 50 login: Optional[str] 51 login_name: Optional[str] 52 name: Optional[str] 53 prohibit_login: Optional[bool] 54 repo_admin_change_team_access: Optional[bool] 55 restricted: Optional[bool] 56 source_id: Optional[int] 57 starred_repos_count: Optional[int] 58 username: str 59 visibility: str 60 website: str 61 62 API_OBJECT = """/orgs/{name}""" # <org> 63 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 64 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 65 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 66 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 67 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 68 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 69 70 def __init__(self, allspice_client): 71 super().__init__(allspice_client) 72 73 def __eq__(self, other): 74 if not isinstance(other, Organization): 75 return False 76 return self.allspice_client == other.allspice_client and self.name == other.name 77 78 def __hash__(self): 79 return hash(self.allspice_client) ^ hash(self.name) 80 81 @classmethod 82 def request(cls, allspice_client, name: str) -> Self: 83 return cls._request(allspice_client, {"name": name}) 84 85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object 93 94 _patchable_fields: ClassVar[set[str]] = { 95 "description", 96 "full_name", 97 "location", 98 "visibility", 99 "website", 100 } 101 102 def commit(self): 103 args = {"name": self.name} 104 self._commit(args) 105 106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result) 144 145 def get_repositories(self) -> List["Repository"]: 146 results = self.allspice_client.requests_get_paginated( 147 Organization.ORG_REPOS_REQUEST % self.username 148 ) 149 return [Repository.parse_response(self.allspice_client, result) for result in results] 150 151 def get_repository(self, name) -> "Repository": 152 repos = self.get_repositories() 153 for repo in repos: 154 if repo.name == name: 155 return repo 156 raise NotFoundException("Repository %s not existent in organization." % name) 157 158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams 165 166 def get_team(self, name) -> "Team": 167 teams = self.get_teams() 168 for team in teams: 169 if team.name == name: 170 return team 171 raise NotFoundException("Team not existent in organization.") 172 173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 ) 204 205 def get_members(self) -> List["User"]: 206 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 207 return [User.parse_response(self.allspice_client, result) for result in results] 208 209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False 220 221 def remove_member(self, user: "User"): 222 path = f"/orgs/{self.username}/members/{user.username}" 223 self.allspice_client.requests_delete(path) 224 225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True 231 232 def get_heatmap(self) -> List[Tuple[datetime, int]]: 233 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 234 results = [ 235 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 236 for result in results 237 ] 238 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object
106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams
173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 )
Alias for AllSpice#create_team
209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False
225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
241class User(ApiObject): 242 active: bool 243 admin: Any 244 allow_create_organization: Any 245 allow_git_hook: Any 246 allow_import_local: Any 247 avatar_url: str 248 created: str 249 description: str 250 email: str 251 emails: List[Any] 252 followers_count: int 253 following_count: int 254 full_name: str 255 html_url: str 256 id: int 257 is_admin: bool 258 language: str 259 last_login: str 260 location: str 261 login: str 262 login_name: str 263 max_repo_creation: Any 264 must_change_password: Any 265 password: Any 266 prohibit_login: bool 267 restricted: bool 268 source_id: int 269 starred_repos_count: int 270 username: str 271 visibility: str 272 website: str 273 274 API_OBJECT = """/users/{name}""" # <org> 275 USER_MAIL = """/user/emails?sudo=%s""" # <name> 276 USER_PATCH = """/admin/users/%s""" # <username> 277 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 278 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 279 USER_HEATMAP = """/users/%s/heatmap""" # <username> 280 281 def __init__(self, allspice_client): 282 super().__init__(allspice_client) 283 self._emails = [] 284 285 def __eq__(self, other): 286 if not isinstance(other, User): 287 return False 288 return self.allspice_client == other.allspice_client and self.id == other.id 289 290 def __hash__(self): 291 return hash(self.allspice_client) ^ hash(self.id) 292 293 @property 294 def emails(self): 295 self.__request_emails() 296 return self._emails 297 298 @classmethod 299 def request(cls, allspice_client, name: str) -> "User": 300 api_object = cls._request(allspice_client, {"name": name}) 301 return api_object 302 303 _patchable_fields: ClassVar[set[str]] = { 304 "active", 305 "admin", 306 "allow_create_organization", 307 "allow_git_hook", 308 "allow_import_local", 309 "email", 310 "full_name", 311 "location", 312 "login_name", 313 "max_repo_creation", 314 "must_change_password", 315 "password", 316 "prohibit_login", 317 "website", 318 } 319 320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {} 335 336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result) 374 375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results] 380 381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results] 386 387 def get_teams(self) -> List["Team"]: 388 url = "/user/teams" 389 results = self.allspice_client.requests_get_paginated(url, sudo=self) 390 return [Team.parse_response(self.allspice_client, result) for result in results] 391 392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results] 396 397 def __request_emails(self): 398 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 399 # report if the adress changed by this 400 for mail in result: 401 self._emails.append(mail["email"]) 402 if mail["primary"]: 403 self._email = mail["email"] 404 405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True 409 410 def get_heatmap(self) -> List[Tuple[datetime, int]]: 411 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 412 results = [ 413 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 414 for result in results 415 ] 416 return results
320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.
419class Branch(ReadonlyApiObject): 420 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 421 effective_branch_protection_name: str 422 enable_status_check: bool 423 name: str 424 protected: bool 425 required_approvals: int 426 status_check_contexts: List[Any] 427 user_can_merge: bool 428 user_can_push: bool 429 430 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 431 432 def __init__(self, allspice_client): 433 super().__init__(allspice_client) 434 435 def __eq__(self, other): 436 if not isinstance(other, Branch): 437 return False 438 return self.commit == other.commit and self.name == other.name 439 440 def __hash__(self): 441 return hash(self.commit["id"]) ^ hash(self.name) 442 443 _fields_to_parsers: ClassVar[dict] = { 444 # This is not a commit object 445 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 446 } 447 448 @classmethod 449 def request(cls, allspice_client, owner: str, repo: str, branch: str): 450 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
453class GitEntry(ReadonlyApiObject): 454 """ 455 An object representing a file or directory in the Git tree. 456 """ 457 458 mode: str 459 path: str 460 sha: str 461 size: int 462 type: str 463 url: str 464 465 def __init__(self, allspice_client): 466 super().__init__(allspice_client) 467 468 def __eq__(self, other) -> bool: 469 if not isinstance(other, GitEntry): 470 return False 471 return self.sha == other.sha 472 473 def __hash__(self) -> int: 474 return hash(self.sha)
An object representing a file or directory in the Git tree.
Inherited Members
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 559 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 560 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 561 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 562 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 563 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 564 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 565 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 566 567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip" 574 575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex" 585 586 def __init__(self, allspice_client): 587 super().__init__(allspice_client) 588 589 def __eq__(self, other): 590 if not isinstance(other, Repository): 591 return False 592 return self.owner == other.owner and self.name == other.name 593 594 def __hash__(self): 595 return hash(self.owner) ^ hash(self.name) 596 597 _fields_to_parsers: ClassVar[dict] = { 598 # dont know how to tell apart user and org as owner except form email being empty. 599 "owner": lambda allspice_client, r: ( 600 Organization.parse_response(allspice_client, r) 601 if r["email"] == "" 602 else User.parse_response(allspice_client, r) 603 ), 604 "updated_at": lambda _, t: Util.convert_time(t), 605 } 606 607 @classmethod 608 def request( 609 cls, 610 allspice_client, 611 owner: str, 612 name: str, 613 ) -> Repository: 614 return cls._request(allspice_client, {"owner": owner, "name": name}) 615 616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses] 660 661 _patchable_fields: ClassVar[set[str]] = { 662 "allow_manual_merge", 663 "allow_merge_commits", 664 "allow_rebase", 665 "allow_rebase_explicit", 666 "allow_rebase_update", 667 "allow_squash_merge", 668 "archived", 669 "autodetect_manual_merge", 670 "default_branch", 671 "default_delete_branch_after_merge", 672 "default_merge_style", 673 "description", 674 "enable_prune", 675 "external_tracker", 676 "external_wiki", 677 "has_issues", 678 "has_projects", 679 "has_pull_requests", 680 "has_wiki", 681 "ignore_whitespace_conflicts", 682 "internal_tracker", 683 "mirror_interval", 684 "name", 685 "private", 686 "template", 687 "website", 688 } 689 690 def commit(self): 691 args = {"owner": self.owner.username, "name": self.name} 692 self._commit(args) 693 694 def get_branches(self) -> List["Branch"]: 695 """Get all the Branches of this Repository.""" 696 697 results = self.allspice_client.requests_get_paginated( 698 Repository.REPO_BRANCHES % (self.owner.username, self.name) 699 ) 700 return [Branch.parse_response(self.allspice_client, result) for result in results] 701 702 def get_branch(self, name: str) -> "Branch": 703 """Get a specific Branch of this Repository.""" 704 result = self.allspice_client.requests_get( 705 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 706 ) 707 return Branch.parse_response(self.allspice_client, result) 708 709 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 710 """Add a branch to the repository""" 711 # Note: will only work with gitea 1.13 or higher! 712 713 ref_name = Util.data_params_for_ref(create_from) 714 if "ref" not in ref_name: 715 raise ValueError("create_from must be a Branch, Commit or string") 716 ref_name = ref_name["ref"] 717 718 data = {"new_branch_name": newname, "old_ref_name": ref_name} 719 result = self.allspice_client.requests_post( 720 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 721 ) 722 return Branch.parse_response(self.allspice_client, result) 723 724 def get_issues( 725 self, 726 state: Literal["open", "closed", "all"] = "all", 727 search_query: Optional[str] = None, 728 labels: Optional[List[str]] = None, 729 milestones: Optional[List[Union[Milestone, str]]] = None, 730 assignee: Optional[Union[User, str]] = None, 731 since: Optional[datetime] = None, 732 before: Optional[datetime] = None, 733 ) -> List["Issue"]: 734 """ 735 Get all Issues of this Repository (open and closed) 736 737 https://hub.allspice.io/api/swagger#/repository/repoListIssues 738 739 All params of this method are optional filters. If you don't specify a filter, it 740 will not be applied. 741 742 :param state: The state of the Issues to get. If None, all Issues are returned. 743 :param search_query: Filter issues by text. This is equivalent to searching for 744 `search_query` in the Issues on the web interface. 745 :param labels: Filter issues by labels. 746 :param milestones: Filter issues by milestones. 747 :param assignee: Filter issues by the assigned user. 748 :param since: Filter issues by the date they were created. 749 :param before: Filter issues by the date they were created. 750 :return: A list of Issues. 751 """ 752 753 data = { 754 "state": state, 755 } 756 if search_query: 757 data["q"] = search_query 758 if labels: 759 data["labels"] = ",".join(labels) 760 if milestones: 761 data["milestone"] = ",".join( 762 [ 763 milestone.name if isinstance(milestone, Milestone) else milestone 764 for milestone in milestones 765 ] 766 ) 767 if assignee: 768 if isinstance(assignee, User): 769 data["assignee"] = assignee.username 770 else: 771 data["assignee"] = assignee 772 if since: 773 data["since"] = Util.format_time(since) 774 if before: 775 data["before"] = Util.format_time(before) 776 777 results = self.allspice_client.requests_get_paginated( 778 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 779 params=data, 780 ) 781 782 issues = [] 783 for result in results: 784 issue = Issue.parse_response(self.allspice_client, result) 785 # See Issue.request 786 setattr(issue, "_repository", self) 787 # This is mostly for compatibility with an older implementation 788 Issue._add_read_property("repo", self, issue) 789 issues.append(issue) 790 791 return issues 792 793 def get_design_reviews( 794 self, 795 state: Literal["open", "closed", "all"] = "all", 796 milestone: Optional[Union[Milestone, str]] = None, 797 labels: Optional[List[str]] = None, 798 ) -> List["DesignReview"]: 799 """ 800 Get all Design Reviews of this Repository. 801 802 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 803 804 :param state: The state of the Design Reviews to get. If None, all Design Reviews 805 are returned. 806 :param milestone: The milestone of the Design Reviews to get. 807 :param labels: A list of label IDs to filter DRs by. 808 :return: A list of Design Reviews. 809 """ 810 811 params = { 812 "state": state, 813 } 814 if milestone: 815 if isinstance(milestone, Milestone): 816 params["milestone"] = milestone.name 817 else: 818 params["milestone"] = milestone 819 if labels: 820 params["labels"] = ",".join(labels) 821 822 results = self.allspice_client.requests_get_paginated( 823 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 824 params=params, 825 ) 826 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 827 828 def get_commits( 829 self, 830 sha: Optional[str] = None, 831 path: Optional[str] = None, 832 stat: bool = True, 833 ) -> List["Commit"]: 834 """ 835 Get all the Commits of this Repository. 836 837 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 838 839 :param sha: The SHA of the commit to start listing commits from. 840 :param path: filepath of a file/dir. 841 :param stat: Include the number of additions and deletions in the response. 842 Disable for speedup. 843 :return: A list of Commits. 844 """ 845 846 data = {} 847 if sha: 848 data["sha"] = sha 849 if path: 850 data["path"] = path 851 if not stat: 852 data["stat"] = False 853 854 try: 855 results = self.allspice_client.requests_get_paginated( 856 Repository.REPO_COMMITS % (self.owner.username, self.name), 857 params=data, 858 ) 859 except ConflictException as err: 860 logging.warning(err) 861 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 862 results = [] 863 return [Commit.parse_response(self.allspice_client, result) for result in results] 864 865 def get_issues_state(self, state) -> List["Issue"]: 866 """ 867 DEPRECATED: Use get_issues() instead. 868 869 Get issues of state Issue.open or Issue.closed of a repository. 870 """ 871 872 assert state in [Issue.OPENED, Issue.CLOSED] 873 issues = [] 874 data = {"state": state} 875 results = self.allspice_client.requests_get_paginated( 876 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 877 params=data, 878 ) 879 for result in results: 880 issue = Issue.parse_response(self.allspice_client, result) 881 # adding data not contained in the issue response 882 # See Issue.request() 883 setattr(issue, "_repository", self) 884 Issue._add_read_property("repo", self, issue) 885 Issue._add_read_property("owner", self.owner, issue) 886 issues.append(issue) 887 return issues 888 889 def get_times(self): 890 results = self.allspice_client.requests_get( 891 Repository.REPO_TIMES % (self.owner.username, self.name) 892 ) 893 return results 894 895 def get_user_time(self, username) -> float: 896 if isinstance(username, User): 897 username = username.username 898 results = self.allspice_client.requests_get( 899 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 900 ) 901 time = sum(r["time"] for r in results) 902 return time 903 904 def get_full_name(self) -> str: 905 return self.owner.username + "/" + self.name 906 907 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 908 data = { 909 "assignees": assignees, 910 "body": description, 911 "closed": False, 912 "title": title, 913 } 914 result = self.allspice_client.requests_post( 915 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 916 data=data, 917 ) 918 919 issue = Issue.parse_response(self.allspice_client, result) 920 setattr(issue, "_repository", self) 921 Issue._add_read_property("repo", self, issue) 922 return issue 923 924 def create_design_review( 925 self, 926 title: str, 927 head: Union[Branch, str], 928 base: Union[Branch, str], 929 assignees: Optional[Set[Union[User, str]]] = None, 930 body: Optional[str] = None, 931 due_date: Optional[datetime] = None, 932 milestone: Optional["Milestone"] = None, 933 ) -> "DesignReview": 934 """ 935 Create a new Design Review. 936 937 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 938 939 :param title: Title of the Design Review 940 :param head: Branch or name of the branch to merge into the base branch 941 :param base: Branch or name of the branch to merge into 942 :param assignees: Optional. A list of users to assign this review. List can be of 943 User objects or of usernames. 944 :param body: An Optional Description for the Design Review. 945 :param due_date: An Optional Due date for the Design Review. 946 :param milestone: An Optional Milestone for the Design Review 947 :return: The created Design Review 948 """ 949 950 data: dict[str, Any] = { 951 "title": title, 952 } 953 954 if isinstance(head, Branch): 955 data["head"] = head.name 956 else: 957 data["head"] = head 958 if isinstance(base, Branch): 959 data["base"] = base.name 960 else: 961 data["base"] = base 962 if assignees: 963 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 964 if body: 965 data["body"] = body 966 if due_date: 967 data["due_date"] = Util.format_time(due_date) 968 if milestone: 969 data["milestone"] = milestone.id 970 971 result = self.allspice_client.requests_post( 972 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 973 data=data, 974 ) 975 976 return DesignReview.parse_response(self.allspice_client, result) 977 978 def create_milestone( 979 self, 980 title: str, 981 description: str, 982 due_date: Optional[str] = None, 983 state: str = "open", 984 ) -> "Milestone": 985 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 986 data = {"title": title, "description": description, "state": state} 987 if due_date: 988 data["due_date"] = due_date 989 result = self.allspice_client.requests_post(url, data=data) 990 return Milestone.parse_response(self.allspice_client, result) 991 992 def create_gitea_hook(self, hook_url: str, events: List[str]): 993 url = f"/repos/{self.owner.username}/{self.name}/hooks" 994 data = { 995 "type": "gitea", 996 "config": {"content_type": "json", "url": hook_url}, 997 "events": events, 998 "active": True, 999 } 1000 return self.allspice_client.requests_post(url, data=data) 1001 1002 def list_hooks(self): 1003 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1004 return self.allspice_client.requests_get(url) 1005 1006 def delete_hook(self, id: str): 1007 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1008 self.allspice_client.requests_delete(url) 1009 1010 def is_collaborator(self, username) -> bool: 1011 if isinstance(username, User): 1012 username = username.username 1013 try: 1014 # returns 204 if its ok, 404 if its not 1015 self.allspice_client.requests_get( 1016 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1017 ) 1018 return True 1019 except Exception: 1020 return False 1021 1022 def get_users_with_access(self) -> Sequence[User]: 1023 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1024 response = self.allspice_client.requests_get(url) 1025 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1026 if isinstance(self.owner, User): 1027 return [*collabs, self.owner] 1028 else: 1029 # owner must be org 1030 teams = self.owner.get_teams() 1031 for team in teams: 1032 team_repos = team.get_repos() 1033 if self.name in [n.name for n in team_repos]: 1034 collabs += team.get_members() 1035 return collabs 1036 1037 def remove_collaborator(self, user_name: str): 1038 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1039 self.allspice_client.requests_delete(url) 1040 1041 def transfer_ownership( 1042 self, 1043 new_owner: Union[User, Organization], 1044 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1045 ): 1046 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1047 data: dict[str, Any] = {"new_owner": new_owner.username} 1048 if isinstance(new_owner, Organization): 1049 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1050 data["team_ids"] = new_team_ids 1051 self.allspice_client.requests_post(url, data=data) 1052 # TODO: make sure this instance is either updated or discarded 1053 1054 def get_git_content( 1055 self, 1056 ref: Optional["Ref"] = None, 1057 commit: "Optional[Commit]" = None, 1058 ) -> List[Content]: 1059 """ 1060 Get the metadata for all files in the root directory. 1061 1062 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1063 1064 :param ref: branch or commit to get content from 1065 :param commit: commit to get content from (deprecated) 1066 """ 1067 url = f"/repos/{self.owner.username}/{self.name}/contents" 1068 data = Util.data_params_for_ref(ref or commit) 1069 1070 result = [ 1071 Content.parse_response(self.allspice_client, f) 1072 for f in self.allspice_client.requests_get(url, data) 1073 ] 1074 return result 1075 1076 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1077 """ 1078 Get the repository's tree on a given ref. 1079 1080 By default, this will only return the top-level entries in the tree. If you want 1081 to get the entire tree, set `recursive` to True. 1082 1083 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1084 :param recursive: Whether to get the entire tree or just the top-level entries. 1085 """ 1086 1087 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1088 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1089 params = {"recursive": recursive} 1090 results = self.allspice_client.requests_get_paginated(url, params=params) 1091 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1092 1093 def get_file_content( 1094 self, 1095 content: Content, 1096 ref: Optional[Ref] = None, 1097 commit: Optional[Commit] = None, 1098 ) -> Union[str, List["Content"]]: 1099 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1100 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1101 data = Util.data_params_for_ref(ref or commit) 1102 1103 if content.type == Content.FILE: 1104 return self.allspice_client.requests_get(url, data)["content"] 1105 else: 1106 return [ 1107 Content.parse_response(self.allspice_client, f) 1108 for f in self.allspice_client.requests_get(url, data) 1109 ] 1110 1111 def get_raw_file( 1112 self, 1113 file_path: str, 1114 ref: Optional[Ref] = None, 1115 ) -> bytes: 1116 """ 1117 Get the raw, binary data of a single file. 1118 1119 Note 1: if the file you are requesting is a text file, you might want to 1120 use .decode() on the result to get a string. For example: 1121 1122 content = repo.get_raw_file("file.txt").decode("utf-8") 1123 1124 Note 2: this method will store the entire file in memory. If you want 1125 to download a large file, you might want to use `download_to_file` 1126 instead. 1127 1128 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1129 1130 :param file_path: The path to the file to get. 1131 :param ref: The branch or commit to get the file from. If not provided, 1132 the default branch is used. 1133 """ 1134 1135 url = self.REPO_GET_RAW_FILE.format( 1136 owner=self.owner.username, 1137 repo=self.name, 1138 path=file_path, 1139 ) 1140 params = Util.data_params_for_ref(ref) 1141 return self.allspice_client.requests_get_raw(url, params=params) 1142 1143 def download_to_file( 1144 self, 1145 file_path: str, 1146 io: IO, 1147 ref: Optional[Ref] = None, 1148 ) -> None: 1149 """ 1150 Download the binary data of a file to a file-like object. 1151 1152 Example: 1153 1154 with open("schematic.DSN", "wb") as f: 1155 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1156 1157 :param file_path: The path to the file in the repository from the root 1158 of the repository. 1159 :param io: The file-like object to write the data to. 1160 """ 1161 1162 url = self.allspice_client._AllSpice__get_url( 1163 self.REPO_GET_RAW_FILE.format( 1164 owner=self.owner.username, 1165 repo=self.name, 1166 path=file_path, 1167 ) 1168 ) 1169 params = Util.data_params_for_ref(ref) 1170 response = self.allspice_client.requests.get( 1171 url, 1172 params=params, 1173 headers=self.allspice_client.headers, 1174 stream=True, 1175 ) 1176 1177 for chunk in response.iter_content(chunk_size=4096): 1178 if chunk: 1179 io.write(chunk) 1180 1181 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1182 """ 1183 Get the json blob for a cad file if it exists, otherwise enqueue 1184 a new job and return a 503 status. 1185 1186 WARNING: This is still experimental and not recommended for critical 1187 applications. The structure and content of the returned dictionary can 1188 change at any time. 1189 1190 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1191 """ 1192 1193 if isinstance(content, Content): 1194 content = content.path 1195 1196 url = self.REPO_GET_ALLSPICE_JSON.format( 1197 owner=self.owner.username, 1198 repo=self.name, 1199 content=content, 1200 ) 1201 data = Util.data_params_for_ref(ref) 1202 return self.allspice_client.requests_get(url, data) 1203 1204 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1205 """ 1206 Get the svg blob for a cad file if it exists, otherwise enqueue 1207 a new job and return a 503 status. 1208 1209 WARNING: This is still experimental and not yet recommended for 1210 critical applications. The content of the returned svg can change 1211 at any time. 1212 1213 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1214 """ 1215 1216 if isinstance(content, Content): 1217 content = content.path 1218 1219 url = self.REPO_GET_ALLSPICE_SVG.format( 1220 owner=self.owner.username, 1221 repo=self.name, 1222 content=content, 1223 ) 1224 data = Util.data_params_for_ref(ref) 1225 return self.allspice_client.requests_get_raw(url, data) 1226 1227 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1228 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1229 if not data: 1230 data = {} 1231 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1232 data.update({"content": content}) 1233 return self.allspice_client.requests_post(url, data) 1234 1235 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1236 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1237 if not data: 1238 data = {} 1239 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1240 data.update({"sha": file_sha, "content": content}) 1241 return self.allspice_client.requests_put(url, data) 1242 1243 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1244 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1245 if not data: 1246 data = {} 1247 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1248 data.update({"sha": file_sha}) 1249 return self.allspice_client.requests_delete(url, data) 1250 1251 def get_archive( 1252 self, 1253 ref: Ref = "main", 1254 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1255 ) -> bytes: 1256 """ 1257 Download all the files in a specific ref of a repository as a zip or tarball 1258 archive. 1259 1260 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1261 1262 :param ref: branch or commit to get content from, defaults to the "main" branch 1263 :param archive_format: zip or tar, defaults to zip 1264 """ 1265 1266 ref_string = Util.data_params_for_ref(ref)["ref"] 1267 url = self.REPO_GET_ARCHIVE.format( 1268 owner=self.owner.username, 1269 repo=self.name, 1270 ref=ref_string, 1271 format=archive_format.value, 1272 ) 1273 return self.allspice_client.requests_get_raw(url) 1274 1275 def get_topics(self) -> list[str]: 1276 """ 1277 Gets the list of topics on this repository. 1278 1279 See http://localhost:3000/api/swagger#/repository/repoListTopics 1280 """ 1281 1282 url = self.REPO_GET_TOPICS.format( 1283 owner=self.owner.username, 1284 repo=self.name, 1285 ) 1286 return self.allspice_client.requests_get(url)["topics"] 1287 1288 def add_topic(self, topic: str): 1289 """ 1290 Adds a topic to the repository. 1291 1292 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1293 1294 :param topic: The topic to add. Topic names must consist only of 1295 lowercase letters, numnbers and dashes (-), and cannot start with 1296 dashes. Topic names also must be under 35 characters long. 1297 """ 1298 1299 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1300 self.allspice_client.requests_put(url) 1301 1302 def create_release( 1303 self, 1304 tag_name: str, 1305 name: Optional[str] = None, 1306 body: Optional[str] = None, 1307 draft: bool = False, 1308 ): 1309 """ 1310 Create a release for this repository. The release will be created for 1311 the tag with the given name. If there is no tag with this name, create 1312 the tag first. 1313 1314 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1315 """ 1316 1317 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1318 data = { 1319 "tag_name": tag_name, 1320 "draft": draft, 1321 } 1322 if name is not None: 1323 data["name"] = name 1324 if body is not None: 1325 data["body"] = body 1326 response = self.allspice_client.requests_post(url, data) 1327 return Release.parse_response(self.allspice_client, response, self) 1328 1329 def get_releases( 1330 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1331 ) -> List[Release]: 1332 """ 1333 Get the list of releases for this repository. 1334 1335 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1336 """ 1337 1338 data = {} 1339 1340 if draft is not None: 1341 data["draft"] = draft 1342 if pre_release is not None: 1343 data["pre-release"] = pre_release 1344 1345 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1346 responses = self.allspice_client.requests_get_paginated(url, params=data) 1347 1348 return [ 1349 Release.parse_response(self.allspice_client, response, self) for response in responses 1350 ] 1351 1352 def get_latest_release(self) -> Release: 1353 """ 1354 Get the latest release for this repository. 1355 1356 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1357 """ 1358 1359 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1360 response = self.allspice_client.requests_get(url) 1361 release = Release.parse_response(self.allspice_client, response, self) 1362 return release 1363 1364 def get_release_by_tag(self, tag: str) -> Release: 1365 """ 1366 Get a release by its tag. 1367 1368 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1369 """ 1370 1371 url = self.REPO_GET_RELEASE_BY_TAG.format( 1372 owner=self.owner.username, repo=self.name, tag=tag 1373 ) 1374 response = self.allspice_client.requests_get(url) 1375 release = Release.parse_response(self.allspice_client, response, self) 1376 return release 1377 1378 def get_commit_statuses( 1379 self, 1380 commit: Union[str, Commit], 1381 sort: Optional[CommitStatusSort] = None, 1382 state: Optional[CommitStatusState] = None, 1383 ) -> List[CommitStatus]: 1384 """ 1385 Get a list of statuses for a commit. 1386 1387 This is roughly equivalent to the Commit.get_statuses method, but this 1388 method allows you to sort and filter commits and is more convenient if 1389 you have a commit SHA and don't need to get the commit itself. 1390 1391 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1392 """ 1393 1394 if isinstance(commit, Commit): 1395 commit = commit.sha 1396 1397 params = {} 1398 if sort is not None: 1399 params["sort"] = sort.value 1400 if state is not None: 1401 params["state"] = state.value 1402 1403 url = self.REPO_GET_COMMIT_STATUS.format( 1404 owner=self.owner.username, repo=self.name, sha=commit 1405 ) 1406 response = self.allspice_client.requests_get_paginated(url, params=params) 1407 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1408 1409 def create_commit_status( 1410 self, 1411 commit: Union[str, Commit], 1412 context: Optional[str] = None, 1413 description: Optional[str] = None, 1414 state: Optional[CommitStatusState] = None, 1415 target_url: Optional[str] = None, 1416 ) -> CommitStatus: 1417 """ 1418 Create a status on a commit. 1419 1420 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1421 """ 1422 1423 if isinstance(commit, Commit): 1424 commit = commit.sha 1425 1426 data = {} 1427 if context is not None: 1428 data["context"] = context 1429 if description is not None: 1430 data["description"] = description 1431 if state is not None: 1432 data["state"] = state.value 1433 if target_url is not None: 1434 data["target_url"] = target_url 1435 1436 url = self.REPO_GET_COMMIT_STATUS.format( 1437 owner=self.owner.username, repo=self.name, sha=commit 1438 ) 1439 response = self.allspice_client.requests_post(url, data=data) 1440 return CommitStatus.parse_response(self.allspice_client, response) 1441 1442 def delete(self): 1443 self.allspice_client.requests_delete( 1444 Repository.REPO_DELETE % (self.owner.username, self.name) 1445 ) 1446 self.deleted = True
616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
694 def get_branches(self) -> List["Branch"]: 695 """Get all the Branches of this Repository.""" 696 697 results = self.allspice_client.requests_get_paginated( 698 Repository.REPO_BRANCHES % (self.owner.username, self.name) 699 ) 700 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
702 def get_branch(self, name: str) -> "Branch": 703 """Get a specific Branch of this Repository.""" 704 result = self.allspice_client.requests_get( 705 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 706 ) 707 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
709 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 710 """Add a branch to the repository""" 711 # Note: will only work with gitea 1.13 or higher! 712 713 ref_name = Util.data_params_for_ref(create_from) 714 if "ref" not in ref_name: 715 raise ValueError("create_from must be a Branch, Commit or string") 716 ref_name = ref_name["ref"] 717 718 data = {"new_branch_name": newname, "old_ref_name": ref_name} 719 result = self.allspice_client.requests_post( 720 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 721 ) 722 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
724 def get_issues( 725 self, 726 state: Literal["open", "closed", "all"] = "all", 727 search_query: Optional[str] = None, 728 labels: Optional[List[str]] = None, 729 milestones: Optional[List[Union[Milestone, str]]] = None, 730 assignee: Optional[Union[User, str]] = None, 731 since: Optional[datetime] = None, 732 before: Optional[datetime] = None, 733 ) -> List["Issue"]: 734 """ 735 Get all Issues of this Repository (open and closed) 736 737 https://hub.allspice.io/api/swagger#/repository/repoListIssues 738 739 All params of this method are optional filters. If you don't specify a filter, it 740 will not be applied. 741 742 :param state: The state of the Issues to get. If None, all Issues are returned. 743 :param search_query: Filter issues by text. This is equivalent to searching for 744 `search_query` in the Issues on the web interface. 745 :param labels: Filter issues by labels. 746 :param milestones: Filter issues by milestones. 747 :param assignee: Filter issues by the assigned user. 748 :param since: Filter issues by the date they were created. 749 :param before: Filter issues by the date they were created. 750 :return: A list of Issues. 751 """ 752 753 data = { 754 "state": state, 755 } 756 if search_query: 757 data["q"] = search_query 758 if labels: 759 data["labels"] = ",".join(labels) 760 if milestones: 761 data["milestone"] = ",".join( 762 [ 763 milestone.name if isinstance(milestone, Milestone) else milestone 764 for milestone in milestones 765 ] 766 ) 767 if assignee: 768 if isinstance(assignee, User): 769 data["assignee"] = assignee.username 770 else: 771 data["assignee"] = assignee 772 if since: 773 data["since"] = Util.format_time(since) 774 if before: 775 data["before"] = Util.format_time(before) 776 777 results = self.allspice_client.requests_get_paginated( 778 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 779 params=data, 780 ) 781 782 issues = [] 783 for result in results: 784 issue = Issue.parse_response(self.allspice_client, result) 785 # See Issue.request 786 setattr(issue, "_repository", self) 787 # This is mostly for compatibility with an older implementation 788 Issue._add_read_property("repo", self, issue) 789 issues.append(issue) 790 791 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_query
in the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
793 def get_design_reviews( 794 self, 795 state: Literal["open", "closed", "all"] = "all", 796 milestone: Optional[Union[Milestone, str]] = None, 797 labels: Optional[List[str]] = None, 798 ) -> List["DesignReview"]: 799 """ 800 Get all Design Reviews of this Repository. 801 802 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 803 804 :param state: The state of the Design Reviews to get. If None, all Design Reviews 805 are returned. 806 :param milestone: The milestone of the Design Reviews to get. 807 :param labels: A list of label IDs to filter DRs by. 808 :return: A list of Design Reviews. 809 """ 810 811 params = { 812 "state": state, 813 } 814 if milestone: 815 if isinstance(milestone, Milestone): 816 params["milestone"] = milestone.name 817 else: 818 params["milestone"] = milestone 819 if labels: 820 params["labels"] = ",".join(labels) 821 822 results = self.allspice_client.requests_get_paginated( 823 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 824 params=params, 825 ) 826 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
828 def get_commits( 829 self, 830 sha: Optional[str] = None, 831 path: Optional[str] = None, 832 stat: bool = True, 833 ) -> List["Commit"]: 834 """ 835 Get all the Commits of this Repository. 836 837 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 838 839 :param sha: The SHA of the commit to start listing commits from. 840 :param path: filepath of a file/dir. 841 :param stat: Include the number of additions and deletions in the response. 842 Disable for speedup. 843 :return: A list of Commits. 844 """ 845 846 data = {} 847 if sha: 848 data["sha"] = sha 849 if path: 850 data["path"] = path 851 if not stat: 852 data["stat"] = False 853 854 try: 855 results = self.allspice_client.requests_get_paginated( 856 Repository.REPO_COMMITS % (self.owner.username, self.name), 857 params=data, 858 ) 859 except ConflictException as err: 860 logging.warning(err) 861 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 862 results = [] 863 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
865 def get_issues_state(self, state) -> List["Issue"]: 866 """ 867 DEPRECATED: Use get_issues() instead. 868 869 Get issues of state Issue.open or Issue.closed of a repository. 870 """ 871 872 assert state in [Issue.OPENED, Issue.CLOSED] 873 issues = [] 874 data = {"state": state} 875 results = self.allspice_client.requests_get_paginated( 876 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 877 params=data, 878 ) 879 for result in results: 880 issue = Issue.parse_response(self.allspice_client, result) 881 # adding data not contained in the issue response 882 # See Issue.request() 883 setattr(issue, "_repository", self) 884 Issue._add_read_property("repo", self, issue) 885 Issue._add_read_property("owner", self.owner, issue) 886 issues.append(issue) 887 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
895 def get_user_time(self, username) -> float: 896 if isinstance(username, User): 897 username = username.username 898 results = self.allspice_client.requests_get( 899 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 900 ) 901 time = sum(r["time"] for r in results) 902 return time
907 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 908 data = { 909 "assignees": assignees, 910 "body": description, 911 "closed": False, 912 "title": title, 913 } 914 result = self.allspice_client.requests_post( 915 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 916 data=data, 917 ) 918 919 issue = Issue.parse_response(self.allspice_client, result) 920 setattr(issue, "_repository", self) 921 Issue._add_read_property("repo", self, issue) 922 return issue
924 def create_design_review( 925 self, 926 title: str, 927 head: Union[Branch, str], 928 base: Union[Branch, str], 929 assignees: Optional[Set[Union[User, str]]] = None, 930 body: Optional[str] = None, 931 due_date: Optional[datetime] = None, 932 milestone: Optional["Milestone"] = None, 933 ) -> "DesignReview": 934 """ 935 Create a new Design Review. 936 937 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 938 939 :param title: Title of the Design Review 940 :param head: Branch or name of the branch to merge into the base branch 941 :param base: Branch or name of the branch to merge into 942 :param assignees: Optional. A list of users to assign this review. List can be of 943 User objects or of usernames. 944 :param body: An Optional Description for the Design Review. 945 :param due_date: An Optional Due date for the Design Review. 946 :param milestone: An Optional Milestone for the Design Review 947 :return: The created Design Review 948 """ 949 950 data: dict[str, Any] = { 951 "title": title, 952 } 953 954 if isinstance(head, Branch): 955 data["head"] = head.name 956 else: 957 data["head"] = head 958 if isinstance(base, Branch): 959 data["base"] = base.name 960 else: 961 data["base"] = base 962 if assignees: 963 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 964 if body: 965 data["body"] = body 966 if due_date: 967 data["due_date"] = Util.format_time(due_date) 968 if milestone: 969 data["milestone"] = milestone.id 970 971 result = self.allspice_client.requests_post( 972 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 973 data=data, 974 ) 975 976 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
978 def create_milestone( 979 self, 980 title: str, 981 description: str, 982 due_date: Optional[str] = None, 983 state: str = "open", 984 ) -> "Milestone": 985 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 986 data = {"title": title, "description": description, "state": state} 987 if due_date: 988 data["due_date"] = due_date 989 result = self.allspice_client.requests_post(url, data=data) 990 return Milestone.parse_response(self.allspice_client, result)
992 def create_gitea_hook(self, hook_url: str, events: List[str]): 993 url = f"/repos/{self.owner.username}/{self.name}/hooks" 994 data = { 995 "type": "gitea", 996 "config": {"content_type": "json", "url": hook_url}, 997 "events": events, 998 "active": True, 999 } 1000 return self.allspice_client.requests_post(url, data=data)
1010 def is_collaborator(self, username) -> bool: 1011 if isinstance(username, User): 1012 username = username.username 1013 try: 1014 # returns 204 if its ok, 404 if its not 1015 self.allspice_client.requests_get( 1016 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1017 ) 1018 return True 1019 except Exception: 1020 return False
1022 def get_users_with_access(self) -> Sequence[User]: 1023 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1024 response = self.allspice_client.requests_get(url) 1025 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1026 if isinstance(self.owner, User): 1027 return [*collabs, self.owner] 1028 else: 1029 # owner must be org 1030 teams = self.owner.get_teams() 1031 for team in teams: 1032 team_repos = team.get_repos() 1033 if self.name in [n.name for n in team_repos]: 1034 collabs += team.get_members() 1035 return collabs
1041 def transfer_ownership( 1042 self, 1043 new_owner: Union[User, Organization], 1044 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1045 ): 1046 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1047 data: dict[str, Any] = {"new_owner": new_owner.username} 1048 if isinstance(new_owner, Organization): 1049 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1050 data["team_ids"] = new_team_ids 1051 self.allspice_client.requests_post(url, data=data) 1052 # TODO: make sure this instance is either updated or discarded
1054 def get_git_content( 1055 self, 1056 ref: Optional["Ref"] = None, 1057 commit: "Optional[Commit]" = None, 1058 ) -> List[Content]: 1059 """ 1060 Get the metadata for all files in the root directory. 1061 1062 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1063 1064 :param ref: branch or commit to get content from 1065 :param commit: commit to get content from (deprecated) 1066 """ 1067 url = f"/repos/{self.owner.username}/{self.name}/contents" 1068 data = Util.data_params_for_ref(ref or commit) 1069 1070 result = [ 1071 Content.parse_response(self.allspice_client, f) 1072 for f in self.allspice_client.requests_get(url, data) 1073 ] 1074 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1076 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1077 """ 1078 Get the repository's tree on a given ref. 1079 1080 By default, this will only return the top-level entries in the tree. If you want 1081 to get the entire tree, set `recursive` to True. 1082 1083 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1084 :param recursive: Whether to get the entire tree or just the top-level entries. 1085 """ 1086 1087 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1088 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1089 params = {"recursive": recursive} 1090 results = self.allspice_client.requests_get_paginated(url, params=params) 1091 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1093 def get_file_content( 1094 self, 1095 content: Content, 1096 ref: Optional[Ref] = None, 1097 commit: Optional[Commit] = None, 1098 ) -> Union[str, List["Content"]]: 1099 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1100 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1101 data = Util.data_params_for_ref(ref or commit) 1102 1103 if content.type == Content.FILE: 1104 return self.allspice_client.requests_get(url, data)["content"] 1105 else: 1106 return [ 1107 Content.parse_response(self.allspice_client, f) 1108 for f in self.allspice_client.requests_get(url, data) 1109 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1111 def get_raw_file( 1112 self, 1113 file_path: str, 1114 ref: Optional[Ref] = None, 1115 ) -> bytes: 1116 """ 1117 Get the raw, binary data of a single file. 1118 1119 Note 1: if the file you are requesting is a text file, you might want to 1120 use .decode() on the result to get a string. For example: 1121 1122 content = repo.get_raw_file("file.txt").decode("utf-8") 1123 1124 Note 2: this method will store the entire file in memory. If you want 1125 to download a large file, you might want to use `download_to_file` 1126 instead. 1127 1128 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1129 1130 :param file_path: The path to the file to get. 1131 :param ref: The branch or commit to get the file from. If not provided, 1132 the default branch is used. 1133 """ 1134 1135 url = self.REPO_GET_RAW_FILE.format( 1136 owner=self.owner.username, 1137 repo=self.name, 1138 path=file_path, 1139 ) 1140 params = Util.data_params_for_ref(ref) 1141 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1143 def download_to_file( 1144 self, 1145 file_path: str, 1146 io: IO, 1147 ref: Optional[Ref] = None, 1148 ) -> None: 1149 """ 1150 Download the binary data of a file to a file-like object. 1151 1152 Example: 1153 1154 with open("schematic.DSN", "wb") as f: 1155 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1156 1157 :param file_path: The path to the file in the repository from the root 1158 of the repository. 1159 :param io: The file-like object to write the data to. 1160 """ 1161 1162 url = self.allspice_client._AllSpice__get_url( 1163 self.REPO_GET_RAW_FILE.format( 1164 owner=self.owner.username, 1165 repo=self.name, 1166 path=file_path, 1167 ) 1168 ) 1169 params = Util.data_params_for_ref(ref) 1170 response = self.allspice_client.requests.get( 1171 url, 1172 params=params, 1173 headers=self.allspice_client.headers, 1174 stream=True, 1175 ) 1176 1177 for chunk in response.iter_content(chunk_size=4096): 1178 if chunk: 1179 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1181 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1182 """ 1183 Get the json blob for a cad file if it exists, otherwise enqueue 1184 a new job and return a 503 status. 1185 1186 WARNING: This is still experimental and not recommended for critical 1187 applications. The structure and content of the returned dictionary can 1188 change at any time. 1189 1190 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1191 """ 1192 1193 if isinstance(content, Content): 1194 content = content.path 1195 1196 url = self.REPO_GET_ALLSPICE_JSON.format( 1197 owner=self.owner.username, 1198 repo=self.name, 1199 content=content, 1200 ) 1201 data = Util.data_params_for_ref(ref) 1202 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1204 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1205 """ 1206 Get the svg blob for a cad file if it exists, otherwise enqueue 1207 a new job and return a 503 status. 1208 1209 WARNING: This is still experimental and not yet recommended for 1210 critical applications. The content of the returned svg can change 1211 at any time. 1212 1213 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1214 """ 1215 1216 if isinstance(content, Content): 1217 content = content.path 1218 1219 url = self.REPO_GET_ALLSPICE_SVG.format( 1220 owner=self.owner.username, 1221 repo=self.name, 1222 content=content, 1223 ) 1224 data = Util.data_params_for_ref(ref) 1225 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1227 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1228 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1229 if not data: 1230 data = {} 1231 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1232 data.update({"content": content}) 1233 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1235 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1236 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1237 if not data: 1238 data = {} 1239 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1240 data.update({"sha": file_sha, "content": content}) 1241 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1243 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1244 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1245 if not data: 1246 data = {} 1247 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1248 data.update({"sha": file_sha}) 1249 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1251 def get_archive( 1252 self, 1253 ref: Ref = "main", 1254 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1255 ) -> bytes: 1256 """ 1257 Download all the files in a specific ref of a repository as a zip or tarball 1258 archive. 1259 1260 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1261 1262 :param ref: branch or commit to get content from, defaults to the "main" branch 1263 :param archive_format: zip or tar, defaults to zip 1264 """ 1265 1266 ref_string = Util.data_params_for_ref(ref)["ref"] 1267 url = self.REPO_GET_ARCHIVE.format( 1268 owner=self.owner.username, 1269 repo=self.name, 1270 ref=ref_string, 1271 format=archive_format.value, 1272 ) 1273 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1275 def get_topics(self) -> list[str]: 1276 """ 1277 Gets the list of topics on this repository. 1278 1279 See http://localhost:3000/api/swagger#/repository/repoListTopics 1280 """ 1281 1282 url = self.REPO_GET_TOPICS.format( 1283 owner=self.owner.username, 1284 repo=self.name, 1285 ) 1286 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1288 def add_topic(self, topic: str): 1289 """ 1290 Adds a topic to the repository. 1291 1292 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1293 1294 :param topic: The topic to add. Topic names must consist only of 1295 lowercase letters, numnbers and dashes (-), and cannot start with 1296 dashes. Topic names also must be under 35 characters long. 1297 """ 1298 1299 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1300 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1302 def create_release( 1303 self, 1304 tag_name: str, 1305 name: Optional[str] = None, 1306 body: Optional[str] = None, 1307 draft: bool = False, 1308 ): 1309 """ 1310 Create a release for this repository. The release will be created for 1311 the tag with the given name. If there is no tag with this name, create 1312 the tag first. 1313 1314 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1315 """ 1316 1317 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1318 data = { 1319 "tag_name": tag_name, 1320 "draft": draft, 1321 } 1322 if name is not None: 1323 data["name"] = name 1324 if body is not None: 1325 data["body"] = body 1326 response = self.allspice_client.requests_post(url, data) 1327 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1329 def get_releases( 1330 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1331 ) -> List[Release]: 1332 """ 1333 Get the list of releases for this repository. 1334 1335 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1336 """ 1337 1338 data = {} 1339 1340 if draft is not None: 1341 data["draft"] = draft 1342 if pre_release is not None: 1343 data["pre-release"] = pre_release 1344 1345 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1346 responses = self.allspice_client.requests_get_paginated(url, params=data) 1347 1348 return [ 1349 Release.parse_response(self.allspice_client, response, self) for response in responses 1350 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1352 def get_latest_release(self) -> Release: 1353 """ 1354 Get the latest release for this repository. 1355 1356 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1357 """ 1358 1359 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1360 response = self.allspice_client.requests_get(url) 1361 release = Release.parse_response(self.allspice_client, response, self) 1362 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1364 def get_release_by_tag(self, tag: str) -> Release: 1365 """ 1366 Get a release by its tag. 1367 1368 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1369 """ 1370 1371 url = self.REPO_GET_RELEASE_BY_TAG.format( 1372 owner=self.owner.username, repo=self.name, tag=tag 1373 ) 1374 response = self.allspice_client.requests_get(url) 1375 release = Release.parse_response(self.allspice_client, response, self) 1376 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1378 def get_commit_statuses( 1379 self, 1380 commit: Union[str, Commit], 1381 sort: Optional[CommitStatusSort] = None, 1382 state: Optional[CommitStatusState] = None, 1383 ) -> List[CommitStatus]: 1384 """ 1385 Get a list of statuses for a commit. 1386 1387 This is roughly equivalent to the Commit.get_statuses method, but this 1388 method allows you to sort and filter commits and is more convenient if 1389 you have a commit SHA and don't need to get the commit itself. 1390 1391 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1392 """ 1393 1394 if isinstance(commit, Commit): 1395 commit = commit.sha 1396 1397 params = {} 1398 if sort is not None: 1399 params["sort"] = sort.value 1400 if state is not None: 1401 params["state"] = state.value 1402 1403 url = self.REPO_GET_COMMIT_STATUS.format( 1404 owner=self.owner.username, repo=self.name, sha=commit 1405 ) 1406 response = self.allspice_client.requests_get_paginated(url, params=params) 1407 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1409 def create_commit_status( 1410 self, 1411 commit: Union[str, Commit], 1412 context: Optional[str] = None, 1413 description: Optional[str] = None, 1414 state: Optional[CommitStatusState] = None, 1415 target_url: Optional[str] = None, 1416 ) -> CommitStatus: 1417 """ 1418 Create a status on a commit. 1419 1420 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1421 """ 1422 1423 if isinstance(commit, Commit): 1424 commit = commit.sha 1425 1426 data = {} 1427 if context is not None: 1428 data["context"] = context 1429 if description is not None: 1430 data["description"] = description 1431 if state is not None: 1432 data["state"] = state.value 1433 if target_url is not None: 1434 data["target_url"] = target_url 1435 1436 url = self.REPO_GET_COMMIT_STATUS.format( 1437 owner=self.owner.username, repo=self.name, sha=commit 1438 ) 1439 response = self.allspice_client.requests_post(url, data=data) 1440 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip"
Archive formats for Repository.get_archive
575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
1449class Milestone(ApiObject): 1450 allow_merge_commits: Any 1451 allow_rebase: Any 1452 allow_rebase_explicit: Any 1453 allow_squash_merge: Any 1454 archived: Any 1455 closed_at: Any 1456 closed_issues: int 1457 created_at: str 1458 default_branch: Any 1459 description: str 1460 due_on: Any 1461 has_issues: Any 1462 has_pull_requests: Any 1463 has_wiki: Any 1464 id: int 1465 ignore_whitespace_conflicts: Any 1466 name: Any 1467 open_issues: int 1468 private: Any 1469 state: str 1470 title: str 1471 updated_at: str 1472 website: Any 1473 1474 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1475 1476 def __init__(self, allspice_client): 1477 super().__init__(allspice_client) 1478 1479 def __eq__(self, other): 1480 if not isinstance(other, Milestone): 1481 return False 1482 return self.allspice_client == other.allspice_client and self.id == other.id 1483 1484 def __hash__(self): 1485 return hash(self.allspice_client) ^ hash(self.id) 1486 1487 _fields_to_parsers: ClassVar[dict] = { 1488 "closed_at": lambda _, t: Util.convert_time(t), 1489 "due_on": lambda _, t: Util.convert_time(t), 1490 } 1491 1492 _patchable_fields: ClassVar[set[str]] = { 1493 "allow_merge_commits", 1494 "allow_rebase", 1495 "allow_rebase_explicit", 1496 "allow_squash_merge", 1497 "archived", 1498 "default_branch", 1499 "description", 1500 "has_issues", 1501 "has_pull_requests", 1502 "has_wiki", 1503 "ignore_whitespace_conflicts", 1504 "name", 1505 "private", 1506 "website", 1507 } 1508 1509 @classmethod 1510 def request(cls, allspice_client, owner: str, repo: str, number: str): 1511 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1514class Attachment(ReadonlyApiObject): 1515 """ 1516 An asset attached to a comment. 1517 1518 You cannot edit or delete the attachment from this object - see the instance methods 1519 Comment.edit_attachment and delete_attachment for that. 1520 """ 1521 1522 browser_download_url: str 1523 created_at: str 1524 download_count: int 1525 id: int 1526 name: str 1527 size: int 1528 uuid: str 1529 1530 def __init__(self, allspice_client): 1531 super().__init__(allspice_client) 1532 1533 def __eq__(self, other): 1534 if not isinstance(other, Attachment): 1535 return False 1536 1537 return self.uuid == other.uuid 1538 1539 def __hash__(self): 1540 return hash(self.uuid) 1541 1542 def download_to_file(self, io: IO): 1543 """ 1544 Download the raw, binary data of this Attachment to a file-like object. 1545 1546 Example: 1547 1548 with open("my_file.zip", "wb") as f: 1549 attachment.download_to_file(f) 1550 1551 :param io: The file-like object to write the data to. 1552 """ 1553 1554 response = self.allspice_client.requests.get( 1555 self.browser_download_url, 1556 headers=self.allspice_client.headers, 1557 stream=True, 1558 ) 1559 # 4kb chunks 1560 for chunk in response.iter_content(chunk_size=4096): 1561 if chunk: 1562 io.write(chunk)
An asset attached to a comment.
You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.
1542 def download_to_file(self, io: IO): 1543 """ 1544 Download the raw, binary data of this Attachment to a file-like object. 1545 1546 Example: 1547 1548 with open("my_file.zip", "wb") as f: 1549 attachment.download_to_file(f) 1550 1551 :param io: The file-like object to write the data to. 1552 """ 1553 1554 response = self.allspice_client.requests.get( 1555 self.browser_download_url, 1556 headers=self.allspice_client.headers, 1557 stream=True, 1558 ) 1559 # 4kb chunks 1560 for chunk in response.iter_content(chunk_size=4096): 1561 if chunk: 1562 io.write(chunk)
Download the raw, binary data of this Attachment to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
attachment.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
Inherited Members
1565class Comment(ApiObject): 1566 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1567 body: str 1568 created_at: datetime 1569 html_url: str 1570 id: int 1571 issue_url: str 1572 original_author: str 1573 original_author_id: int 1574 pull_request_url: str 1575 updated_at: datetime 1576 user: User 1577 1578 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1579 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1580 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1581 1582 def __init__(self, allspice_client): 1583 super().__init__(allspice_client) 1584 1585 def __eq__(self, other): 1586 if not isinstance(other, Comment): 1587 return False 1588 return self.repository == other.repository and self.id == other.id 1589 1590 def __hash__(self): 1591 return hash(self.repository) ^ hash(self.id) 1592 1593 @classmethod 1594 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1595 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1596 1597 _fields_to_parsers: ClassVar[dict] = { 1598 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1599 "created_at": lambda _, t: Util.convert_time(t), 1600 "updated_at": lambda _, t: Util.convert_time(t), 1601 } 1602 1603 _patchable_fields: ClassVar[set[str]] = {"body"} 1604 1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url 1613 1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name) 1620 1621 def __fields_for_path(self): 1622 return { 1623 "owner": self.repository.owner.username, 1624 "repo": self.repository.name, 1625 "id": self.id, 1626 } 1627 1628 def commit(self): 1629 self._commit(self.__fields_for_path()) 1630 1631 def delete(self): 1632 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1633 self.deleted = True 1634 1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1647 1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result) 1671 1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result) 1693 1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True
1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1705class Commit(ReadonlyApiObject): 1706 author: User 1707 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1708 committer: Dict[str, Union[int, str, bool]] 1709 created: str 1710 files: List[Dict[str, str]] 1711 html_url: str 1712 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1713 parents: List[Union[Dict[str, str], Any]] 1714 sha: str 1715 stats: Dict[str, int] 1716 url: str 1717 1718 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1719 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1720 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1721 1722 # Regex to extract owner and repo names from the url property 1723 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1724 1725 def __init__(self, allspice_client): 1726 super().__init__(allspice_client) 1727 1728 _fields_to_parsers: ClassVar[dict] = { 1729 # NOTE: api may return None for commiters that are no allspice users 1730 "author": lambda allspice_client, u: ( 1731 User.parse_response(allspice_client, u) if u else None 1732 ) 1733 } 1734 1735 def __eq__(self, other): 1736 if not isinstance(other, Commit): 1737 return False 1738 return self.sha == other.sha 1739 1740 def __hash__(self): 1741 return hash(self.sha) 1742 1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object 1751 1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1766 1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1778 1779 @cached_property 1780 def _fields_for_path(self) -> dict[str, str]: 1781 matches = self.URL_REGEXP.search(self.url) 1782 if not matches: 1783 raise ValueError(f"Invalid commit URL: {self.url}") 1784 1785 return { 1786 "owner": matches.group(1), 1787 "repo": matches.group(2), 1788 "sha": self.sha, 1789 }
1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object
1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
1792class CommitStatusState(Enum): 1793 PENDING = "pending" 1794 SUCCESS = "success" 1795 ERROR = "error" 1796 FAILURE = "failure" 1797 WARNING = "warning" 1798 1799 @classmethod 1800 def try_init(cls, value: str) -> CommitStatusState | str: 1801 """ 1802 Try converting a string to the enum, and if that fails, return the 1803 string itself. 1804 """ 1805 1806 try: 1807 return cls(value) 1808 except ValueError: 1809 return value
1799 @classmethod 1800 def try_init(cls, value: str) -> CommitStatusState | str: 1801 """ 1802 Try converting a string to the enum, and if that fails, return the 1803 string itself. 1804 """ 1805 1806 try: 1807 return cls(value) 1808 except ValueError: 1809 return value
Try converting a string to the enum, and if that fails, return the string itself.
1812class CommitStatus(ReadonlyApiObject): 1813 context: str 1814 created_at: str 1815 creator: User 1816 description: str 1817 id: int 1818 status: CommitStatusState 1819 target_url: str 1820 updated_at: str 1821 url: str 1822 1823 def __init__(self, allspice_client): 1824 super().__init__(allspice_client) 1825 1826 _fields_to_parsers: ClassVar[dict] = { 1827 # Gitea/ASH doesn't actually validate that the status is a "valid" 1828 # status, so we can expect empty or unknown strings in the status field. 1829 "status": lambda _, s: CommitStatusState.try_init(s), 1830 "creator": lambda allspice_client, u: ( 1831 User.parse_response(allspice_client, u) if u else None 1832 ), 1833 } 1834 1835 def __eq__(self, other): 1836 if not isinstance(other, CommitStatus): 1837 return False 1838 return self.id == other.id 1839 1840 def __hash__(self): 1841 return hash(self.id)
Inherited Members
1844class CommitCombinedStatus(ReadonlyApiObject): 1845 commit_url: str 1846 repository: Repository 1847 sha: str 1848 state: CommitStatusState 1849 statuses: List["CommitStatus"] 1850 total_count: int 1851 url: str 1852 1853 def __init__(self, allspice_client): 1854 super().__init__(allspice_client) 1855 1856 _fields_to_parsers: ClassVar[dict] = { 1857 # See CommitStatus 1858 "state": lambda _, s: CommitStatusState.try_init(s), 1859 "statuses": lambda allspice_client, statuses: [ 1860 CommitStatus.parse_response(allspice_client, status) for status in statuses 1861 ], 1862 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1863 } 1864 1865 def __eq__(self, other): 1866 if not isinstance(other, CommitCombinedStatus): 1867 return False 1868 return self.sha == other.sha 1869 1870 def __hash__(self): 1871 return hash(self.sha) 1872 1873 @classmethod 1874 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1875 api_object = cls(allspice_client) 1876 cls._initialize(allspice_client, api_object, result) 1877 return api_object
Inherited Members
1880class Issue(ApiObject): 1881 assets: List[Any] 1882 assignee: Any 1883 assignees: Any 1884 body: str 1885 closed_at: Any 1886 comments: int 1887 created_at: str 1888 due_date: Any 1889 html_url: str 1890 id: int 1891 is_locked: bool 1892 labels: List[Any] 1893 milestone: Optional["Milestone"] 1894 number: int 1895 original_author: str 1896 original_author_id: int 1897 pin_order: int 1898 pull_request: Any 1899 ref: str 1900 repository: Dict[str, Union[int, str]] 1901 state: str 1902 title: str 1903 updated_at: str 1904 url: str 1905 user: User 1906 1907 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1908 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1909 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1910 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1911 1912 OPENED = "open" 1913 CLOSED = "closed" 1914 1915 def __init__(self, allspice_client): 1916 super().__init__(allspice_client) 1917 1918 def __eq__(self, other): 1919 if not isinstance(other, Issue): 1920 return False 1921 return self.repository == other.repository and self.id == other.id 1922 1923 def __hash__(self): 1924 return hash(self.repository) ^ hash(self.id) 1925 1926 _fields_to_parsers: ClassVar[dict] = { 1927 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1928 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1929 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1930 "assignees": lambda allspice_client, us: [ 1931 User.parse_response(allspice_client, u) for u in us 1932 ], 1933 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1934 } 1935 1936 _parsers_to_fields: ClassVar[dict] = { 1937 "milestone": lambda m: m.id, 1938 } 1939 1940 _patchable_fields: ClassVar[set[str]] = { 1941 "assignee", 1942 "assignees", 1943 "body", 1944 "due_date", 1945 "milestone", 1946 "state", 1947 "title", 1948 } 1949 1950 def commit(self): 1951 args = { 1952 "owner": self.repository.owner.username, 1953 "repo": self.repository.name, 1954 "index": self.number, 1955 } 1956 self._commit(args) 1957 1958 @classmethod 1959 def request(cls, allspice_client, owner: str, repo: str, number: str): 1960 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1961 # The repository in the response is a RepositoryMeta object, so request 1962 # the full repository object and add it to the issue object. 1963 repository = Repository.request(allspice_client, owner, repo) 1964 setattr(api_object, "_repository", repository) 1965 # For legacy reasons 1966 cls._add_read_property("repo", repository, api_object) 1967 return api_object 1968 1969 @classmethod 1970 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1971 args = {"owner": repo.owner.username, "repo": repo.name} 1972 data = {"title": title, "body": body} 1973 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1974 issue = Issue.parse_response(allspice_client, result) 1975 setattr(issue, "_repository", repo) 1976 cls._add_read_property("repo", repo, issue) 1977 return issue 1978 1979 @property 1980 def owner(self) -> Organization | User: 1981 return self.repository.owner 1982 1983 def get_time_sum(self, user: User) -> int: 1984 results = self.allspice_client.requests_get( 1985 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1986 ) 1987 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 1988 1989 def get_times(self) -> Optional[Dict]: 1990 return self.allspice_client.requests_get( 1991 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1992 ) 1993 1994 def delete_time(self, time_id: str): 1995 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 1996 self.allspice_client.requests_delete(path) 1997 1998 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1999 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2000 self.allspice_client.requests_post( 2001 path, data={"created": created, "time": int(time), "user_name": user_name} 2002 ) 2003 2004 def get_comments(self) -> List[Comment]: 2005 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2006 2007 results = self.allspice_client.requests_get( 2008 self.GET_COMMENTS.format( 2009 owner=self.owner.username, repo=self.repository.name, index=self.number 2010 ) 2011 ) 2012 2013 return [Comment.parse_response(self.allspice_client, result) for result in results] 2014 2015 def create_comment(self, body: str) -> Comment: 2016 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2017 2018 path = self.GET_COMMENTS.format( 2019 owner=self.owner.username, repo=self.repository.name, index=self.number 2020 ) 2021 2022 response = self.allspice_client.requests_post(path, data={"body": body}) 2023 return Comment.parse_response(self.allspice_client, response)
1958 @classmethod 1959 def request(cls, allspice_client, owner: str, repo: str, number: str): 1960 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1961 # The repository in the response is a RepositoryMeta object, so request 1962 # the full repository object and add it to the issue object. 1963 repository = Repository.request(allspice_client, owner, repo) 1964 setattr(api_object, "_repository", repository) 1965 # For legacy reasons 1966 cls._add_read_property("repo", repository, api_object) 1967 return api_object
1969 @classmethod 1970 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1971 args = {"owner": repo.owner.username, "repo": repo.name} 1972 data = {"title": title, "body": body} 1973 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1974 issue = Issue.parse_response(allspice_client, result) 1975 setattr(issue, "_repository", repo) 1976 cls._add_read_property("repo", repo, issue) 1977 return issue
1998 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1999 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2000 self.allspice_client.requests_post( 2001 path, data={"created": created, "time": int(time), "user_name": user_name} 2002 )
2004 def get_comments(self) -> List[Comment]: 2005 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2006 2007 results = self.allspice_client.requests_get( 2008 self.GET_COMMENTS.format( 2009 owner=self.owner.username, repo=self.repository.name, index=self.number 2010 ) 2011 ) 2012 2013 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2015 def create_comment(self, body: str) -> Comment: 2016 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2017 2018 path = self.GET_COMMENTS.format( 2019 owner=self.owner.username, repo=self.repository.name, index=self.number 2020 ) 2021 2022 response = self.allspice_client.requests_post(path, data={"body": body}) 2023 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2026class DesignReview(ApiObject): 2027 """ 2028 A Design Review. See 2029 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2030 2031 Note: The base and head fields are not `Branch` objects - they are plain strings 2032 referring to the branch names. This is because DRs can exist for branches that have 2033 been deleted, which don't have an associated `Branch` object from the API. You can use 2034 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2035 it exists. 2036 """ 2037 2038 additions: int 2039 allow_maintainer_edit: bool 2040 allow_maintainer_edits: Any 2041 assignee: User 2042 assignees: List["User"] 2043 base: str 2044 body: str 2045 changed_files: int 2046 closed_at: Any 2047 comments: int 2048 created_at: str 2049 deletions: int 2050 diff_url: str 2051 draft: bool 2052 due_date: Optional[str] 2053 head: str 2054 html_url: str 2055 id: int 2056 is_locked: bool 2057 labels: List[Any] 2058 merge_base: str 2059 merge_commit_sha: Any 2060 mergeable: bool 2061 merged: bool 2062 merged_at: Any 2063 merged_by: Any 2064 milestone: Any 2065 number: int 2066 patch_url: str 2067 pin_order: int 2068 repository: Optional["Repository"] 2069 requested_reviewers: Any 2070 review_comments: int 2071 state: str 2072 title: str 2073 updated_at: str 2074 url: str 2075 user: User 2076 2077 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2078 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2079 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2080 2081 OPEN = "open" 2082 CLOSED = "closed" 2083 2084 class MergeType(Enum): 2085 MERGE = "merge" 2086 REBASE = "rebase" 2087 REBASE_MERGE = "rebase-merge" 2088 SQUASH = "squash" 2089 MANUALLY_MERGED = "manually-merged" 2090 2091 def __init__(self, allspice_client): 2092 super().__init__(allspice_client) 2093 2094 def __eq__(self, other): 2095 if not isinstance(other, DesignReview): 2096 return False 2097 return self.repository == other.repository and self.id == other.id 2098 2099 def __hash__(self): 2100 return hash(self.repository) ^ hash(self.id) 2101 2102 @classmethod 2103 def parse_response(cls, allspice_client, result) -> "DesignReview": 2104 api_object = super().parse_response(allspice_client, result) 2105 cls._add_read_property( 2106 "repository", 2107 Repository.parse_response(allspice_client, result["base"]["repo"]), 2108 api_object, 2109 ) 2110 2111 return api_object 2112 2113 @classmethod 2114 def request(cls, allspice_client, owner: str, repo: str, number: str): 2115 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2116 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2117 2118 _fields_to_parsers: ClassVar[dict] = { 2119 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2120 "assignees": lambda allspice_client, us: [ 2121 User.parse_response(allspice_client, u) for u in us 2122 ], 2123 "base": lambda _, b: b["ref"], 2124 "head": lambda _, h: h["ref"], 2125 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2126 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2127 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2128 } 2129 2130 _patchable_fields: ClassVar[set[str]] = { 2131 "allow_maintainer_edits", 2132 "assignee", 2133 "assignees", 2134 "base", 2135 "body", 2136 "due_date", 2137 "milestone", 2138 "state", 2139 "title", 2140 } 2141 2142 _parsers_to_fields: ClassVar[dict] = { 2143 "assignee": lambda u: u.username, 2144 "assignees": lambda us: [u.username for u in us], 2145 "base": lambda b: b.name if isinstance(b, Branch) else b, 2146 "milestone": lambda m: m.id, 2147 } 2148 2149 def commit(self): 2150 data = self.get_dirty_fields() 2151 if "due_date" in data and data["due_date"] is None: 2152 data["unset_due_date"] = True 2153 2154 args = { 2155 "owner": self.repository.owner.username, 2156 "repo": self.repository.name, 2157 "index": self.number, 2158 } 2159 self._commit(args, data) 2160 2161 def merge(self, merge_type: MergeType): 2162 """ 2163 Merge the pull request. See 2164 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2165 2166 :param merge_type: The type of merge to perform. See the MergeType enum. 2167 """ 2168 2169 self.allspice_client.requests_put( 2170 self.MERGE_DESIGN_REVIEW.format( 2171 owner=self.repository.owner.username, 2172 repo=self.repository.name, 2173 index=self.number, 2174 ), 2175 data={"Do": merge_type.value}, 2176 ) 2177 2178 def get_comments(self) -> List[Comment]: 2179 """ 2180 Get the comments on this pull request, but not specifically on a review. 2181 2182 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2183 2184 :return: A list of comments on this pull request. 2185 """ 2186 2187 results = self.allspice_client.requests_get( 2188 self.GET_COMMENTS.format( 2189 owner=self.repository.owner.username, 2190 repo=self.repository.name, 2191 index=self.number, 2192 ) 2193 ) 2194 return [Comment.parse_response(self.allspice_client, result) for result in results] 2195 2196 def create_comment(self, body: str): 2197 """ 2198 Create a comment on this pull request. This uses the same endpoint as the 2199 comments on issues, and will not be associated with any reviews. 2200 2201 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2202 2203 :param body: The body of the comment. 2204 :return: The comment that was created. 2205 """ 2206 2207 result = self.allspice_client.requests_post( 2208 self.GET_COMMENTS.format( 2209 owner=self.repository.owner.username, 2210 repo=self.repository.name, 2211 index=self.number, 2212 ), 2213 data={"body": body}, 2214 ) 2215 return Comment.parse_response(self.allspice_client, result)
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2102 @classmethod 2103 def parse_response(cls, allspice_client, result) -> "DesignReview": 2104 api_object = super().parse_response(allspice_client, result) 2105 cls._add_read_property( 2106 "repository", 2107 Repository.parse_response(allspice_client, result["base"]["repo"]), 2108 api_object, 2109 ) 2110 2111 return api_object
2113 @classmethod 2114 def request(cls, allspice_client, owner: str, repo: str, number: str): 2115 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2116 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2149 def commit(self): 2150 data = self.get_dirty_fields() 2151 if "due_date" in data and data["due_date"] is None: 2152 data["unset_due_date"] = True 2153 2154 args = { 2155 "owner": self.repository.owner.username, 2156 "repo": self.repository.name, 2157 "index": self.number, 2158 } 2159 self._commit(args, data)
2161 def merge(self, merge_type: MergeType): 2162 """ 2163 Merge the pull request. See 2164 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2165 2166 :param merge_type: The type of merge to perform. See the MergeType enum. 2167 """ 2168 2169 self.allspice_client.requests_put( 2170 self.MERGE_DESIGN_REVIEW.format( 2171 owner=self.repository.owner.username, 2172 repo=self.repository.name, 2173 index=self.number, 2174 ), 2175 data={"Do": merge_type.value}, 2176 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2178 def get_comments(self) -> List[Comment]: 2179 """ 2180 Get the comments on this pull request, but not specifically on a review. 2181 2182 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2183 2184 :return: A list of comments on this pull request. 2185 """ 2186 2187 results = self.allspice_client.requests_get( 2188 self.GET_COMMENTS.format( 2189 owner=self.repository.owner.username, 2190 repo=self.repository.name, 2191 index=self.number, 2192 ) 2193 ) 2194 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2196 def create_comment(self, body: str): 2197 """ 2198 Create a comment on this pull request. This uses the same endpoint as the 2199 comments on issues, and will not be associated with any reviews. 2200 2201 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2202 2203 :param body: The body of the comment. 2204 :return: The comment that was created. 2205 """ 2206 2207 result = self.allspice_client.requests_post( 2208 self.GET_COMMENTS.format( 2209 owner=self.repository.owner.username, 2210 repo=self.repository.name, 2211 index=self.number, 2212 ), 2213 data={"body": body}, 2214 ) 2215 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2218class Team(ApiObject): 2219 can_create_org_repo: bool 2220 description: str 2221 id: int 2222 includes_all_repositories: bool 2223 name: str 2224 organization: Optional["Organization"] 2225 permission: str 2226 units: List[str] 2227 units_map: Dict[str, str] 2228 2229 API_OBJECT = """/teams/{id}""" # <id> 2230 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2231 TEAM_DELETE = """/teams/%s""" # <id> 2232 GET_MEMBERS = """/teams/%s/members""" # <id> 2233 GET_REPOS = """/teams/%s/repos""" # <id> 2234 2235 def __init__(self, allspice_client): 2236 super().__init__(allspice_client) 2237 2238 def __eq__(self, other): 2239 if not isinstance(other, Team): 2240 return False 2241 return self.organization == other.organization and self.id == other.id 2242 2243 def __hash__(self): 2244 return hash(self.organization) ^ hash(self.id) 2245 2246 _fields_to_parsers: ClassVar[dict] = { 2247 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2248 } 2249 2250 _patchable_fields: ClassVar[set[str]] = { 2251 "can_create_org_repo", 2252 "description", 2253 "includes_all_repositories", 2254 "name", 2255 "permission", 2256 "units", 2257 "units_map", 2258 } 2259 2260 @classmethod 2261 def request(cls, allspice_client, id: int): 2262 return cls._request(allspice_client, {"id": id}) 2263 2264 def commit(self): 2265 args = {"id": self.id} 2266 self._commit(args) 2267 2268 def add_user(self, user: User): 2269 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2270 url = f"/teams/{self.id}/members/{user.login}" 2271 self.allspice_client.requests_put(url) 2272 2273 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2274 if isinstance(repo, Repository): 2275 repo_name = repo.name 2276 else: 2277 repo_name = repo 2278 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2279 2280 def get_members(self): 2281 """Get all users assigned to the team.""" 2282 results = self.allspice_client.requests_get_paginated( 2283 Team.GET_MEMBERS % self.id, 2284 ) 2285 return [User.parse_response(self.allspice_client, result) for result in results] 2286 2287 def get_repos(self): 2288 """Get all repos of this Team.""" 2289 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2290 return [Repository.parse_response(self.allspice_client, result) for result in results] 2291 2292 def delete(self): 2293 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2294 self.deleted = True 2295 2296 def remove_team_member(self, user_name: str): 2297 url = f"/teams/{self.id}/members/{user_name}" 2298 self.allspice_client.requests_delete(url)
2268 def add_user(self, user: User): 2269 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2270 url = f"/teams/{self.id}/members/{user.login}" 2271 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2280 def get_members(self): 2281 """Get all users assigned to the team.""" 2282 results = self.allspice_client.requests_get_paginated( 2283 Team.GET_MEMBERS % self.id, 2284 ) 2285 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2287 def get_repos(self): 2288 """Get all repos of this Team.""" 2289 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2290 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
2301class Release(ApiObject): 2302 """ 2303 A release on a repo. 2304 """ 2305 2306 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2307 author: User 2308 body: str 2309 created_at: str 2310 draft: bool 2311 html_url: str 2312 id: int 2313 name: str 2314 prerelease: bool 2315 published_at: str 2316 repo: Optional["Repository"] 2317 repository: Optional["Repository"] 2318 tag_name: str 2319 tarball_url: str 2320 target_commitish: str 2321 upload_url: str 2322 url: str 2323 zipball_url: str 2324 2325 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2326 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2327 # Note that we don't strictly need the get_assets route, as the release 2328 # object already contains the assets. 2329 2330 def __init__(self, allspice_client): 2331 super().__init__(allspice_client) 2332 2333 def __eq__(self, other): 2334 if not isinstance(other, Release): 2335 return False 2336 return self.repo == other.repo and self.id == other.id 2337 2338 def __hash__(self): 2339 return hash(self.repo) ^ hash(self.id) 2340 2341 _fields_to_parsers: ClassVar[dict] = { 2342 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2343 } 2344 _patchable_fields: ClassVar[set[str]] = { 2345 "body", 2346 "draft", 2347 "name", 2348 "prerelease", 2349 "tag_name", 2350 "target_commitish", 2351 } 2352 2353 @classmethod 2354 def parse_response(cls, allspice_client, result, repo) -> Release: 2355 release = super().parse_response(allspice_client, result) 2356 Release._add_read_property("repository", repo, release) 2357 # For legacy reasons 2358 Release._add_read_property("repo", repo, release) 2359 setattr( 2360 release, 2361 "_assets", 2362 [ 2363 ReleaseAsset.parse_response(allspice_client, asset, release) 2364 for asset in result["assets"] 2365 ], 2366 ) 2367 return release 2368 2369 @classmethod 2370 def request( 2371 cls, 2372 allspice_client, 2373 owner: str, 2374 repo: str, 2375 id: Optional[int] = None, 2376 ) -> Release: 2377 args = {"owner": owner, "repo": repo, "id": id} 2378 release_response = cls._get_gitea_api_object(allspice_client, args) 2379 repository = Repository.request(allspice_client, owner, repo) 2380 release = cls.parse_response(allspice_client, release_response, repository) 2381 return release 2382 2383 def commit(self): 2384 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2385 self._commit(args) 2386 2387 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2388 """ 2389 Create an asset for this release. 2390 2391 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2392 2393 :param file: The file to upload. This should be a file-like object. 2394 :param name: The name of the file. 2395 :return: The created asset. 2396 """ 2397 2398 args: dict[str, Any] = {"files": {"attachment": file}} 2399 if name is not None: 2400 args["params"] = {"name": name} 2401 2402 result = self.allspice_client.requests_post( 2403 self.RELEASE_CREATE_ASSET.format( 2404 owner=self.repo.owner.username, 2405 repo=self.repo.name, 2406 id=self.id, 2407 ), 2408 **args, 2409 ) 2410 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2411 2412 def delete(self): 2413 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2414 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2415 self.deleted = True
A release on a repo.
2353 @classmethod 2354 def parse_response(cls, allspice_client, result, repo) -> Release: 2355 release = super().parse_response(allspice_client, result) 2356 Release._add_read_property("repository", repo, release) 2357 # For legacy reasons 2358 Release._add_read_property("repo", repo, release) 2359 setattr( 2360 release, 2361 "_assets", 2362 [ 2363 ReleaseAsset.parse_response(allspice_client, asset, release) 2364 for asset in result["assets"] 2365 ], 2366 ) 2367 return release
2369 @classmethod 2370 def request( 2371 cls, 2372 allspice_client, 2373 owner: str, 2374 repo: str, 2375 id: Optional[int] = None, 2376 ) -> Release: 2377 args = {"owner": owner, "repo": repo, "id": id} 2378 release_response = cls._get_gitea_api_object(allspice_client, args) 2379 repository = Repository.request(allspice_client, owner, repo) 2380 release = cls.parse_response(allspice_client, release_response, repository) 2381 return release
2387 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2388 """ 2389 Create an asset for this release. 2390 2391 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2392 2393 :param file: The file to upload. This should be a file-like object. 2394 :param name: The name of the file. 2395 :return: The created asset. 2396 """ 2397 2398 args: dict[str, Any] = {"files": {"attachment": file}} 2399 if name is not None: 2400 args["params"] = {"name": name} 2401 2402 result = self.allspice_client.requests_post( 2403 self.RELEASE_CREATE_ASSET.format( 2404 owner=self.repo.owner.username, 2405 repo=self.repo.name, 2406 id=self.id, 2407 ), 2408 **args, 2409 ) 2410 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2418class ReleaseAsset(ApiObject): 2419 browser_download_url: str 2420 created_at: str 2421 download_count: int 2422 id: int 2423 name: str 2424 release: Optional["Release"] 2425 size: int 2426 uuid: str 2427 2428 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2429 2430 def __init__(self, allspice_client): 2431 super().__init__(allspice_client) 2432 2433 def __eq__(self, other): 2434 if not isinstance(other, ReleaseAsset): 2435 return False 2436 return self.release == other.release and self.id == other.id 2437 2438 def __hash__(self): 2439 return hash(self.release) ^ hash(self.id) 2440 2441 _fields_to_parsers: ClassVar[dict] = {} 2442 _patchable_fields: ClassVar[set[str]] = { 2443 "name", 2444 } 2445 2446 @classmethod 2447 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2448 asset = super().parse_response(allspice_client, result) 2449 ReleaseAsset._add_read_property("release", release, asset) 2450 return asset 2451 2452 @classmethod 2453 def request( 2454 cls, 2455 allspice_client, 2456 owner: str, 2457 repo: str, 2458 release_id: int, 2459 id: int, 2460 ) -> ReleaseAsset: 2461 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2462 asset_response = cls._get_gitea_api_object(allspice_client, args) 2463 release = Release.request(allspice_client, owner, repo, release_id) 2464 asset = cls.parse_response(allspice_client, asset_response, release) 2465 return asset 2466 2467 def commit(self): 2468 args = { 2469 "owner": self.release.repo.owner, 2470 "repo": self.release.repo.name, 2471 "release_id": self.release.id, 2472 "id": self.id, 2473 } 2474 self._commit(args) 2475 2476 def download(self) -> bytes: 2477 """ 2478 Download the raw, binary data of this asset. 2479 2480 Note 1: if the file you are requesting is a text file, you might want to 2481 use .decode() on the result to get a string. For example: 2482 2483 asset.download().decode("utf-8") 2484 2485 Note 2: this method will store the entire file in memory. If you are 2486 downloading a large file, you might want to use download_to_file instead. 2487 """ 2488 2489 return self.allspice_client.requests.get( 2490 self.browser_download_url, 2491 headers=self.allspice_client.headers, 2492 ).content 2493 2494 def download_to_file(self, io: IO): 2495 """ 2496 Download the raw, binary data of this asset to a file-like object. 2497 2498 Example: 2499 2500 with open("my_file.zip", "wb") as f: 2501 asset.download_to_file(f) 2502 2503 :param io: The file-like object to write the data to. 2504 """ 2505 2506 response = self.allspice_client.requests.get( 2507 self.browser_download_url, 2508 headers=self.allspice_client.headers, 2509 stream=True, 2510 ) 2511 # 4kb chunks 2512 for chunk in response.iter_content(chunk_size=4096): 2513 if chunk: 2514 io.write(chunk) 2515 2516 def delete(self): 2517 args = { 2518 "owner": self.release.repo.owner.name, 2519 "repo": self.release.repo.name, 2520 "release_id": self.release.id, 2521 "id": self.id, 2522 } 2523 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2524 self.deleted = True
2452 @classmethod 2453 def request( 2454 cls, 2455 allspice_client, 2456 owner: str, 2457 repo: str, 2458 release_id: int, 2459 id: int, 2460 ) -> ReleaseAsset: 2461 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2462 asset_response = cls._get_gitea_api_object(allspice_client, args) 2463 release = Release.request(allspice_client, owner, repo, release_id) 2464 asset = cls.parse_response(allspice_client, asset_response, release) 2465 return asset
2476 def download(self) -> bytes: 2477 """ 2478 Download the raw, binary data of this asset. 2479 2480 Note 1: if the file you are requesting is a text file, you might want to 2481 use .decode() on the result to get a string. For example: 2482 2483 asset.download().decode("utf-8") 2484 2485 Note 2: this method will store the entire file in memory. If you are 2486 downloading a large file, you might want to use download_to_file instead. 2487 """ 2488 2489 return self.allspice_client.requests.get( 2490 self.browser_download_url, 2491 headers=self.allspice_client.headers, 2492 ).content
Download the raw, binary data of this asset.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
asset.download().decode("utf-8")
Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.
2494 def download_to_file(self, io: IO): 2495 """ 2496 Download the raw, binary data of this asset to a file-like object. 2497 2498 Example: 2499 2500 with open("my_file.zip", "wb") as f: 2501 asset.download_to_file(f) 2502 2503 :param io: The file-like object to write the data to. 2504 """ 2505 2506 response = self.allspice_client.requests.get( 2507 self.browser_download_url, 2508 headers=self.allspice_client.headers, 2509 stream=True, 2510 ) 2511 # 4kb chunks 2512 for chunk in response.iter_content(chunk_size=4096): 2513 if chunk: 2514 io.write(chunk)
Download the raw, binary data of this asset to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
asset.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
2527class Content(ReadonlyApiObject): 2528 content: Any 2529 download_url: str 2530 encoding: Any 2531 git_url: str 2532 html_url: str 2533 last_commit_sha: str 2534 name: str 2535 path: str 2536 sha: str 2537 size: int 2538 submodule_git_url: Any 2539 target: Any 2540 type: str 2541 url: str 2542 2543 FILE = "file" 2544 2545 def __init__(self, allspice_client): 2546 super().__init__(allspice_client) 2547 2548 def __eq__(self, other): 2549 if not isinstance(other, Content): 2550 return False 2551 2552 return self.sha == other.sha and self.name == other.name 2553 2554 def __hash__(self): 2555 return hash(self.sha) ^ hash(self.name)
Inherited Members
2561class Util: 2562 @staticmethod 2563 def convert_time(time: str) -> datetime: 2564 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2565 try: 2566 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2567 except ValueError: 2568 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2569 2570 @staticmethod 2571 def format_time(time: datetime) -> str: 2572 """ 2573 Format a datetime object to Gitea's time format. 2574 2575 :param time: The time to format 2576 :return: Formatted time 2577 """ 2578 2579 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2580 2581 @staticmethod 2582 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2583 """ 2584 Given a "ref", returns a dict with the ref parameter for the API call. 2585 2586 If the ref is None, returns an empty dict. You can pass this to the API 2587 directly. 2588 """ 2589 2590 if isinstance(ref, Branch): 2591 return {"ref": ref.name} 2592 elif isinstance(ref, Commit): 2593 return {"ref": ref.sha} 2594 elif ref: 2595 return {"ref": ref} 2596 else: 2597 return {}
2562 @staticmethod 2563 def convert_time(time: str) -> datetime: 2564 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2565 try: 2566 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2567 except ValueError: 2568 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)
2570 @staticmethod 2571 def format_time(time: datetime) -> str: 2572 """ 2573 Format a datetime object to Gitea's time format. 2574 2575 :param time: The time to format 2576 :return: Formatted time 2577 """ 2578 2579 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
Format a datetime object to Gitea's time format.
Parameters
- time: The time to format
Returns
Formatted time
2581 @staticmethod 2582 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2583 """ 2584 Given a "ref", returns a dict with the ref parameter for the API call. 2585 2586 If the ref is None, returns an empty dict. You can pass this to the API 2587 directly. 2588 """ 2589 2590 if isinstance(ref, Branch): 2591 return {"ref": ref.name} 2592 elif isinstance(ref, Commit): 2593 return {"ref": ref.sha} 2594 elif ref: 2595 return {"ref": ref} 2596 else: 2597 return {}
Given a "ref", returns a dict with the ref parameter for the API call.
If the ref is None, returns an empty dict. You can pass this to the API directly.