allspice.apiobject
1from __future__ import annotations 2 3import logging 4import re 5from datetime import datetime, timezone 6from enum import Enum 7from functools import cached_property 8from typing import ( 9 IO, 10 Any, 11 ClassVar, 12 Dict, 13 FrozenSet, 14 List, 15 Literal, 16 Optional, 17 Sequence, 18 Set, 19 Tuple, 20 Union, 21) 22 23try: 24 from typing_extensions import Self 25except ImportError: 26 from typing import Self 27 28from .baseapiobject import ApiObject, ReadonlyApiObject 29from .exceptions import ConflictException, NotFoundException 30 31 32class Organization(ApiObject): 33 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 34 35 active: Optional[bool] 36 avatar_url: str 37 created: Optional[str] 38 description: str 39 email: str 40 followers_count: Optional[int] 41 following_count: Optional[int] 42 full_name: str 43 html_url: Optional[str] 44 id: int 45 is_admin: Optional[bool] 46 language: Optional[str] 47 last_login: Optional[str] 48 location: str 49 login: Optional[str] 50 login_name: Optional[str] 51 name: Optional[str] 52 prohibit_login: Optional[bool] 53 repo_admin_change_team_access: Optional[bool] 54 restricted: Optional[bool] 55 source_id: Optional[int] 56 starred_repos_count: Optional[int] 57 username: str 58 visibility: str 59 website: str 60 61 API_OBJECT = """/orgs/{name}""" # <org> 62 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 63 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 64 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 65 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 66 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 67 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 68 69 def __init__(self, allspice_client): 70 super().__init__(allspice_client) 71 72 def __eq__(self, other): 73 if not isinstance(other, Organization): 74 return False 75 return self.allspice_client == other.allspice_client and self.name == other.name 76 77 def __hash__(self): 78 return hash(self.allspice_client) ^ hash(self.name) 79 80 @classmethod 81 def request(cls, allspice_client, name: str) -> Self: 82 return cls._request(allspice_client, {"name": name}) 83 84 @classmethod 85 def parse_response(cls, allspice_client, result) -> "Organization": 86 api_object = super().parse_response(allspice_client, result) 87 # add "name" field to make this behave similar to users for gitea < 1.18 88 # also necessary for repository-owner when org is repo owner 89 if not hasattr(api_object, "name"): 90 Organization._add_read_property("name", result["username"], api_object) 91 return api_object 92 93 _patchable_fields: ClassVar[set[str]] = { 94 "description", 95 "full_name", 96 "location", 97 "visibility", 98 "website", 99 } 100 101 def commit(self): 102 args = {"name": self.name} 103 self._commit(args) 104 105 def create_repo( 106 self, 107 repoName: str, 108 description: str = "", 109 private: bool = False, 110 autoInit=True, 111 gitignores: Optional[str] = None, 112 license: Optional[str] = None, 113 readme: str = "Default", 114 issue_labels: Optional[str] = None, 115 default_branch="master", 116 ): 117 """Create an organization Repository 118 119 Throws: 120 AlreadyExistsException: If the Repository exists already. 121 Exception: If something else went wrong. 122 """ 123 result = self.allspice_client.requests_post( 124 f"/orgs/{self.name}/repos", 125 data={ 126 "name": repoName, 127 "description": description, 128 "private": private, 129 "auto_init": autoInit, 130 "gitignores": gitignores, 131 "license": license, 132 "issue_labels": issue_labels, 133 "readme": readme, 134 "default_branch": default_branch, 135 }, 136 ) 137 if "id" in result: 138 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 139 else: 140 self.allspice_client.logger.error(result["message"]) 141 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 142 return Repository.parse_response(self.allspice_client, result) 143 144 def get_repositories(self) -> List["Repository"]: 145 results = self.allspice_client.requests_get_paginated( 146 Organization.ORG_REPOS_REQUEST % self.username 147 ) 148 return [Repository.parse_response(self.allspice_client, result) for result in results] 149 150 def get_repository(self, name) -> "Repository": 151 repos = self.get_repositories() 152 for repo in repos: 153 if repo.name == name: 154 return repo 155 raise NotFoundException("Repository %s not existent in organization." % name) 156 157 def get_teams(self) -> List["Team"]: 158 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 159 teams = [Team.parse_response(self.allspice_client, result) for result in results] 160 # organisation seems to be missing using this request, so we add org manually 161 for t in teams: 162 setattr(t, "_organization", self) 163 return teams 164 165 def get_team(self, name) -> "Team": 166 teams = self.get_teams() 167 for team in teams: 168 if team.name == name: 169 return team 170 raise NotFoundException("Team not existent in organization.") 171 172 def create_team( 173 self, 174 name: str, 175 description: str = "", 176 permission: str = "read", 177 can_create_org_repo: bool = False, 178 includes_all_repositories: bool = False, 179 units=( 180 "repo.code", 181 "repo.issues", 182 "repo.ext_issues", 183 "repo.wiki", 184 "repo.pulls", 185 "repo.releases", 186 "repo.ext_wiki", 187 ), 188 units_map={}, 189 ) -> "Team": 190 """Alias for AllSpice#create_team""" 191 # TODO: Move AllSpice#create_team to Organization#create_team and 192 # deprecate AllSpice#create_team. 193 return self.allspice_client.create_team( 194 org=self, 195 name=name, 196 description=description, 197 permission=permission, 198 can_create_org_repo=can_create_org_repo, 199 includes_all_repositories=includes_all_repositories, 200 units=units, 201 units_map=units_map, 202 ) 203 204 def get_members(self) -> List["User"]: 205 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 206 return [User.parse_response(self.allspice_client, result) for result in results] 207 208 def is_member(self, username) -> bool: 209 if isinstance(username, User): 210 username = username.username 211 try: 212 # returns 204 if its ok, 404 if its not 213 self.allspice_client.requests_get( 214 Organization.ORG_IS_MEMBER % (self.username, username) 215 ) 216 return True 217 except Exception: 218 return False 219 220 def remove_member(self, user: "User"): 221 path = f"/orgs/{self.username}/members/{user.username}" 222 self.allspice_client.requests_delete(path) 223 224 def delete(self): 225 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 226 for repo in self.get_repositories(): 227 repo.delete() 228 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 229 self.deleted = True 230 231 def get_heatmap(self) -> List[Tuple[datetime, int]]: 232 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 233 results = [ 234 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 235 for result in results 236 ] 237 return results 238 239 240class User(ApiObject): 241 active: bool 242 admin: Any 243 allow_create_organization: Any 244 allow_git_hook: Any 245 allow_import_local: Any 246 avatar_url: str 247 created: str 248 description: str 249 email: str 250 emails: List[Any] 251 followers_count: int 252 following_count: int 253 full_name: str 254 html_url: str 255 id: int 256 is_admin: bool 257 language: str 258 last_login: str 259 location: str 260 login: str 261 login_name: str 262 max_repo_creation: Any 263 must_change_password: Any 264 password: Any 265 prohibit_login: bool 266 restricted: bool 267 source_id: int 268 starred_repos_count: int 269 username: str 270 visibility: str 271 website: str 272 273 API_OBJECT = """/users/{name}""" # <org> 274 USER_MAIL = """/user/emails?sudo=%s""" # <name> 275 USER_PATCH = """/admin/users/%s""" # <username> 276 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 277 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 278 USER_HEATMAP = """/users/%s/heatmap""" # <username> 279 280 def __init__(self, allspice_client): 281 super().__init__(allspice_client) 282 self._emails = [] 283 284 def __eq__(self, other): 285 if not isinstance(other, User): 286 return False 287 return self.allspice_client == other.allspice_client and self.id == other.id 288 289 def __hash__(self): 290 return hash(self.allspice_client) ^ hash(self.id) 291 292 @property 293 def emails(self): 294 self.__request_emails() 295 return self._emails 296 297 @classmethod 298 def request(cls, allspice_client, name: str) -> "User": 299 api_object = cls._request(allspice_client, {"name": name}) 300 return api_object 301 302 _patchable_fields: ClassVar[set[str]] = { 303 "active", 304 "admin", 305 "allow_create_organization", 306 "allow_git_hook", 307 "allow_import_local", 308 "email", 309 "full_name", 310 "location", 311 "login_name", 312 "max_repo_creation", 313 "must_change_password", 314 "password", 315 "prohibit_login", 316 "website", 317 } 318 319 def commit(self, login_name: str, source_id: int = 0): 320 """ 321 Unfortunately it is necessary to require the login name 322 as well as the login source (that is not supplied when getting a user) for 323 changing a user. 324 Usually source_id is 0 and the login_name is equal to the username. 325 """ 326 values = self.get_dirty_fields() 327 values.update( 328 # api-doc says that the "source_id" is necessary; works without though 329 {"login_name": login_name, "source_id": source_id} 330 ) 331 args = {"username": self.username} 332 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 333 self._dirty_fields = {} 334 335 def create_repo( 336 self, 337 repoName: str, 338 description: str = "", 339 private: bool = False, 340 autoInit=True, 341 gitignores: Optional[str] = None, 342 license: Optional[str] = None, 343 readme: str = "Default", 344 issue_labels: Optional[str] = None, 345 default_branch="master", 346 ): 347 """Create a user Repository 348 349 Throws: 350 AlreadyExistsException: If the Repository exists already. 351 Exception: If something else went wrong. 352 """ 353 result = self.allspice_client.requests_post( 354 "/user/repos", 355 data={ 356 "name": repoName, 357 "description": description, 358 "private": private, 359 "auto_init": autoInit, 360 "gitignores": gitignores, 361 "license": license, 362 "issue_labels": issue_labels, 363 "readme": readme, 364 "default_branch": default_branch, 365 }, 366 ) 367 if "id" in result: 368 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 369 else: 370 self.allspice_client.logger.error(result["message"]) 371 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 372 return Repository.parse_response(self.allspice_client, result) 373 374 def get_repositories(self) -> List["Repository"]: 375 """Get all Repositories owned by this User.""" 376 url = f"/users/{self.username}/repos" 377 results = self.allspice_client.requests_get_paginated(url) 378 return [Repository.parse_response(self.allspice_client, result) for result in results] 379 380 def get_orgs(self) -> List[Organization]: 381 """Get all Organizations this user is a member of.""" 382 url = f"/users/{self.username}/orgs" 383 results = self.allspice_client.requests_get_paginated(url) 384 return [Organization.parse_response(self.allspice_client, result) for result in results] 385 386 def get_teams(self) -> List["Team"]: 387 url = "/user/teams" 388 results = self.allspice_client.requests_get_paginated(url, sudo=self) 389 return [Team.parse_response(self.allspice_client, result) for result in results] 390 391 def get_accessible_repos(self) -> List["Repository"]: 392 """Get all Repositories accessible by the logged in User.""" 393 results = self.allspice_client.requests_get("/user/repos", sudo=self) 394 return [Repository.parse_response(self.allspice_client, result) for result in results] 395 396 def __request_emails(self): 397 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 398 # report if the adress changed by this 399 for mail in result: 400 self._emails.append(mail["email"]) 401 if mail["primary"]: 402 self._email = mail["email"] 403 404 def delete(self): 405 """Deletes this User. Also deletes all Repositories he owns.""" 406 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 407 self.deleted = True 408 409 def get_heatmap(self) -> List[Tuple[datetime, int]]: 410 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 411 results = [ 412 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 413 for result in results 414 ] 415 return results 416 417 418class Branch(ReadonlyApiObject): 419 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 420 effective_branch_protection_name: str 421 enable_status_check: bool 422 name: str 423 protected: bool 424 required_approvals: int 425 status_check_contexts: List[Any] 426 user_can_merge: bool 427 user_can_push: bool 428 429 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 430 431 def __init__(self, allspice_client): 432 super().__init__(allspice_client) 433 434 def __eq__(self, other): 435 if not isinstance(other, Branch): 436 return False 437 return self.commit == other.commit and self.name == other.name 438 439 def __hash__(self): 440 return hash(self.commit["id"]) ^ hash(self.name) 441 442 _fields_to_parsers: ClassVar[dict] = { 443 # This is not a commit object 444 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 445 } 446 447 @classmethod 448 def request(cls, allspice_client, owner: str, repo: str, branch: str): 449 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch}) 450 451 452class GitEntry(ReadonlyApiObject): 453 """ 454 An object representing a file or directory in the Git tree. 455 """ 456 457 mode: str 458 path: str 459 sha: str 460 size: int 461 type: str 462 url: str 463 464 def __init__(self, allspice_client): 465 super().__init__(allspice_client) 466 467 def __eq__(self, other) -> bool: 468 if not isinstance(other, GitEntry): 469 return False 470 return self.sha == other.sha 471 472 def __hash__(self) -> int: 473 return hash(self.sha) 474 475 476class Repository(ApiObject): 477 allow_fast_forward_only_merge: bool 478 allow_manual_merge: Any 479 allow_merge_commits: bool 480 allow_rebase: bool 481 allow_rebase_explicit: bool 482 allow_rebase_update: bool 483 allow_squash_merge: bool 484 archived: bool 485 archived_at: str 486 autodetect_manual_merge: Any 487 avatar_url: str 488 clone_url: str 489 created_at: str 490 default_allow_maintainer_edit: bool 491 default_branch: str 492 default_delete_branch_after_merge: bool 493 default_merge_style: str 494 description: str 495 empty: bool 496 enable_prune: Any 497 external_tracker: Any 498 external_wiki: Any 499 fork: bool 500 forks_count: int 501 full_name: str 502 has_actions: bool 503 has_issues: bool 504 has_packages: bool 505 has_projects: bool 506 has_pull_requests: bool 507 has_releases: bool 508 has_wiki: bool 509 html_url: str 510 id: int 511 ignore_whitespace_conflicts: bool 512 internal: bool 513 internal_tracker: Dict[str, bool] 514 language: str 515 languages_url: str 516 link: str 517 mirror: bool 518 mirror_interval: str 519 mirror_updated: str 520 name: str 521 object_format_name: str 522 open_issues_count: int 523 open_pr_counter: int 524 original_url: str 525 owner: Union["User", "Organization"] 526 parent: Any 527 permissions: Dict[str, bool] 528 private: bool 529 projects_mode: str 530 release_counter: int 531 repo_transfer: Any 532 size: int 533 ssh_url: str 534 stars_count: int 535 template: bool 536 updated_at: datetime 537 url: str 538 watchers_count: int 539 website: str 540 541 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 542 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 543 REPO_SEARCH = """/repos/search/""" 544 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 545 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 546 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 547 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 548 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 549 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 550 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 551 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 552 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 553 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 554 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 555 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 556 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 557 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 558 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 559 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 560 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 561 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 562 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 563 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 564 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 565 566 class ArchiveFormat(Enum): 567 """ 568 Archive formats for Repository.get_archive 569 """ 570 571 TAR = "tar.gz" 572 ZIP = "zip" 573 574 class CommitStatusSort(Enum): 575 """ 576 Sort order for Repository.get_commit_status 577 """ 578 579 OLDEST = "oldest" 580 RECENT_UPDATE = "recentupdate" 581 LEAST_UPDATE = "leastupdate" 582 LEAST_INDEX = "leastindex" 583 HIGHEST_INDEX = "highestindex" 584 585 def __init__(self, allspice_client): 586 super().__init__(allspice_client) 587 588 def __eq__(self, other): 589 if not isinstance(other, Repository): 590 return False 591 return self.owner == other.owner and self.name == other.name 592 593 def __hash__(self): 594 return hash(self.owner) ^ hash(self.name) 595 596 _fields_to_parsers: ClassVar[dict] = { 597 # dont know how to tell apart user and org as owner except form email being empty. 598 "owner": lambda allspice_client, r: ( 599 Organization.parse_response(allspice_client, r) 600 if r["email"] == "" 601 else User.parse_response(allspice_client, r) 602 ), 603 "updated_at": lambda _, t: Util.convert_time(t), 604 } 605 606 @classmethod 607 def request( 608 cls, 609 allspice_client, 610 owner: str, 611 name: str, 612 ) -> Repository: 613 return cls._request(allspice_client, {"owner": owner, "name": name}) 614 615 @classmethod 616 def search( 617 cls, 618 allspice_client, 619 query: Optional[str] = None, 620 topic: bool = False, 621 include_description: bool = False, 622 user: Optional[User] = None, 623 owner_to_prioritize: Union[User, Organization, None] = None, 624 ) -> list[Repository]: 625 """ 626 Search for repositories. 627 628 See https://hub.allspice.io/api/swagger#/repository/repoSearch 629 630 :param query: The query string to search for 631 :param topic: If true, the query string will only be matched against the 632 repository's topic. 633 :param include_description: If true, the query string will be matched 634 against the repository's description as well. 635 :param user: If specified, only repositories that this user owns or 636 contributes to will be searched. 637 :param owner_to_prioritize: If specified, repositories owned by the 638 given entity will be prioritized in the search. 639 :returns: All repositories matching the query. If there are many 640 repositories matching this query, this may take some time. 641 """ 642 643 params = {} 644 645 if query is not None: 646 params["q"] = query 647 if topic: 648 params["topic"] = topic 649 if include_description: 650 params["include_description"] = include_description 651 if user is not None: 652 params["user"] = user.id 653 if owner_to_prioritize is not None: 654 params["owner_to_prioritize"] = owner_to_prioritize.id 655 656 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 657 658 return [Repository.parse_response(allspice_client, response) for response in responses] 659 660 _patchable_fields: ClassVar[set[str]] = { 661 "allow_manual_merge", 662 "allow_merge_commits", 663 "allow_rebase", 664 "allow_rebase_explicit", 665 "allow_rebase_update", 666 "allow_squash_merge", 667 "archived", 668 "autodetect_manual_merge", 669 "default_branch", 670 "default_delete_branch_after_merge", 671 "default_merge_style", 672 "description", 673 "enable_prune", 674 "external_tracker", 675 "external_wiki", 676 "has_actions", 677 "has_issues", 678 "has_projects", 679 "has_pull_requests", 680 "has_wiki", 681 "ignore_whitespace_conflicts", 682 "internal_tracker", 683 "mirror_interval", 684 "name", 685 "private", 686 "template", 687 "website", 688 } 689 690 def commit(self): 691 args = {"owner": self.owner.username, "name": self.name} 692 self._commit(args) 693 694 def get_branches(self) -> List["Branch"]: 695 """Get all the Branches of this Repository.""" 696 697 results = self.allspice_client.requests_get_paginated( 698 Repository.REPO_BRANCHES % (self.owner.username, self.name) 699 ) 700 return [Branch.parse_response(self.allspice_client, result) for result in results] 701 702 def get_branch(self, name: str) -> "Branch": 703 """Get a specific Branch of this Repository.""" 704 result = self.allspice_client.requests_get( 705 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 706 ) 707 return Branch.parse_response(self.allspice_client, result) 708 709 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 710 """Add a branch to the repository""" 711 # Note: will only work with gitea 1.13 or higher! 712 713 ref_name = Util.data_params_for_ref(create_from) 714 if "ref" not in ref_name: 715 raise ValueError("create_from must be a Branch, Commit or string") 716 ref_name = ref_name["ref"] 717 718 data = {"new_branch_name": newname, "old_ref_name": ref_name} 719 result = self.allspice_client.requests_post( 720 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 721 ) 722 return Branch.parse_response(self.allspice_client, result) 723 724 def get_issues( 725 self, 726 state: Literal["open", "closed", "all"] = "all", 727 search_query: Optional[str] = None, 728 labels: Optional[List[str]] = None, 729 milestones: Optional[List[Union[Milestone, str]]] = None, 730 assignee: Optional[Union[User, str]] = None, 731 since: Optional[datetime] = None, 732 before: Optional[datetime] = None, 733 ) -> List["Issue"]: 734 """ 735 Get all Issues of this Repository (open and closed) 736 737 https://hub.allspice.io/api/swagger#/repository/repoListIssues 738 739 All params of this method are optional filters. If you don't specify a filter, it 740 will not be applied. 741 742 :param state: The state of the Issues to get. If None, all Issues are returned. 743 :param search_query: Filter issues by text. This is equivalent to searching for 744 `search_query` in the Issues on the web interface. 745 :param labels: Filter issues by labels. 746 :param milestones: Filter issues by milestones. 747 :param assignee: Filter issues by the assigned user. 748 :param since: Filter issues by the date they were created. 749 :param before: Filter issues by the date they were created. 750 :return: A list of Issues. 751 """ 752 753 data = { 754 "state": state, 755 } 756 if search_query: 757 data["q"] = search_query 758 if labels: 759 data["labels"] = ",".join(labels) 760 if milestones: 761 data["milestone"] = ",".join( 762 [ 763 milestone.name if isinstance(milestone, Milestone) else milestone 764 for milestone in milestones 765 ] 766 ) 767 if assignee: 768 if isinstance(assignee, User): 769 data["assignee"] = assignee.username 770 else: 771 data["assignee"] = assignee 772 if since: 773 data["since"] = Util.format_time(since) 774 if before: 775 data["before"] = Util.format_time(before) 776 777 results = self.allspice_client.requests_get_paginated( 778 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 779 params=data, 780 ) 781 782 issues = [] 783 for result in results: 784 issue = Issue.parse_response(self.allspice_client, result) 785 # See Issue.request 786 setattr(issue, "_repository", self) 787 # This is mostly for compatibility with an older implementation 788 Issue._add_read_property("repo", self, issue) 789 issues.append(issue) 790 791 return issues 792 793 def get_design_reviews( 794 self, 795 state: Literal["open", "closed", "all"] = "all", 796 milestone: Optional[Union[Milestone, str]] = None, 797 labels: Optional[List[str]] = None, 798 ) -> List["DesignReview"]: 799 """ 800 Get all Design Reviews of this Repository. 801 802 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 803 804 :param state: The state of the Design Reviews to get. If None, all Design Reviews 805 are returned. 806 :param milestone: The milestone of the Design Reviews to get. 807 :param labels: A list of label IDs to filter DRs by. 808 :return: A list of Design Reviews. 809 """ 810 811 params = { 812 "state": state, 813 } 814 if milestone: 815 if isinstance(milestone, Milestone): 816 params["milestone"] = milestone.name 817 else: 818 params["milestone"] = milestone 819 if labels: 820 params["labels"] = ",".join(labels) 821 822 results = self.allspice_client.requests_get_paginated( 823 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 824 params=params, 825 ) 826 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 827 828 def get_commits( 829 self, 830 sha: Optional[str] = None, 831 path: Optional[str] = None, 832 stat: bool = True, 833 ) -> List["Commit"]: 834 """ 835 Get all the Commits of this Repository. 836 837 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 838 839 :param sha: The SHA of the commit to start listing commits from. 840 :param path: filepath of a file/dir. 841 :param stat: Include the number of additions and deletions in the response. 842 Disable for speedup. 843 :return: A list of Commits. 844 """ 845 846 data = {} 847 if sha: 848 data["sha"] = sha 849 if path: 850 data["path"] = path 851 if not stat: 852 data["stat"] = False 853 854 try: 855 results = self.allspice_client.requests_get_paginated( 856 Repository.REPO_COMMITS % (self.owner.username, self.name), 857 params=data, 858 ) 859 except ConflictException as err: 860 logging.warning(err) 861 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 862 results = [] 863 return [Commit.parse_response(self.allspice_client, result) for result in results] 864 865 def get_issues_state(self, state) -> List["Issue"]: 866 """ 867 DEPRECATED: Use get_issues() instead. 868 869 Get issues of state Issue.open or Issue.closed of a repository. 870 """ 871 872 assert state in [Issue.OPENED, Issue.CLOSED] 873 issues = [] 874 data = {"state": state} 875 results = self.allspice_client.requests_get_paginated( 876 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 877 params=data, 878 ) 879 for result in results: 880 issue = Issue.parse_response(self.allspice_client, result) 881 # adding data not contained in the issue response 882 # See Issue.request() 883 setattr(issue, "_repository", self) 884 Issue._add_read_property("repo", self, issue) 885 Issue._add_read_property("owner", self.owner, issue) 886 issues.append(issue) 887 return issues 888 889 def get_times(self): 890 results = self.allspice_client.requests_get( 891 Repository.REPO_TIMES % (self.owner.username, self.name) 892 ) 893 return results 894 895 def get_user_time(self, username) -> float: 896 if isinstance(username, User): 897 username = username.username 898 results = self.allspice_client.requests_get( 899 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 900 ) 901 time = sum(r["time"] for r in results) 902 return time 903 904 def get_full_name(self) -> str: 905 return self.owner.username + "/" + self.name 906 907 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 908 data = { 909 "assignees": assignees, 910 "body": description, 911 "closed": False, 912 "title": title, 913 } 914 result = self.allspice_client.requests_post( 915 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 916 data=data, 917 ) 918 919 issue = Issue.parse_response(self.allspice_client, result) 920 setattr(issue, "_repository", self) 921 Issue._add_read_property("repo", self, issue) 922 return issue 923 924 def create_design_review( 925 self, 926 title: str, 927 head: Union[Branch, str], 928 base: Union[Branch, str], 929 assignees: Optional[Set[Union[User, str]]] = None, 930 body: Optional[str] = None, 931 due_date: Optional[datetime] = None, 932 milestone: Optional["Milestone"] = None, 933 ) -> "DesignReview": 934 """ 935 Create a new Design Review. 936 937 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 938 939 :param title: Title of the Design Review 940 :param head: Branch or name of the branch to merge into the base branch 941 :param base: Branch or name of the branch to merge into 942 :param assignees: Optional. A list of users to assign this review. List can be of 943 User objects or of usernames. 944 :param body: An Optional Description for the Design Review. 945 :param due_date: An Optional Due date for the Design Review. 946 :param milestone: An Optional Milestone for the Design Review 947 :return: The created Design Review 948 """ 949 950 data: dict[str, Any] = { 951 "title": title, 952 } 953 954 if isinstance(head, Branch): 955 data["head"] = head.name 956 else: 957 data["head"] = head 958 if isinstance(base, Branch): 959 data["base"] = base.name 960 else: 961 data["base"] = base 962 if assignees: 963 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 964 if body: 965 data["body"] = body 966 if due_date: 967 data["due_date"] = Util.format_time(due_date) 968 if milestone: 969 data["milestone"] = milestone.id 970 971 result = self.allspice_client.requests_post( 972 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 973 data=data, 974 ) 975 976 return DesignReview.parse_response(self.allspice_client, result) 977 978 def create_milestone( 979 self, 980 title: str, 981 description: str, 982 due_date: Optional[str] = None, 983 state: str = "open", 984 ) -> "Milestone": 985 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 986 data = {"title": title, "description": description, "state": state} 987 if due_date: 988 data["due_date"] = due_date 989 result = self.allspice_client.requests_post(url, data=data) 990 return Milestone.parse_response(self.allspice_client, result) 991 992 def create_gitea_hook(self, hook_url: str, events: List[str]): 993 url = f"/repos/{self.owner.username}/{self.name}/hooks" 994 data = { 995 "type": "gitea", 996 "config": {"content_type": "json", "url": hook_url}, 997 "events": events, 998 "active": True, 999 } 1000 return self.allspice_client.requests_post(url, data=data) 1001 1002 def list_hooks(self): 1003 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1004 return self.allspice_client.requests_get(url) 1005 1006 def delete_hook(self, id: str): 1007 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1008 self.allspice_client.requests_delete(url) 1009 1010 def is_collaborator(self, username) -> bool: 1011 if isinstance(username, User): 1012 username = username.username 1013 try: 1014 # returns 204 if its ok, 404 if its not 1015 self.allspice_client.requests_get( 1016 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1017 ) 1018 return True 1019 except Exception: 1020 return False 1021 1022 def get_users_with_access(self) -> Sequence[User]: 1023 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1024 response = self.allspice_client.requests_get(url) 1025 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1026 if isinstance(self.owner, User): 1027 return [*collabs, self.owner] 1028 else: 1029 # owner must be org 1030 teams = self.owner.get_teams() 1031 for team in teams: 1032 team_repos = team.get_repos() 1033 if self.name in [n.name for n in team_repos]: 1034 collabs += team.get_members() 1035 return collabs 1036 1037 def remove_collaborator(self, user_name: str): 1038 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1039 self.allspice_client.requests_delete(url) 1040 1041 def transfer_ownership( 1042 self, 1043 new_owner: Union[User, Organization], 1044 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1045 ): 1046 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1047 data: dict[str, Any] = {"new_owner": new_owner.username} 1048 if isinstance(new_owner, Organization): 1049 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1050 data["team_ids"] = new_team_ids 1051 self.allspice_client.requests_post(url, data=data) 1052 # TODO: make sure this instance is either updated or discarded 1053 1054 def get_git_content( 1055 self, 1056 ref: Optional["Ref"] = None, 1057 commit: "Optional[Commit]" = None, 1058 ) -> List[Content]: 1059 """ 1060 Get the metadata for all files in the root directory. 1061 1062 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1063 1064 :param ref: branch or commit to get content from 1065 :param commit: commit to get content from (deprecated) 1066 """ 1067 url = f"/repos/{self.owner.username}/{self.name}/contents" 1068 data = Util.data_params_for_ref(ref or commit) 1069 1070 result = [ 1071 Content.parse_response(self.allspice_client, f) 1072 for f in self.allspice_client.requests_get(url, data) 1073 ] 1074 return result 1075 1076 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1077 """ 1078 Get the repository's tree on a given ref. 1079 1080 By default, this will only return the top-level entries in the tree. If you want 1081 to get the entire tree, set `recursive` to True. 1082 1083 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1084 :param recursive: Whether to get the entire tree or just the top-level entries. 1085 """ 1086 1087 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1088 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1089 params = {"recursive": recursive} 1090 results = self.allspice_client.requests_get_paginated(url, params=params) 1091 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1092 1093 def get_file_content( 1094 self, 1095 content: Content, 1096 ref: Optional[Ref] = None, 1097 commit: Optional[Commit] = None, 1098 ) -> Union[str, List["Content"]]: 1099 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1100 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1101 data = Util.data_params_for_ref(ref or commit) 1102 1103 if content.type == Content.FILE: 1104 return self.allspice_client.requests_get(url, data)["content"] 1105 else: 1106 return [ 1107 Content.parse_response(self.allspice_client, f) 1108 for f in self.allspice_client.requests_get(url, data) 1109 ] 1110 1111 def get_raw_file( 1112 self, 1113 file_path: str, 1114 ref: Optional[Ref] = None, 1115 ) -> bytes: 1116 """ 1117 Get the raw, binary data of a single file. 1118 1119 Note 1: if the file you are requesting is a text file, you might want to 1120 use .decode() on the result to get a string. For example: 1121 1122 content = repo.get_raw_file("file.txt").decode("utf-8") 1123 1124 Note 2: this method will store the entire file in memory. If you want 1125 to download a large file, you might want to use `download_to_file` 1126 instead. 1127 1128 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1129 1130 :param file_path: The path to the file to get. 1131 :param ref: The branch or commit to get the file from. If not provided, 1132 the default branch is used. 1133 """ 1134 1135 url = self.REPO_GET_RAW_FILE.format( 1136 owner=self.owner.username, 1137 repo=self.name, 1138 path=file_path, 1139 ) 1140 params = Util.data_params_for_ref(ref) 1141 return self.allspice_client.requests_get_raw(url, params=params) 1142 1143 def download_to_file( 1144 self, 1145 file_path: str, 1146 io: IO, 1147 ref: Optional[Ref] = None, 1148 ) -> None: 1149 """ 1150 Download the binary data of a file to a file-like object. 1151 1152 Example: 1153 1154 with open("schematic.DSN", "wb") as f: 1155 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1156 1157 :param file_path: The path to the file in the repository from the root 1158 of the repository. 1159 :param io: The file-like object to write the data to. 1160 """ 1161 1162 url = self.allspice_client._AllSpice__get_url( 1163 self.REPO_GET_RAW_FILE.format( 1164 owner=self.owner.username, 1165 repo=self.name, 1166 path=file_path, 1167 ) 1168 ) 1169 params = Util.data_params_for_ref(ref) 1170 response = self.allspice_client.requests.get( 1171 url, 1172 params=params, 1173 headers=self.allspice_client.headers, 1174 stream=True, 1175 ) 1176 1177 for chunk in response.iter_content(chunk_size=4096): 1178 if chunk: 1179 io.write(chunk) 1180 1181 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1182 """ 1183 Get the json blob for a cad file if it exists, otherwise enqueue 1184 a new job and return a 503 status. 1185 1186 WARNING: This is still experimental and not recommended for critical 1187 applications. The structure and content of the returned dictionary can 1188 change at any time. 1189 1190 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1191 """ 1192 1193 if isinstance(content, Content): 1194 content = content.path 1195 1196 url = self.REPO_GET_ALLSPICE_JSON.format( 1197 owner=self.owner.username, 1198 repo=self.name, 1199 content=content, 1200 ) 1201 data = Util.data_params_for_ref(ref) 1202 return self.allspice_client.requests_get(url, data) 1203 1204 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1205 """ 1206 Get the svg blob for a cad file if it exists, otherwise enqueue 1207 a new job and return a 503 status. 1208 1209 WARNING: This is still experimental and not yet recommended for 1210 critical applications. The content of the returned svg can change 1211 at any time. 1212 1213 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1214 """ 1215 1216 if isinstance(content, Content): 1217 content = content.path 1218 1219 url = self.REPO_GET_ALLSPICE_SVG.format( 1220 owner=self.owner.username, 1221 repo=self.name, 1222 content=content, 1223 ) 1224 data = Util.data_params_for_ref(ref) 1225 return self.allspice_client.requests_get_raw(url, data) 1226 1227 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1228 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1229 if not data: 1230 data = {} 1231 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1232 data.update({"content": content}) 1233 return self.allspice_client.requests_post(url, data) 1234 1235 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1236 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1237 if not data: 1238 data = {} 1239 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1240 data.update({"sha": file_sha, "content": content}) 1241 return self.allspice_client.requests_put(url, data) 1242 1243 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1244 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1245 if not data: 1246 data = {} 1247 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1248 data.update({"sha": file_sha}) 1249 return self.allspice_client.requests_delete(url, data) 1250 1251 def get_archive( 1252 self, 1253 ref: Ref = "main", 1254 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1255 ) -> bytes: 1256 """ 1257 Download all the files in a specific ref of a repository as a zip or tarball 1258 archive. 1259 1260 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1261 1262 :param ref: branch or commit to get content from, defaults to the "main" branch 1263 :param archive_format: zip or tar, defaults to zip 1264 """ 1265 1266 ref_string = Util.data_params_for_ref(ref)["ref"] 1267 url = self.REPO_GET_ARCHIVE.format( 1268 owner=self.owner.username, 1269 repo=self.name, 1270 ref=ref_string, 1271 format=archive_format.value, 1272 ) 1273 return self.allspice_client.requests_get_raw(url) 1274 1275 def get_topics(self) -> list[str]: 1276 """ 1277 Gets the list of topics on this repository. 1278 1279 See http://localhost:3000/api/swagger#/repository/repoListTopics 1280 """ 1281 1282 url = self.REPO_GET_TOPICS.format( 1283 owner=self.owner.username, 1284 repo=self.name, 1285 ) 1286 return self.allspice_client.requests_get(url)["topics"] 1287 1288 def add_topic(self, topic: str): 1289 """ 1290 Adds a topic to the repository. 1291 1292 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1293 1294 :param topic: The topic to add. Topic names must consist only of 1295 lowercase letters, numnbers and dashes (-), and cannot start with 1296 dashes. Topic names also must be under 35 characters long. 1297 """ 1298 1299 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1300 self.allspice_client.requests_put(url) 1301 1302 def create_release( 1303 self, 1304 tag_name: str, 1305 name: Optional[str] = None, 1306 body: Optional[str] = None, 1307 draft: bool = False, 1308 ): 1309 """ 1310 Create a release for this repository. The release will be created for 1311 the tag with the given name. If there is no tag with this name, create 1312 the tag first. 1313 1314 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1315 """ 1316 1317 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1318 data = { 1319 "tag_name": tag_name, 1320 "draft": draft, 1321 } 1322 if name is not None: 1323 data["name"] = name 1324 if body is not None: 1325 data["body"] = body 1326 response = self.allspice_client.requests_post(url, data) 1327 return Release.parse_response(self.allspice_client, response, self) 1328 1329 def get_releases( 1330 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1331 ) -> List[Release]: 1332 """ 1333 Get the list of releases for this repository. 1334 1335 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1336 """ 1337 1338 data = {} 1339 1340 if draft is not None: 1341 data["draft"] = draft 1342 if pre_release is not None: 1343 data["pre-release"] = pre_release 1344 1345 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1346 responses = self.allspice_client.requests_get_paginated(url, params=data) 1347 1348 return [ 1349 Release.parse_response(self.allspice_client, response, self) for response in responses 1350 ] 1351 1352 def get_latest_release(self) -> Release: 1353 """ 1354 Get the latest release for this repository. 1355 1356 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1357 """ 1358 1359 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1360 response = self.allspice_client.requests_get(url) 1361 release = Release.parse_response(self.allspice_client, response, self) 1362 return release 1363 1364 def get_release_by_tag(self, tag: str) -> Release: 1365 """ 1366 Get a release by its tag. 1367 1368 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1369 """ 1370 1371 url = self.REPO_GET_RELEASE_BY_TAG.format( 1372 owner=self.owner.username, repo=self.name, tag=tag 1373 ) 1374 response = self.allspice_client.requests_get(url) 1375 release = Release.parse_response(self.allspice_client, response, self) 1376 return release 1377 1378 def get_commit_statuses( 1379 self, 1380 commit: Union[str, Commit], 1381 sort: Optional[CommitStatusSort] = None, 1382 state: Optional[CommitStatusState] = None, 1383 ) -> List[CommitStatus]: 1384 """ 1385 Get a list of statuses for a commit. 1386 1387 This is roughly equivalent to the Commit.get_statuses method, but this 1388 method allows you to sort and filter commits and is more convenient if 1389 you have a commit SHA and don't need to get the commit itself. 1390 1391 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1392 """ 1393 1394 if isinstance(commit, Commit): 1395 commit = commit.sha 1396 1397 params = {} 1398 if sort is not None: 1399 params["sort"] = sort.value 1400 if state is not None: 1401 params["state"] = state.value 1402 1403 url = self.REPO_GET_COMMIT_STATUS.format( 1404 owner=self.owner.username, repo=self.name, sha=commit 1405 ) 1406 response = self.allspice_client.requests_get_paginated(url, params=params) 1407 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1408 1409 def create_commit_status( 1410 self, 1411 commit: Union[str, Commit], 1412 context: Optional[str] = None, 1413 description: Optional[str] = None, 1414 state: Optional[CommitStatusState] = None, 1415 target_url: Optional[str] = None, 1416 ) -> CommitStatus: 1417 """ 1418 Create a status on a commit. 1419 1420 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1421 """ 1422 1423 if isinstance(commit, Commit): 1424 commit = commit.sha 1425 1426 data = {} 1427 if context is not None: 1428 data["context"] = context 1429 if description is not None: 1430 data["description"] = description 1431 if state is not None: 1432 data["state"] = state.value 1433 if target_url is not None: 1434 data["target_url"] = target_url 1435 1436 url = self.REPO_GET_COMMIT_STATUS.format( 1437 owner=self.owner.username, repo=self.name, sha=commit 1438 ) 1439 response = self.allspice_client.requests_post(url, data=data) 1440 return CommitStatus.parse_response(self.allspice_client, response) 1441 1442 def delete(self): 1443 self.allspice_client.requests_delete( 1444 Repository.REPO_DELETE % (self.owner.username, self.name) 1445 ) 1446 self.deleted = True 1447 1448 1449class Milestone(ApiObject): 1450 allow_merge_commits: Any 1451 allow_rebase: Any 1452 allow_rebase_explicit: Any 1453 allow_squash_merge: Any 1454 archived: Any 1455 closed_at: Any 1456 closed_issues: int 1457 created_at: str 1458 default_branch: Any 1459 description: str 1460 due_on: Any 1461 has_issues: Any 1462 has_pull_requests: Any 1463 has_wiki: Any 1464 id: int 1465 ignore_whitespace_conflicts: Any 1466 name: Any 1467 open_issues: int 1468 private: Any 1469 state: str 1470 title: str 1471 updated_at: str 1472 website: Any 1473 1474 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1475 1476 def __init__(self, allspice_client): 1477 super().__init__(allspice_client) 1478 1479 def __eq__(self, other): 1480 if not isinstance(other, Milestone): 1481 return False 1482 return self.allspice_client == other.allspice_client and self.id == other.id 1483 1484 def __hash__(self): 1485 return hash(self.allspice_client) ^ hash(self.id) 1486 1487 _fields_to_parsers: ClassVar[dict] = { 1488 "closed_at": lambda _, t: Util.convert_time(t), 1489 "due_on": lambda _, t: Util.convert_time(t), 1490 } 1491 1492 _patchable_fields: ClassVar[set[str]] = { 1493 "allow_merge_commits", 1494 "allow_rebase", 1495 "allow_rebase_explicit", 1496 "allow_squash_merge", 1497 "archived", 1498 "default_branch", 1499 "description", 1500 "has_issues", 1501 "has_pull_requests", 1502 "has_wiki", 1503 "ignore_whitespace_conflicts", 1504 "name", 1505 "private", 1506 "website", 1507 } 1508 1509 @classmethod 1510 def request(cls, allspice_client, owner: str, repo: str, number: str): 1511 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number}) 1512 1513 1514class Attachment(ReadonlyApiObject): 1515 """ 1516 An asset attached to a comment. 1517 1518 You cannot edit or delete the attachment from this object - see the instance methods 1519 Comment.edit_attachment and delete_attachment for that. 1520 """ 1521 1522 browser_download_url: str 1523 created_at: str 1524 download_count: int 1525 id: int 1526 name: str 1527 size: int 1528 uuid: str 1529 1530 def __init__(self, allspice_client): 1531 super().__init__(allspice_client) 1532 1533 def __eq__(self, other): 1534 if not isinstance(other, Attachment): 1535 return False 1536 1537 return self.uuid == other.uuid 1538 1539 def __hash__(self): 1540 return hash(self.uuid) 1541 1542 def download_to_file(self, io: IO): 1543 """ 1544 Download the raw, binary data of this Attachment to a file-like object. 1545 1546 Example: 1547 1548 with open("my_file.zip", "wb") as f: 1549 attachment.download_to_file(f) 1550 1551 :param io: The file-like object to write the data to. 1552 """ 1553 1554 response = self.allspice_client.requests.get( 1555 self.browser_download_url, 1556 headers=self.allspice_client.headers, 1557 stream=True, 1558 ) 1559 # 4kb chunks 1560 for chunk in response.iter_content(chunk_size=4096): 1561 if chunk: 1562 io.write(chunk) 1563 1564 1565class Comment(ApiObject): 1566 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1567 body: str 1568 created_at: datetime 1569 html_url: str 1570 id: int 1571 issue_url: str 1572 original_author: str 1573 original_author_id: int 1574 pull_request_url: str 1575 updated_at: datetime 1576 user: User 1577 1578 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1579 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1580 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1581 1582 def __init__(self, allspice_client): 1583 super().__init__(allspice_client) 1584 1585 def __eq__(self, other): 1586 if not isinstance(other, Comment): 1587 return False 1588 return self.repository == other.repository and self.id == other.id 1589 1590 def __hash__(self): 1591 return hash(self.repository) ^ hash(self.id) 1592 1593 @classmethod 1594 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1595 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1596 1597 _fields_to_parsers: ClassVar[dict] = { 1598 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1599 "created_at": lambda _, t: Util.convert_time(t), 1600 "updated_at": lambda _, t: Util.convert_time(t), 1601 } 1602 1603 _patchable_fields: ClassVar[set[str]] = {"body"} 1604 1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url 1613 1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name) 1620 1621 def __fields_for_path(self): 1622 return { 1623 "owner": self.repository.owner.username, 1624 "repo": self.repository.name, 1625 "id": self.id, 1626 } 1627 1628 def commit(self): 1629 self._commit(self.__fields_for_path()) 1630 1631 def delete(self): 1632 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1633 self.deleted = True 1634 1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1647 1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result) 1671 1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result) 1693 1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True 1703 1704 1705class Commit(ReadonlyApiObject): 1706 author: User 1707 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1708 committer: Dict[str, Union[int, str, bool]] 1709 created: str 1710 files: List[Dict[str, str]] 1711 html_url: str 1712 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1713 parents: List[Union[Dict[str, str], Any]] 1714 sha: str 1715 stats: Dict[str, int] 1716 url: str 1717 1718 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1719 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1720 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1721 1722 # Regex to extract owner and repo names from the url property 1723 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1724 1725 def __init__(self, allspice_client): 1726 super().__init__(allspice_client) 1727 1728 _fields_to_parsers: ClassVar[dict] = { 1729 # NOTE: api may return None for commiters that are no allspice users 1730 "author": lambda allspice_client, u: ( 1731 User.parse_response(allspice_client, u) if u else None 1732 ) 1733 } 1734 1735 def __eq__(self, other): 1736 if not isinstance(other, Commit): 1737 return False 1738 return self.sha == other.sha 1739 1740 def __hash__(self): 1741 return hash(self.sha) 1742 1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object 1751 1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1766 1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1778 1779 @cached_property 1780 def _fields_for_path(self) -> dict[str, str]: 1781 matches = self.URL_REGEXP.search(self.url) 1782 if not matches: 1783 raise ValueError(f"Invalid commit URL: {self.url}") 1784 1785 return { 1786 "owner": matches.group(1), 1787 "repo": matches.group(2), 1788 "sha": self.sha, 1789 } 1790 1791 1792class CommitStatusState(Enum): 1793 PENDING = "pending" 1794 SUCCESS = "success" 1795 ERROR = "error" 1796 FAILURE = "failure" 1797 WARNING = "warning" 1798 1799 @classmethod 1800 def try_init(cls, value: str) -> CommitStatusState | str: 1801 """ 1802 Try converting a string to the enum, and if that fails, return the 1803 string itself. 1804 """ 1805 1806 try: 1807 return cls(value) 1808 except ValueError: 1809 return value 1810 1811 1812class CommitStatus(ReadonlyApiObject): 1813 context: str 1814 created_at: str 1815 creator: User 1816 description: str 1817 id: int 1818 status: CommitStatusState 1819 target_url: str 1820 updated_at: str 1821 url: str 1822 1823 def __init__(self, allspice_client): 1824 super().__init__(allspice_client) 1825 1826 _fields_to_parsers: ClassVar[dict] = { 1827 # Gitea/ASH doesn't actually validate that the status is a "valid" 1828 # status, so we can expect empty or unknown strings in the status field. 1829 "status": lambda _, s: CommitStatusState.try_init(s), 1830 "creator": lambda allspice_client, u: ( 1831 User.parse_response(allspice_client, u) if u else None 1832 ), 1833 } 1834 1835 def __eq__(self, other): 1836 if not isinstance(other, CommitStatus): 1837 return False 1838 return self.id == other.id 1839 1840 def __hash__(self): 1841 return hash(self.id) 1842 1843 1844class CommitCombinedStatus(ReadonlyApiObject): 1845 commit_url: str 1846 repository: Repository 1847 sha: str 1848 state: CommitStatusState 1849 statuses: List["CommitStatus"] 1850 total_count: int 1851 url: str 1852 1853 def __init__(self, allspice_client): 1854 super().__init__(allspice_client) 1855 1856 _fields_to_parsers: ClassVar[dict] = { 1857 # See CommitStatus 1858 "state": lambda _, s: CommitStatusState.try_init(s), 1859 "statuses": lambda allspice_client, statuses: [ 1860 CommitStatus.parse_response(allspice_client, status) for status in statuses 1861 ], 1862 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1863 } 1864 1865 def __eq__(self, other): 1866 if not isinstance(other, CommitCombinedStatus): 1867 return False 1868 return self.sha == other.sha 1869 1870 def __hash__(self): 1871 return hash(self.sha) 1872 1873 @classmethod 1874 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1875 api_object = cls(allspice_client) 1876 cls._initialize(allspice_client, api_object, result) 1877 return api_object 1878 1879 1880class Issue(ApiObject): 1881 """ 1882 An issue on a repository. 1883 1884 Note: `Issue.assets` may not have any entries even if the issue has 1885 attachments. This happens when an issue is fetched via a bulk method like 1886 `Repository.get_issues`. In most cases, prefer using 1887 `Issue.get_attachments` to get the attachments on an issue. 1888 """ 1889 1890 assets: List[Union[Any, "Attachment"]] 1891 assignee: Any 1892 assignees: Any 1893 body: str 1894 closed_at: Any 1895 comments: int 1896 created_at: str 1897 due_date: Any 1898 html_url: str 1899 id: int 1900 is_locked: bool 1901 labels: List[Any] 1902 milestone: Optional["Milestone"] 1903 number: int 1904 original_author: str 1905 original_author_id: int 1906 pin_order: int 1907 pull_request: Any 1908 ref: str 1909 repository: Dict[str, Union[int, str]] 1910 state: str 1911 title: str 1912 updated_at: str 1913 url: str 1914 user: User 1915 1916 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1917 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1918 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1919 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1920 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1921 1922 OPENED = "open" 1923 CLOSED = "closed" 1924 1925 def __init__(self, allspice_client): 1926 super().__init__(allspice_client) 1927 1928 def __eq__(self, other): 1929 if not isinstance(other, Issue): 1930 return False 1931 return self.repository == other.repository and self.id == other.id 1932 1933 def __hash__(self): 1934 return hash(self.repository) ^ hash(self.id) 1935 1936 _fields_to_parsers: ClassVar[dict] = { 1937 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1938 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1939 "assets": lambda allspice_client, assets: [ 1940 Attachment.parse_response(allspice_client, a) for a in assets 1941 ], 1942 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1943 "assignees": lambda allspice_client, us: [ 1944 User.parse_response(allspice_client, u) for u in us 1945 ], 1946 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1947 } 1948 1949 _parsers_to_fields: ClassVar[dict] = { 1950 "milestone": lambda m: m.id, 1951 } 1952 1953 _patchable_fields: ClassVar[set[str]] = { 1954 "assignee", 1955 "assignees", 1956 "body", 1957 "due_date", 1958 "milestone", 1959 "state", 1960 "title", 1961 } 1962 1963 def commit(self): 1964 args = { 1965 "owner": self.repository.owner.username, 1966 "repo": self.repository.name, 1967 "index": self.number, 1968 } 1969 self._commit(args) 1970 1971 @classmethod 1972 def request(cls, allspice_client, owner: str, repo: str, number: str): 1973 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1974 # The repository in the response is a RepositoryMeta object, so request 1975 # the full repository object and add it to the issue object. 1976 repository = Repository.request(allspice_client, owner, repo) 1977 setattr(api_object, "_repository", repository) 1978 # For legacy reasons 1979 cls._add_read_property("repo", repository, api_object) 1980 return api_object 1981 1982 @classmethod 1983 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1984 args = {"owner": repo.owner.username, "repo": repo.name} 1985 data = {"title": title, "body": body} 1986 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1987 issue = Issue.parse_response(allspice_client, result) 1988 setattr(issue, "_repository", repo) 1989 cls._add_read_property("repo", repo, issue) 1990 return issue 1991 1992 @property 1993 def owner(self) -> Organization | User: 1994 return self.repository.owner 1995 1996 def get_time_sum(self, user: User) -> int: 1997 results = self.allspice_client.requests_get( 1998 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1999 ) 2000 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2001 2002 def get_times(self) -> Optional[Dict]: 2003 return self.allspice_client.requests_get( 2004 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2005 ) 2006 2007 def delete_time(self, time_id: str): 2008 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2009 self.allspice_client.requests_delete(path) 2010 2011 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2012 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2013 self.allspice_client.requests_post( 2014 path, data={"created": created, "time": int(time), "user_name": user_name} 2015 ) 2016 2017 def get_comments(self) -> List[Comment]: 2018 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2019 2020 results = self.allspice_client.requests_get( 2021 self.GET_COMMENTS.format( 2022 owner=self.owner.username, repo=self.repository.name, index=self.number 2023 ) 2024 ) 2025 2026 return [Comment.parse_response(self.allspice_client, result) for result in results] 2027 2028 def create_comment(self, body: str) -> Comment: 2029 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2030 2031 path = self.GET_COMMENTS.format( 2032 owner=self.owner.username, repo=self.repository.name, index=self.number 2033 ) 2034 2035 response = self.allspice_client.requests_post(path, data={"body": body}) 2036 return Comment.parse_response(self.allspice_client, response) 2037 2038 def get_attachments(self) -> List[Attachment]: 2039 """ 2040 Fetch all attachments on this issue. 2041 2042 Unlike the assets field, this will always fetch all attachments from the 2043 API. 2044 2045 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2046 """ 2047 2048 path = self.GET_ATTACHMENTS.format( 2049 owner=self.owner.username, repo=self.repository.name, index=self.number 2050 ) 2051 response = self.allspice_client.requests_get(path) 2052 2053 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2054 2055 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2056 """ 2057 Create an attachment on this issue. 2058 2059 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2060 2061 :param file: The file to attach. This should be a file-like object. 2062 :param name: The name of the file. If not provided, the name of the file will be 2063 used. 2064 :return: The created attachment. 2065 """ 2066 2067 args: dict[str, Any] = { 2068 "files": {"attachment": file}, 2069 } 2070 if name is not None: 2071 args["params"] = {"name": name} 2072 2073 result = self.allspice_client.requests_post( 2074 self.GET_ATTACHMENTS.format( 2075 owner=self.owner.username, repo=self.repository.name, index=self.number 2076 ), 2077 **args, 2078 ) 2079 2080 return Attachment.parse_response(self.allspice_client, result) 2081 2082 2083class DesignReview(ApiObject): 2084 """ 2085 A Design Review. See 2086 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2087 2088 Note: The base and head fields are not `Branch` objects - they are plain strings 2089 referring to the branch names. This is because DRs can exist for branches that have 2090 been deleted, which don't have an associated `Branch` object from the API. You can use 2091 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2092 it exists. 2093 """ 2094 2095 additions: int 2096 allow_maintainer_edit: bool 2097 allow_maintainer_edits: Any 2098 assignee: User 2099 assignees: List["User"] 2100 base: str 2101 body: str 2102 changed_files: int 2103 closed_at: Optional[str] 2104 comments: int 2105 created_at: str 2106 deletions: int 2107 diff_url: str 2108 draft: bool 2109 due_date: Optional[str] 2110 head: str 2111 html_url: str 2112 id: int 2113 is_locked: bool 2114 labels: List[Any] 2115 merge_base: str 2116 merge_commit_sha: Optional[str] 2117 mergeable: bool 2118 merged: bool 2119 merged_at: Optional[str] 2120 merged_by: Optional["User"] 2121 milestone: Any 2122 number: int 2123 patch_url: str 2124 pin_order: int 2125 repository: Optional["Repository"] 2126 requested_reviewers: Any 2127 review_comments: int 2128 state: str 2129 title: str 2130 updated_at: str 2131 url: str 2132 user: User 2133 2134 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2135 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2136 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2137 2138 OPEN = "open" 2139 CLOSED = "closed" 2140 2141 class MergeType(Enum): 2142 MERGE = "merge" 2143 REBASE = "rebase" 2144 REBASE_MERGE = "rebase-merge" 2145 SQUASH = "squash" 2146 MANUALLY_MERGED = "manually-merged" 2147 2148 def __init__(self, allspice_client): 2149 super().__init__(allspice_client) 2150 2151 def __eq__(self, other): 2152 if not isinstance(other, DesignReview): 2153 return False 2154 return self.repository == other.repository and self.id == other.id 2155 2156 def __hash__(self): 2157 return hash(self.repository) ^ hash(self.id) 2158 2159 @classmethod 2160 def parse_response(cls, allspice_client, result) -> "DesignReview": 2161 api_object = super().parse_response(allspice_client, result) 2162 cls._add_read_property( 2163 "repository", 2164 Repository.parse_response(allspice_client, result["base"]["repo"]), 2165 api_object, 2166 ) 2167 2168 return api_object 2169 2170 @classmethod 2171 def request(cls, allspice_client, owner: str, repo: str, number: str): 2172 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2173 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2174 2175 _fields_to_parsers: ClassVar[dict] = { 2176 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2177 "assignees": lambda allspice_client, us: [ 2178 User.parse_response(allspice_client, u) for u in us 2179 ], 2180 "base": lambda _, b: b["ref"], 2181 "head": lambda _, h: h["ref"], 2182 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2183 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2184 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2185 } 2186 2187 _patchable_fields: ClassVar[set[str]] = { 2188 "allow_maintainer_edits", 2189 "assignee", 2190 "assignees", 2191 "base", 2192 "body", 2193 "due_date", 2194 "milestone", 2195 "state", 2196 "title", 2197 } 2198 2199 _parsers_to_fields: ClassVar[dict] = { 2200 "assignee": lambda u: u.username, 2201 "assignees": lambda us: [u.username for u in us], 2202 "base": lambda b: b.name if isinstance(b, Branch) else b, 2203 "milestone": lambda m: m.id, 2204 } 2205 2206 def commit(self): 2207 data = self.get_dirty_fields() 2208 if "due_date" in data and data["due_date"] is None: 2209 data["unset_due_date"] = True 2210 2211 args = { 2212 "owner": self.repository.owner.username, 2213 "repo": self.repository.name, 2214 "index": self.number, 2215 } 2216 self._commit(args, data) 2217 2218 def merge(self, merge_type: MergeType): 2219 """ 2220 Merge the pull request. See 2221 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2222 2223 :param merge_type: The type of merge to perform. See the MergeType enum. 2224 """ 2225 2226 self.allspice_client.requests_post( 2227 self.MERGE_DESIGN_REVIEW.format( 2228 owner=self.repository.owner.username, 2229 repo=self.repository.name, 2230 index=self.number, 2231 ), 2232 data={"Do": merge_type.value}, 2233 ) 2234 2235 def get_comments(self) -> List[Comment]: 2236 """ 2237 Get the comments on this pull request, but not specifically on a review. 2238 2239 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2240 2241 :return: A list of comments on this pull request. 2242 """ 2243 2244 results = self.allspice_client.requests_get( 2245 self.GET_COMMENTS.format( 2246 owner=self.repository.owner.username, 2247 repo=self.repository.name, 2248 index=self.number, 2249 ) 2250 ) 2251 return [Comment.parse_response(self.allspice_client, result) for result in results] 2252 2253 def create_comment(self, body: str): 2254 """ 2255 Create a comment on this pull request. This uses the same endpoint as the 2256 comments on issues, and will not be associated with any reviews. 2257 2258 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2259 2260 :param body: The body of the comment. 2261 :return: The comment that was created. 2262 """ 2263 2264 result = self.allspice_client.requests_post( 2265 self.GET_COMMENTS.format( 2266 owner=self.repository.owner.username, 2267 repo=self.repository.name, 2268 index=self.number, 2269 ), 2270 data={"body": body}, 2271 ) 2272 return Comment.parse_response(self.allspice_client, result) 2273 2274 2275class Team(ApiObject): 2276 can_create_org_repo: bool 2277 description: str 2278 id: int 2279 includes_all_repositories: bool 2280 name: str 2281 organization: Optional["Organization"] 2282 permission: str 2283 units: List[str] 2284 units_map: Dict[str, str] 2285 2286 API_OBJECT = """/teams/{id}""" # <id> 2287 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2288 TEAM_DELETE = """/teams/%s""" # <id> 2289 GET_MEMBERS = """/teams/%s/members""" # <id> 2290 GET_REPOS = """/teams/%s/repos""" # <id> 2291 2292 def __init__(self, allspice_client): 2293 super().__init__(allspice_client) 2294 2295 def __eq__(self, other): 2296 if not isinstance(other, Team): 2297 return False 2298 return self.organization == other.organization and self.id == other.id 2299 2300 def __hash__(self): 2301 return hash(self.organization) ^ hash(self.id) 2302 2303 _fields_to_parsers: ClassVar[dict] = { 2304 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2305 } 2306 2307 _patchable_fields: ClassVar[set[str]] = { 2308 "can_create_org_repo", 2309 "description", 2310 "includes_all_repositories", 2311 "name", 2312 "permission", 2313 "units", 2314 "units_map", 2315 } 2316 2317 @classmethod 2318 def request(cls, allspice_client, id: int): 2319 return cls._request(allspice_client, {"id": id}) 2320 2321 def commit(self): 2322 args = {"id": self.id} 2323 self._commit(args) 2324 2325 def add_user(self, user: User): 2326 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2327 url = f"/teams/{self.id}/members/{user.login}" 2328 self.allspice_client.requests_put(url) 2329 2330 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2331 if isinstance(repo, Repository): 2332 repo_name = repo.name 2333 else: 2334 repo_name = repo 2335 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2336 2337 def get_members(self): 2338 """Get all users assigned to the team.""" 2339 results = self.allspice_client.requests_get_paginated( 2340 Team.GET_MEMBERS % self.id, 2341 ) 2342 return [User.parse_response(self.allspice_client, result) for result in results] 2343 2344 def get_repos(self): 2345 """Get all repos of this Team.""" 2346 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2347 return [Repository.parse_response(self.allspice_client, result) for result in results] 2348 2349 def delete(self): 2350 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2351 self.deleted = True 2352 2353 def remove_team_member(self, user_name: str): 2354 url = f"/teams/{self.id}/members/{user_name}" 2355 self.allspice_client.requests_delete(url) 2356 2357 2358class Release(ApiObject): 2359 """ 2360 A release on a repo. 2361 """ 2362 2363 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2364 author: User 2365 body: str 2366 created_at: str 2367 draft: bool 2368 html_url: str 2369 id: int 2370 name: str 2371 prerelease: bool 2372 published_at: str 2373 repo: Optional["Repository"] 2374 repository: Optional["Repository"] 2375 tag_name: str 2376 tarball_url: str 2377 target_commitish: str 2378 upload_url: str 2379 url: str 2380 zipball_url: str 2381 2382 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2383 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2384 # Note that we don't strictly need the get_assets route, as the release 2385 # object already contains the assets. 2386 2387 def __init__(self, allspice_client): 2388 super().__init__(allspice_client) 2389 2390 def __eq__(self, other): 2391 if not isinstance(other, Release): 2392 return False 2393 return self.repo == other.repo and self.id == other.id 2394 2395 def __hash__(self): 2396 return hash(self.repo) ^ hash(self.id) 2397 2398 _fields_to_parsers: ClassVar[dict] = { 2399 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2400 } 2401 _patchable_fields: ClassVar[set[str]] = { 2402 "body", 2403 "draft", 2404 "name", 2405 "prerelease", 2406 "tag_name", 2407 "target_commitish", 2408 } 2409 2410 @classmethod 2411 def parse_response(cls, allspice_client, result, repo) -> Release: 2412 release = super().parse_response(allspice_client, result) 2413 Release._add_read_property("repository", repo, release) 2414 # For legacy reasons 2415 Release._add_read_property("repo", repo, release) 2416 setattr( 2417 release, 2418 "_assets", 2419 [ 2420 ReleaseAsset.parse_response(allspice_client, asset, release) 2421 for asset in result["assets"] 2422 ], 2423 ) 2424 return release 2425 2426 @classmethod 2427 def request( 2428 cls, 2429 allspice_client, 2430 owner: str, 2431 repo: str, 2432 id: Optional[int] = None, 2433 ) -> Release: 2434 args = {"owner": owner, "repo": repo, "id": id} 2435 release_response = cls._get_gitea_api_object(allspice_client, args) 2436 repository = Repository.request(allspice_client, owner, repo) 2437 release = cls.parse_response(allspice_client, release_response, repository) 2438 return release 2439 2440 def commit(self): 2441 if self.repo is None: 2442 raise ValueError("Cannot commit a release without a repository.") 2443 2444 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2445 self._commit(args) 2446 2447 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2448 """ 2449 Create an asset for this release. 2450 2451 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2452 2453 :param file: The file to upload. This should be a file-like object. 2454 :param name: The name of the file. 2455 :return: The created asset. 2456 """ 2457 2458 if self.repo is None: 2459 raise ValueError("Cannot commit a release without a repository.") 2460 2461 args: dict[str, Any] = {"files": {"attachment": file}} 2462 if name is not None: 2463 args["params"] = {"name": name} 2464 2465 result = self.allspice_client.requests_post( 2466 self.RELEASE_CREATE_ASSET.format( 2467 owner=self.repo.owner.username, 2468 repo=self.repo.name, 2469 id=self.id, 2470 ), 2471 **args, 2472 ) 2473 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2474 2475 def delete(self): 2476 if self.repo is None: 2477 raise ValueError("Cannot commit a release without a repository.") 2478 2479 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2480 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2481 self.deleted = True 2482 2483 2484class ReleaseAsset(ApiObject): 2485 browser_download_url: str 2486 created_at: str 2487 download_count: int 2488 id: int 2489 name: str 2490 release: Optional["Release"] 2491 size: int 2492 uuid: str 2493 2494 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2495 2496 def __init__(self, allspice_client): 2497 super().__init__(allspice_client) 2498 2499 def __eq__(self, other): 2500 if not isinstance(other, ReleaseAsset): 2501 return False 2502 return self.release == other.release and self.id == other.id 2503 2504 def __hash__(self): 2505 return hash(self.release) ^ hash(self.id) 2506 2507 _fields_to_parsers: ClassVar[dict] = {} 2508 _patchable_fields: ClassVar[set[str]] = { 2509 "name", 2510 } 2511 2512 @classmethod 2513 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2514 asset = super().parse_response(allspice_client, result) 2515 ReleaseAsset._add_read_property("release", release, asset) 2516 return asset 2517 2518 @classmethod 2519 def request( 2520 cls, 2521 allspice_client, 2522 owner: str, 2523 repo: str, 2524 release_id: int, 2525 id: int, 2526 ) -> ReleaseAsset: 2527 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2528 asset_response = cls._get_gitea_api_object(allspice_client, args) 2529 release = Release.request(allspice_client, owner, repo, release_id) 2530 asset = cls.parse_response(allspice_client, asset_response, release) 2531 return asset 2532 2533 def commit(self): 2534 if self.release is None or self.release.repo is None: 2535 raise ValueError("Cannot commit a release asset without a release or a repository.") 2536 2537 args = { 2538 "owner": self.release.repo.owner.username, 2539 "repo": self.release.repo.name, 2540 "release_id": self.release.id, 2541 "id": self.id, 2542 } 2543 self._commit(args) 2544 2545 def download(self) -> bytes: 2546 """ 2547 Download the raw, binary data of this asset. 2548 2549 Note 1: if the file you are requesting is a text file, you might want to 2550 use .decode() on the result to get a string. For example: 2551 2552 asset.download().decode("utf-8") 2553 2554 Note 2: this method will store the entire file in memory. If you are 2555 downloading a large file, you might want to use download_to_file instead. 2556 """ 2557 2558 return self.allspice_client.requests.get( 2559 self.browser_download_url, 2560 headers=self.allspice_client.headers, 2561 ).content 2562 2563 def download_to_file(self, io: IO): 2564 """ 2565 Download the raw, binary data of this asset to a file-like object. 2566 2567 Example: 2568 2569 with open("my_file.zip", "wb") as f: 2570 asset.download_to_file(f) 2571 2572 :param io: The file-like object to write the data to. 2573 """ 2574 2575 response = self.allspice_client.requests.get( 2576 self.browser_download_url, 2577 headers=self.allspice_client.headers, 2578 stream=True, 2579 ) 2580 # 4kb chunks 2581 for chunk in response.iter_content(chunk_size=4096): 2582 if chunk: 2583 io.write(chunk) 2584 2585 def delete(self): 2586 if self.release is None or self.release.repo is None: 2587 raise ValueError("Cannot commit a release asset without a release or a repository.") 2588 2589 args = { 2590 "owner": self.release.repo.owner.username, 2591 "repo": self.release.repo.name, 2592 "release_id": self.release.id, 2593 "id": self.id, 2594 } 2595 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2596 self.deleted = True 2597 2598 2599class Content(ReadonlyApiObject): 2600 content: Any 2601 download_url: str 2602 encoding: Any 2603 git_url: str 2604 html_url: str 2605 last_commit_sha: str 2606 name: str 2607 path: str 2608 sha: str 2609 size: int 2610 submodule_git_url: Any 2611 target: Any 2612 type: str 2613 url: str 2614 2615 FILE = "file" 2616 2617 def __init__(self, allspice_client): 2618 super().__init__(allspice_client) 2619 2620 def __eq__(self, other): 2621 if not isinstance(other, Content): 2622 return False 2623 2624 return self.sha == other.sha and self.name == other.name 2625 2626 def __hash__(self): 2627 return hash(self.sha) ^ hash(self.name) 2628 2629 2630Ref = Union[Branch, Commit, str] 2631 2632 2633class Util: 2634 @staticmethod 2635 def convert_time(time: str) -> datetime: 2636 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2637 try: 2638 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2639 except ValueError: 2640 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2641 2642 @staticmethod 2643 def format_time(time: datetime) -> str: 2644 """ 2645 Format a datetime object to Gitea's time format. 2646 2647 :param time: The time to format 2648 :return: Formatted time 2649 """ 2650 2651 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2652 2653 @staticmethod 2654 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2655 """ 2656 Given a "ref", returns a dict with the ref parameter for the API call. 2657 2658 If the ref is None, returns an empty dict. You can pass this to the API 2659 directly. 2660 """ 2661 2662 if isinstance(ref, Branch): 2663 return {"ref": ref.name} 2664 elif isinstance(ref, Commit): 2665 return {"ref": ref.sha} 2666 elif ref: 2667 return {"ref": ref} 2668 else: 2669 return {}
33class Organization(ApiObject): 34 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 35 36 active: Optional[bool] 37 avatar_url: str 38 created: Optional[str] 39 description: str 40 email: str 41 followers_count: Optional[int] 42 following_count: Optional[int] 43 full_name: str 44 html_url: Optional[str] 45 id: int 46 is_admin: Optional[bool] 47 language: Optional[str] 48 last_login: Optional[str] 49 location: str 50 login: Optional[str] 51 login_name: Optional[str] 52 name: Optional[str] 53 prohibit_login: Optional[bool] 54 repo_admin_change_team_access: Optional[bool] 55 restricted: Optional[bool] 56 source_id: Optional[int] 57 starred_repos_count: Optional[int] 58 username: str 59 visibility: str 60 website: str 61 62 API_OBJECT = """/orgs/{name}""" # <org> 63 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 64 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 65 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 66 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 67 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 68 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 69 70 def __init__(self, allspice_client): 71 super().__init__(allspice_client) 72 73 def __eq__(self, other): 74 if not isinstance(other, Organization): 75 return False 76 return self.allspice_client == other.allspice_client and self.name == other.name 77 78 def __hash__(self): 79 return hash(self.allspice_client) ^ hash(self.name) 80 81 @classmethod 82 def request(cls, allspice_client, name: str) -> Self: 83 return cls._request(allspice_client, {"name": name}) 84 85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object 93 94 _patchable_fields: ClassVar[set[str]] = { 95 "description", 96 "full_name", 97 "location", 98 "visibility", 99 "website", 100 } 101 102 def commit(self): 103 args = {"name": self.name} 104 self._commit(args) 105 106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result) 144 145 def get_repositories(self) -> List["Repository"]: 146 results = self.allspice_client.requests_get_paginated( 147 Organization.ORG_REPOS_REQUEST % self.username 148 ) 149 return [Repository.parse_response(self.allspice_client, result) for result in results] 150 151 def get_repository(self, name) -> "Repository": 152 repos = self.get_repositories() 153 for repo in repos: 154 if repo.name == name: 155 return repo 156 raise NotFoundException("Repository %s not existent in organization." % name) 157 158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams 165 166 def get_team(self, name) -> "Team": 167 teams = self.get_teams() 168 for team in teams: 169 if team.name == name: 170 return team 171 raise NotFoundException("Team not existent in organization.") 172 173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 ) 204 205 def get_members(self) -> List["User"]: 206 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 207 return [User.parse_response(self.allspice_client, result) for result in results] 208 209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False 220 221 def remove_member(self, user: "User"): 222 path = f"/orgs/{self.username}/members/{user.username}" 223 self.allspice_client.requests_delete(path) 224 225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True 231 232 def get_heatmap(self) -> List[Tuple[datetime, int]]: 233 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 234 results = [ 235 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 236 for result in results 237 ] 238 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object
106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams
173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 )
Alias for AllSpice#create_team
209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False
225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
241class User(ApiObject): 242 active: bool 243 admin: Any 244 allow_create_organization: Any 245 allow_git_hook: Any 246 allow_import_local: Any 247 avatar_url: str 248 created: str 249 description: str 250 email: str 251 emails: List[Any] 252 followers_count: int 253 following_count: int 254 full_name: str 255 html_url: str 256 id: int 257 is_admin: bool 258 language: str 259 last_login: str 260 location: str 261 login: str 262 login_name: str 263 max_repo_creation: Any 264 must_change_password: Any 265 password: Any 266 prohibit_login: bool 267 restricted: bool 268 source_id: int 269 starred_repos_count: int 270 username: str 271 visibility: str 272 website: str 273 274 API_OBJECT = """/users/{name}""" # <org> 275 USER_MAIL = """/user/emails?sudo=%s""" # <name> 276 USER_PATCH = """/admin/users/%s""" # <username> 277 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 278 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 279 USER_HEATMAP = """/users/%s/heatmap""" # <username> 280 281 def __init__(self, allspice_client): 282 super().__init__(allspice_client) 283 self._emails = [] 284 285 def __eq__(self, other): 286 if not isinstance(other, User): 287 return False 288 return self.allspice_client == other.allspice_client and self.id == other.id 289 290 def __hash__(self): 291 return hash(self.allspice_client) ^ hash(self.id) 292 293 @property 294 def emails(self): 295 self.__request_emails() 296 return self._emails 297 298 @classmethod 299 def request(cls, allspice_client, name: str) -> "User": 300 api_object = cls._request(allspice_client, {"name": name}) 301 return api_object 302 303 _patchable_fields: ClassVar[set[str]] = { 304 "active", 305 "admin", 306 "allow_create_organization", 307 "allow_git_hook", 308 "allow_import_local", 309 "email", 310 "full_name", 311 "location", 312 "login_name", 313 "max_repo_creation", 314 "must_change_password", 315 "password", 316 "prohibit_login", 317 "website", 318 } 319 320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {} 335 336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result) 374 375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results] 380 381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results] 386 387 def get_teams(self) -> List["Team"]: 388 url = "/user/teams" 389 results = self.allspice_client.requests_get_paginated(url, sudo=self) 390 return [Team.parse_response(self.allspice_client, result) for result in results] 391 392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results] 396 397 def __request_emails(self): 398 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 399 # report if the adress changed by this 400 for mail in result: 401 self._emails.append(mail["email"]) 402 if mail["primary"]: 403 self._email = mail["email"] 404 405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True 409 410 def get_heatmap(self) -> List[Tuple[datetime, int]]: 411 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 412 results = [ 413 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 414 for result in results 415 ] 416 return results
320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.
419class Branch(ReadonlyApiObject): 420 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 421 effective_branch_protection_name: str 422 enable_status_check: bool 423 name: str 424 protected: bool 425 required_approvals: int 426 status_check_contexts: List[Any] 427 user_can_merge: bool 428 user_can_push: bool 429 430 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 431 432 def __init__(self, allspice_client): 433 super().__init__(allspice_client) 434 435 def __eq__(self, other): 436 if not isinstance(other, Branch): 437 return False 438 return self.commit == other.commit and self.name == other.name 439 440 def __hash__(self): 441 return hash(self.commit["id"]) ^ hash(self.name) 442 443 _fields_to_parsers: ClassVar[dict] = { 444 # This is not a commit object 445 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 446 } 447 448 @classmethod 449 def request(cls, allspice_client, owner: str, repo: str, branch: str): 450 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
453class GitEntry(ReadonlyApiObject): 454 """ 455 An object representing a file or directory in the Git tree. 456 """ 457 458 mode: str 459 path: str 460 sha: str 461 size: int 462 type: str 463 url: str 464 465 def __init__(self, allspice_client): 466 super().__init__(allspice_client) 467 468 def __eq__(self, other) -> bool: 469 if not isinstance(other, GitEntry): 470 return False 471 return self.sha == other.sha 472 473 def __hash__(self) -> int: 474 return hash(self.sha)
An object representing a file or directory in the Git tree.
Inherited Members
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 559 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 560 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 561 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 562 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 563 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 564 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 565 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 566 567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip" 574 575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex" 585 586 def __init__(self, allspice_client): 587 super().__init__(allspice_client) 588 589 def __eq__(self, other): 590 if not isinstance(other, Repository): 591 return False 592 return self.owner == other.owner and self.name == other.name 593 594 def __hash__(self): 595 return hash(self.owner) ^ hash(self.name) 596 597 _fields_to_parsers: ClassVar[dict] = { 598 # dont know how to tell apart user and org as owner except form email being empty. 599 "owner": lambda allspice_client, r: ( 600 Organization.parse_response(allspice_client, r) 601 if r["email"] == "" 602 else User.parse_response(allspice_client, r) 603 ), 604 "updated_at": lambda _, t: Util.convert_time(t), 605 } 606 607 @classmethod 608 def request( 609 cls, 610 allspice_client, 611 owner: str, 612 name: str, 613 ) -> Repository: 614 return cls._request(allspice_client, {"owner": owner, "name": name}) 615 616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses] 660 661 _patchable_fields: ClassVar[set[str]] = { 662 "allow_manual_merge", 663 "allow_merge_commits", 664 "allow_rebase", 665 "allow_rebase_explicit", 666 "allow_rebase_update", 667 "allow_squash_merge", 668 "archived", 669 "autodetect_manual_merge", 670 "default_branch", 671 "default_delete_branch_after_merge", 672 "default_merge_style", 673 "description", 674 "enable_prune", 675 "external_tracker", 676 "external_wiki", 677 "has_actions", 678 "has_issues", 679 "has_projects", 680 "has_pull_requests", 681 "has_wiki", 682 "ignore_whitespace_conflicts", 683 "internal_tracker", 684 "mirror_interval", 685 "name", 686 "private", 687 "template", 688 "website", 689 } 690 691 def commit(self): 692 args = {"owner": self.owner.username, "name": self.name} 693 self._commit(args) 694 695 def get_branches(self) -> List["Branch"]: 696 """Get all the Branches of this Repository.""" 697 698 results = self.allspice_client.requests_get_paginated( 699 Repository.REPO_BRANCHES % (self.owner.username, self.name) 700 ) 701 return [Branch.parse_response(self.allspice_client, result) for result in results] 702 703 def get_branch(self, name: str) -> "Branch": 704 """Get a specific Branch of this Repository.""" 705 result = self.allspice_client.requests_get( 706 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 707 ) 708 return Branch.parse_response(self.allspice_client, result) 709 710 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 711 """Add a branch to the repository""" 712 # Note: will only work with gitea 1.13 or higher! 713 714 ref_name = Util.data_params_for_ref(create_from) 715 if "ref" not in ref_name: 716 raise ValueError("create_from must be a Branch, Commit or string") 717 ref_name = ref_name["ref"] 718 719 data = {"new_branch_name": newname, "old_ref_name": ref_name} 720 result = self.allspice_client.requests_post( 721 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 722 ) 723 return Branch.parse_response(self.allspice_client, result) 724 725 def get_issues( 726 self, 727 state: Literal["open", "closed", "all"] = "all", 728 search_query: Optional[str] = None, 729 labels: Optional[List[str]] = None, 730 milestones: Optional[List[Union[Milestone, str]]] = None, 731 assignee: Optional[Union[User, str]] = None, 732 since: Optional[datetime] = None, 733 before: Optional[datetime] = None, 734 ) -> List["Issue"]: 735 """ 736 Get all Issues of this Repository (open and closed) 737 738 https://hub.allspice.io/api/swagger#/repository/repoListIssues 739 740 All params of this method are optional filters. If you don't specify a filter, it 741 will not be applied. 742 743 :param state: The state of the Issues to get. If None, all Issues are returned. 744 :param search_query: Filter issues by text. This is equivalent to searching for 745 `search_query` in the Issues on the web interface. 746 :param labels: Filter issues by labels. 747 :param milestones: Filter issues by milestones. 748 :param assignee: Filter issues by the assigned user. 749 :param since: Filter issues by the date they were created. 750 :param before: Filter issues by the date they were created. 751 :return: A list of Issues. 752 """ 753 754 data = { 755 "state": state, 756 } 757 if search_query: 758 data["q"] = search_query 759 if labels: 760 data["labels"] = ",".join(labels) 761 if milestones: 762 data["milestone"] = ",".join( 763 [ 764 milestone.name if isinstance(milestone, Milestone) else milestone 765 for milestone in milestones 766 ] 767 ) 768 if assignee: 769 if isinstance(assignee, User): 770 data["assignee"] = assignee.username 771 else: 772 data["assignee"] = assignee 773 if since: 774 data["since"] = Util.format_time(since) 775 if before: 776 data["before"] = Util.format_time(before) 777 778 results = self.allspice_client.requests_get_paginated( 779 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 780 params=data, 781 ) 782 783 issues = [] 784 for result in results: 785 issue = Issue.parse_response(self.allspice_client, result) 786 # See Issue.request 787 setattr(issue, "_repository", self) 788 # This is mostly for compatibility with an older implementation 789 Issue._add_read_property("repo", self, issue) 790 issues.append(issue) 791 792 return issues 793 794 def get_design_reviews( 795 self, 796 state: Literal["open", "closed", "all"] = "all", 797 milestone: Optional[Union[Milestone, str]] = None, 798 labels: Optional[List[str]] = None, 799 ) -> List["DesignReview"]: 800 """ 801 Get all Design Reviews of this Repository. 802 803 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 804 805 :param state: The state of the Design Reviews to get. If None, all Design Reviews 806 are returned. 807 :param milestone: The milestone of the Design Reviews to get. 808 :param labels: A list of label IDs to filter DRs by. 809 :return: A list of Design Reviews. 810 """ 811 812 params = { 813 "state": state, 814 } 815 if milestone: 816 if isinstance(milestone, Milestone): 817 params["milestone"] = milestone.name 818 else: 819 params["milestone"] = milestone 820 if labels: 821 params["labels"] = ",".join(labels) 822 823 results = self.allspice_client.requests_get_paginated( 824 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 825 params=params, 826 ) 827 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 828 829 def get_commits( 830 self, 831 sha: Optional[str] = None, 832 path: Optional[str] = None, 833 stat: bool = True, 834 ) -> List["Commit"]: 835 """ 836 Get all the Commits of this Repository. 837 838 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 839 840 :param sha: The SHA of the commit to start listing commits from. 841 :param path: filepath of a file/dir. 842 :param stat: Include the number of additions and deletions in the response. 843 Disable for speedup. 844 :return: A list of Commits. 845 """ 846 847 data = {} 848 if sha: 849 data["sha"] = sha 850 if path: 851 data["path"] = path 852 if not stat: 853 data["stat"] = False 854 855 try: 856 results = self.allspice_client.requests_get_paginated( 857 Repository.REPO_COMMITS % (self.owner.username, self.name), 858 params=data, 859 ) 860 except ConflictException as err: 861 logging.warning(err) 862 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 863 results = [] 864 return [Commit.parse_response(self.allspice_client, result) for result in results] 865 866 def get_issues_state(self, state) -> List["Issue"]: 867 """ 868 DEPRECATED: Use get_issues() instead. 869 870 Get issues of state Issue.open or Issue.closed of a repository. 871 """ 872 873 assert state in [Issue.OPENED, Issue.CLOSED] 874 issues = [] 875 data = {"state": state} 876 results = self.allspice_client.requests_get_paginated( 877 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 878 params=data, 879 ) 880 for result in results: 881 issue = Issue.parse_response(self.allspice_client, result) 882 # adding data not contained in the issue response 883 # See Issue.request() 884 setattr(issue, "_repository", self) 885 Issue._add_read_property("repo", self, issue) 886 Issue._add_read_property("owner", self.owner, issue) 887 issues.append(issue) 888 return issues 889 890 def get_times(self): 891 results = self.allspice_client.requests_get( 892 Repository.REPO_TIMES % (self.owner.username, self.name) 893 ) 894 return results 895 896 def get_user_time(self, username) -> float: 897 if isinstance(username, User): 898 username = username.username 899 results = self.allspice_client.requests_get( 900 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 901 ) 902 time = sum(r["time"] for r in results) 903 return time 904 905 def get_full_name(self) -> str: 906 return self.owner.username + "/" + self.name 907 908 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 909 data = { 910 "assignees": assignees, 911 "body": description, 912 "closed": False, 913 "title": title, 914 } 915 result = self.allspice_client.requests_post( 916 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 917 data=data, 918 ) 919 920 issue = Issue.parse_response(self.allspice_client, result) 921 setattr(issue, "_repository", self) 922 Issue._add_read_property("repo", self, issue) 923 return issue 924 925 def create_design_review( 926 self, 927 title: str, 928 head: Union[Branch, str], 929 base: Union[Branch, str], 930 assignees: Optional[Set[Union[User, str]]] = None, 931 body: Optional[str] = None, 932 due_date: Optional[datetime] = None, 933 milestone: Optional["Milestone"] = None, 934 ) -> "DesignReview": 935 """ 936 Create a new Design Review. 937 938 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 939 940 :param title: Title of the Design Review 941 :param head: Branch or name of the branch to merge into the base branch 942 :param base: Branch or name of the branch to merge into 943 :param assignees: Optional. A list of users to assign this review. List can be of 944 User objects or of usernames. 945 :param body: An Optional Description for the Design Review. 946 :param due_date: An Optional Due date for the Design Review. 947 :param milestone: An Optional Milestone for the Design Review 948 :return: The created Design Review 949 """ 950 951 data: dict[str, Any] = { 952 "title": title, 953 } 954 955 if isinstance(head, Branch): 956 data["head"] = head.name 957 else: 958 data["head"] = head 959 if isinstance(base, Branch): 960 data["base"] = base.name 961 else: 962 data["base"] = base 963 if assignees: 964 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 965 if body: 966 data["body"] = body 967 if due_date: 968 data["due_date"] = Util.format_time(due_date) 969 if milestone: 970 data["milestone"] = milestone.id 971 972 result = self.allspice_client.requests_post( 973 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 974 data=data, 975 ) 976 977 return DesignReview.parse_response(self.allspice_client, result) 978 979 def create_milestone( 980 self, 981 title: str, 982 description: str, 983 due_date: Optional[str] = None, 984 state: str = "open", 985 ) -> "Milestone": 986 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 987 data = {"title": title, "description": description, "state": state} 988 if due_date: 989 data["due_date"] = due_date 990 result = self.allspice_client.requests_post(url, data=data) 991 return Milestone.parse_response(self.allspice_client, result) 992 993 def create_gitea_hook(self, hook_url: str, events: List[str]): 994 url = f"/repos/{self.owner.username}/{self.name}/hooks" 995 data = { 996 "type": "gitea", 997 "config": {"content_type": "json", "url": hook_url}, 998 "events": events, 999 "active": True, 1000 } 1001 return self.allspice_client.requests_post(url, data=data) 1002 1003 def list_hooks(self): 1004 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1005 return self.allspice_client.requests_get(url) 1006 1007 def delete_hook(self, id: str): 1008 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1009 self.allspice_client.requests_delete(url) 1010 1011 def is_collaborator(self, username) -> bool: 1012 if isinstance(username, User): 1013 username = username.username 1014 try: 1015 # returns 204 if its ok, 404 if its not 1016 self.allspice_client.requests_get( 1017 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1018 ) 1019 return True 1020 except Exception: 1021 return False 1022 1023 def get_users_with_access(self) -> Sequence[User]: 1024 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1025 response = self.allspice_client.requests_get(url) 1026 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1027 if isinstance(self.owner, User): 1028 return [*collabs, self.owner] 1029 else: 1030 # owner must be org 1031 teams = self.owner.get_teams() 1032 for team in teams: 1033 team_repos = team.get_repos() 1034 if self.name in [n.name for n in team_repos]: 1035 collabs += team.get_members() 1036 return collabs 1037 1038 def remove_collaborator(self, user_name: str): 1039 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1040 self.allspice_client.requests_delete(url) 1041 1042 def transfer_ownership( 1043 self, 1044 new_owner: Union[User, Organization], 1045 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1046 ): 1047 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1048 data: dict[str, Any] = {"new_owner": new_owner.username} 1049 if isinstance(new_owner, Organization): 1050 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1051 data["team_ids"] = new_team_ids 1052 self.allspice_client.requests_post(url, data=data) 1053 # TODO: make sure this instance is either updated or discarded 1054 1055 def get_git_content( 1056 self, 1057 ref: Optional["Ref"] = None, 1058 commit: "Optional[Commit]" = None, 1059 ) -> List[Content]: 1060 """ 1061 Get the metadata for all files in the root directory. 1062 1063 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1064 1065 :param ref: branch or commit to get content from 1066 :param commit: commit to get content from (deprecated) 1067 """ 1068 url = f"/repos/{self.owner.username}/{self.name}/contents" 1069 data = Util.data_params_for_ref(ref or commit) 1070 1071 result = [ 1072 Content.parse_response(self.allspice_client, f) 1073 for f in self.allspice_client.requests_get(url, data) 1074 ] 1075 return result 1076 1077 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1078 """ 1079 Get the repository's tree on a given ref. 1080 1081 By default, this will only return the top-level entries in the tree. If you want 1082 to get the entire tree, set `recursive` to True. 1083 1084 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1085 :param recursive: Whether to get the entire tree or just the top-level entries. 1086 """ 1087 1088 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1089 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1090 params = {"recursive": recursive} 1091 results = self.allspice_client.requests_get_paginated(url, params=params) 1092 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1093 1094 def get_file_content( 1095 self, 1096 content: Content, 1097 ref: Optional[Ref] = None, 1098 commit: Optional[Commit] = None, 1099 ) -> Union[str, List["Content"]]: 1100 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1101 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1102 data = Util.data_params_for_ref(ref or commit) 1103 1104 if content.type == Content.FILE: 1105 return self.allspice_client.requests_get(url, data)["content"] 1106 else: 1107 return [ 1108 Content.parse_response(self.allspice_client, f) 1109 for f in self.allspice_client.requests_get(url, data) 1110 ] 1111 1112 def get_raw_file( 1113 self, 1114 file_path: str, 1115 ref: Optional[Ref] = None, 1116 ) -> bytes: 1117 """ 1118 Get the raw, binary data of a single file. 1119 1120 Note 1: if the file you are requesting is a text file, you might want to 1121 use .decode() on the result to get a string. For example: 1122 1123 content = repo.get_raw_file("file.txt").decode("utf-8") 1124 1125 Note 2: this method will store the entire file in memory. If you want 1126 to download a large file, you might want to use `download_to_file` 1127 instead. 1128 1129 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1130 1131 :param file_path: The path to the file to get. 1132 :param ref: The branch or commit to get the file from. If not provided, 1133 the default branch is used. 1134 """ 1135 1136 url = self.REPO_GET_RAW_FILE.format( 1137 owner=self.owner.username, 1138 repo=self.name, 1139 path=file_path, 1140 ) 1141 params = Util.data_params_for_ref(ref) 1142 return self.allspice_client.requests_get_raw(url, params=params) 1143 1144 def download_to_file( 1145 self, 1146 file_path: str, 1147 io: IO, 1148 ref: Optional[Ref] = None, 1149 ) -> None: 1150 """ 1151 Download the binary data of a file to a file-like object. 1152 1153 Example: 1154 1155 with open("schematic.DSN", "wb") as f: 1156 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1157 1158 :param file_path: The path to the file in the repository from the root 1159 of the repository. 1160 :param io: The file-like object to write the data to. 1161 """ 1162 1163 url = self.allspice_client._AllSpice__get_url( 1164 self.REPO_GET_RAW_FILE.format( 1165 owner=self.owner.username, 1166 repo=self.name, 1167 path=file_path, 1168 ) 1169 ) 1170 params = Util.data_params_for_ref(ref) 1171 response = self.allspice_client.requests.get( 1172 url, 1173 params=params, 1174 headers=self.allspice_client.headers, 1175 stream=True, 1176 ) 1177 1178 for chunk in response.iter_content(chunk_size=4096): 1179 if chunk: 1180 io.write(chunk) 1181 1182 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1183 """ 1184 Get the json blob for a cad file if it exists, otherwise enqueue 1185 a new job and return a 503 status. 1186 1187 WARNING: This is still experimental and not recommended for critical 1188 applications. The structure and content of the returned dictionary can 1189 change at any time. 1190 1191 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1192 """ 1193 1194 if isinstance(content, Content): 1195 content = content.path 1196 1197 url = self.REPO_GET_ALLSPICE_JSON.format( 1198 owner=self.owner.username, 1199 repo=self.name, 1200 content=content, 1201 ) 1202 data = Util.data_params_for_ref(ref) 1203 return self.allspice_client.requests_get(url, data) 1204 1205 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1206 """ 1207 Get the svg blob for a cad file if it exists, otherwise enqueue 1208 a new job and return a 503 status. 1209 1210 WARNING: This is still experimental and not yet recommended for 1211 critical applications. The content of the returned svg can change 1212 at any time. 1213 1214 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1215 """ 1216 1217 if isinstance(content, Content): 1218 content = content.path 1219 1220 url = self.REPO_GET_ALLSPICE_SVG.format( 1221 owner=self.owner.username, 1222 repo=self.name, 1223 content=content, 1224 ) 1225 data = Util.data_params_for_ref(ref) 1226 return self.allspice_client.requests_get_raw(url, data) 1227 1228 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"content": content}) 1234 return self.allspice_client.requests_post(url, data) 1235 1236 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha, "content": content}) 1242 return self.allspice_client.requests_put(url, data) 1243 1244 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1245 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1246 if not data: 1247 data = {} 1248 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1249 data.update({"sha": file_sha}) 1250 return self.allspice_client.requests_delete(url, data) 1251 1252 def get_archive( 1253 self, 1254 ref: Ref = "main", 1255 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1256 ) -> bytes: 1257 """ 1258 Download all the files in a specific ref of a repository as a zip or tarball 1259 archive. 1260 1261 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1262 1263 :param ref: branch or commit to get content from, defaults to the "main" branch 1264 :param archive_format: zip or tar, defaults to zip 1265 """ 1266 1267 ref_string = Util.data_params_for_ref(ref)["ref"] 1268 url = self.REPO_GET_ARCHIVE.format( 1269 owner=self.owner.username, 1270 repo=self.name, 1271 ref=ref_string, 1272 format=archive_format.value, 1273 ) 1274 return self.allspice_client.requests_get_raw(url) 1275 1276 def get_topics(self) -> list[str]: 1277 """ 1278 Gets the list of topics on this repository. 1279 1280 See http://localhost:3000/api/swagger#/repository/repoListTopics 1281 """ 1282 1283 url = self.REPO_GET_TOPICS.format( 1284 owner=self.owner.username, 1285 repo=self.name, 1286 ) 1287 return self.allspice_client.requests_get(url)["topics"] 1288 1289 def add_topic(self, topic: str): 1290 """ 1291 Adds a topic to the repository. 1292 1293 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1294 1295 :param topic: The topic to add. Topic names must consist only of 1296 lowercase letters, numnbers and dashes (-), and cannot start with 1297 dashes. Topic names also must be under 35 characters long. 1298 """ 1299 1300 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1301 self.allspice_client.requests_put(url) 1302 1303 def create_release( 1304 self, 1305 tag_name: str, 1306 name: Optional[str] = None, 1307 body: Optional[str] = None, 1308 draft: bool = False, 1309 ): 1310 """ 1311 Create a release for this repository. The release will be created for 1312 the tag with the given name. If there is no tag with this name, create 1313 the tag first. 1314 1315 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1316 """ 1317 1318 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1319 data = { 1320 "tag_name": tag_name, 1321 "draft": draft, 1322 } 1323 if name is not None: 1324 data["name"] = name 1325 if body is not None: 1326 data["body"] = body 1327 response = self.allspice_client.requests_post(url, data) 1328 return Release.parse_response(self.allspice_client, response, self) 1329 1330 def get_releases( 1331 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1332 ) -> List[Release]: 1333 """ 1334 Get the list of releases for this repository. 1335 1336 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1337 """ 1338 1339 data = {} 1340 1341 if draft is not None: 1342 data["draft"] = draft 1343 if pre_release is not None: 1344 data["pre-release"] = pre_release 1345 1346 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1347 responses = self.allspice_client.requests_get_paginated(url, params=data) 1348 1349 return [ 1350 Release.parse_response(self.allspice_client, response, self) for response in responses 1351 ] 1352 1353 def get_latest_release(self) -> Release: 1354 """ 1355 Get the latest release for this repository. 1356 1357 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1358 """ 1359 1360 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1361 response = self.allspice_client.requests_get(url) 1362 release = Release.parse_response(self.allspice_client, response, self) 1363 return release 1364 1365 def get_release_by_tag(self, tag: str) -> Release: 1366 """ 1367 Get a release by its tag. 1368 1369 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1370 """ 1371 1372 url = self.REPO_GET_RELEASE_BY_TAG.format( 1373 owner=self.owner.username, repo=self.name, tag=tag 1374 ) 1375 response = self.allspice_client.requests_get(url) 1376 release = Release.parse_response(self.allspice_client, response, self) 1377 return release 1378 1379 def get_commit_statuses( 1380 self, 1381 commit: Union[str, Commit], 1382 sort: Optional[CommitStatusSort] = None, 1383 state: Optional[CommitStatusState] = None, 1384 ) -> List[CommitStatus]: 1385 """ 1386 Get a list of statuses for a commit. 1387 1388 This is roughly equivalent to the Commit.get_statuses method, but this 1389 method allows you to sort and filter commits and is more convenient if 1390 you have a commit SHA and don't need to get the commit itself. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1393 """ 1394 1395 if isinstance(commit, Commit): 1396 commit = commit.sha 1397 1398 params = {} 1399 if sort is not None: 1400 params["sort"] = sort.value 1401 if state is not None: 1402 params["state"] = state.value 1403 1404 url = self.REPO_GET_COMMIT_STATUS.format( 1405 owner=self.owner.username, repo=self.name, sha=commit 1406 ) 1407 response = self.allspice_client.requests_get_paginated(url, params=params) 1408 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1409 1410 def create_commit_status( 1411 self, 1412 commit: Union[str, Commit], 1413 context: Optional[str] = None, 1414 description: Optional[str] = None, 1415 state: Optional[CommitStatusState] = None, 1416 target_url: Optional[str] = None, 1417 ) -> CommitStatus: 1418 """ 1419 Create a status on a commit. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 data = {} 1428 if context is not None: 1429 data["context"] = context 1430 if description is not None: 1431 data["description"] = description 1432 if state is not None: 1433 data["state"] = state.value 1434 if target_url is not None: 1435 data["target_url"] = target_url 1436 1437 url = self.REPO_GET_COMMIT_STATUS.format( 1438 owner=self.owner.username, repo=self.name, sha=commit 1439 ) 1440 response = self.allspice_client.requests_post(url, data=data) 1441 return CommitStatus.parse_response(self.allspice_client, response) 1442 1443 def delete(self): 1444 self.allspice_client.requests_delete( 1445 Repository.REPO_DELETE % (self.owner.username, self.name) 1446 ) 1447 self.deleted = True
616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
695 def get_branches(self) -> List["Branch"]: 696 """Get all the Branches of this Repository.""" 697 698 results = self.allspice_client.requests_get_paginated( 699 Repository.REPO_BRANCHES % (self.owner.username, self.name) 700 ) 701 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
703 def get_branch(self, name: str) -> "Branch": 704 """Get a specific Branch of this Repository.""" 705 result = self.allspice_client.requests_get( 706 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 707 ) 708 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
710 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 711 """Add a branch to the repository""" 712 # Note: will only work with gitea 1.13 or higher! 713 714 ref_name = Util.data_params_for_ref(create_from) 715 if "ref" not in ref_name: 716 raise ValueError("create_from must be a Branch, Commit or string") 717 ref_name = ref_name["ref"] 718 719 data = {"new_branch_name": newname, "old_ref_name": ref_name} 720 result = self.allspice_client.requests_post( 721 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 722 ) 723 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
725 def get_issues( 726 self, 727 state: Literal["open", "closed", "all"] = "all", 728 search_query: Optional[str] = None, 729 labels: Optional[List[str]] = None, 730 milestones: Optional[List[Union[Milestone, str]]] = None, 731 assignee: Optional[Union[User, str]] = None, 732 since: Optional[datetime] = None, 733 before: Optional[datetime] = None, 734 ) -> List["Issue"]: 735 """ 736 Get all Issues of this Repository (open and closed) 737 738 https://hub.allspice.io/api/swagger#/repository/repoListIssues 739 740 All params of this method are optional filters. If you don't specify a filter, it 741 will not be applied. 742 743 :param state: The state of the Issues to get. If None, all Issues are returned. 744 :param search_query: Filter issues by text. This is equivalent to searching for 745 `search_query` in the Issues on the web interface. 746 :param labels: Filter issues by labels. 747 :param milestones: Filter issues by milestones. 748 :param assignee: Filter issues by the assigned user. 749 :param since: Filter issues by the date they were created. 750 :param before: Filter issues by the date they were created. 751 :return: A list of Issues. 752 """ 753 754 data = { 755 "state": state, 756 } 757 if search_query: 758 data["q"] = search_query 759 if labels: 760 data["labels"] = ",".join(labels) 761 if milestones: 762 data["milestone"] = ",".join( 763 [ 764 milestone.name if isinstance(milestone, Milestone) else milestone 765 for milestone in milestones 766 ] 767 ) 768 if assignee: 769 if isinstance(assignee, User): 770 data["assignee"] = assignee.username 771 else: 772 data["assignee"] = assignee 773 if since: 774 data["since"] = Util.format_time(since) 775 if before: 776 data["before"] = Util.format_time(before) 777 778 results = self.allspice_client.requests_get_paginated( 779 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 780 params=data, 781 ) 782 783 issues = [] 784 for result in results: 785 issue = Issue.parse_response(self.allspice_client, result) 786 # See Issue.request 787 setattr(issue, "_repository", self) 788 # This is mostly for compatibility with an older implementation 789 Issue._add_read_property("repo", self, issue) 790 issues.append(issue) 791 792 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_query
in the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
794 def get_design_reviews( 795 self, 796 state: Literal["open", "closed", "all"] = "all", 797 milestone: Optional[Union[Milestone, str]] = None, 798 labels: Optional[List[str]] = None, 799 ) -> List["DesignReview"]: 800 """ 801 Get all Design Reviews of this Repository. 802 803 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 804 805 :param state: The state of the Design Reviews to get. If None, all Design Reviews 806 are returned. 807 :param milestone: The milestone of the Design Reviews to get. 808 :param labels: A list of label IDs to filter DRs by. 809 :return: A list of Design Reviews. 810 """ 811 812 params = { 813 "state": state, 814 } 815 if milestone: 816 if isinstance(milestone, Milestone): 817 params["milestone"] = milestone.name 818 else: 819 params["milestone"] = milestone 820 if labels: 821 params["labels"] = ",".join(labels) 822 823 results = self.allspice_client.requests_get_paginated( 824 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 825 params=params, 826 ) 827 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
829 def get_commits( 830 self, 831 sha: Optional[str] = None, 832 path: Optional[str] = None, 833 stat: bool = True, 834 ) -> List["Commit"]: 835 """ 836 Get all the Commits of this Repository. 837 838 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 839 840 :param sha: The SHA of the commit to start listing commits from. 841 :param path: filepath of a file/dir. 842 :param stat: Include the number of additions and deletions in the response. 843 Disable for speedup. 844 :return: A list of Commits. 845 """ 846 847 data = {} 848 if sha: 849 data["sha"] = sha 850 if path: 851 data["path"] = path 852 if not stat: 853 data["stat"] = False 854 855 try: 856 results = self.allspice_client.requests_get_paginated( 857 Repository.REPO_COMMITS % (self.owner.username, self.name), 858 params=data, 859 ) 860 except ConflictException as err: 861 logging.warning(err) 862 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 863 results = [] 864 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
866 def get_issues_state(self, state) -> List["Issue"]: 867 """ 868 DEPRECATED: Use get_issues() instead. 869 870 Get issues of state Issue.open or Issue.closed of a repository. 871 """ 872 873 assert state in [Issue.OPENED, Issue.CLOSED] 874 issues = [] 875 data = {"state": state} 876 results = self.allspice_client.requests_get_paginated( 877 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 878 params=data, 879 ) 880 for result in results: 881 issue = Issue.parse_response(self.allspice_client, result) 882 # adding data not contained in the issue response 883 # See Issue.request() 884 setattr(issue, "_repository", self) 885 Issue._add_read_property("repo", self, issue) 886 Issue._add_read_property("owner", self.owner, issue) 887 issues.append(issue) 888 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
896 def get_user_time(self, username) -> float: 897 if isinstance(username, User): 898 username = username.username 899 results = self.allspice_client.requests_get( 900 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 901 ) 902 time = sum(r["time"] for r in results) 903 return time
908 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 909 data = { 910 "assignees": assignees, 911 "body": description, 912 "closed": False, 913 "title": title, 914 } 915 result = self.allspice_client.requests_post( 916 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 917 data=data, 918 ) 919 920 issue = Issue.parse_response(self.allspice_client, result) 921 setattr(issue, "_repository", self) 922 Issue._add_read_property("repo", self, issue) 923 return issue
925 def create_design_review( 926 self, 927 title: str, 928 head: Union[Branch, str], 929 base: Union[Branch, str], 930 assignees: Optional[Set[Union[User, str]]] = None, 931 body: Optional[str] = None, 932 due_date: Optional[datetime] = None, 933 milestone: Optional["Milestone"] = None, 934 ) -> "DesignReview": 935 """ 936 Create a new Design Review. 937 938 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 939 940 :param title: Title of the Design Review 941 :param head: Branch or name of the branch to merge into the base branch 942 :param base: Branch or name of the branch to merge into 943 :param assignees: Optional. A list of users to assign this review. List can be of 944 User objects or of usernames. 945 :param body: An Optional Description for the Design Review. 946 :param due_date: An Optional Due date for the Design Review. 947 :param milestone: An Optional Milestone for the Design Review 948 :return: The created Design Review 949 """ 950 951 data: dict[str, Any] = { 952 "title": title, 953 } 954 955 if isinstance(head, Branch): 956 data["head"] = head.name 957 else: 958 data["head"] = head 959 if isinstance(base, Branch): 960 data["base"] = base.name 961 else: 962 data["base"] = base 963 if assignees: 964 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 965 if body: 966 data["body"] = body 967 if due_date: 968 data["due_date"] = Util.format_time(due_date) 969 if milestone: 970 data["milestone"] = milestone.id 971 972 result = self.allspice_client.requests_post( 973 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 974 data=data, 975 ) 976 977 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
979 def create_milestone( 980 self, 981 title: str, 982 description: str, 983 due_date: Optional[str] = None, 984 state: str = "open", 985 ) -> "Milestone": 986 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 987 data = {"title": title, "description": description, "state": state} 988 if due_date: 989 data["due_date"] = due_date 990 result = self.allspice_client.requests_post(url, data=data) 991 return Milestone.parse_response(self.allspice_client, result)
993 def create_gitea_hook(self, hook_url: str, events: List[str]): 994 url = f"/repos/{self.owner.username}/{self.name}/hooks" 995 data = { 996 "type": "gitea", 997 "config": {"content_type": "json", "url": hook_url}, 998 "events": events, 999 "active": True, 1000 } 1001 return self.allspice_client.requests_post(url, data=data)
1011 def is_collaborator(self, username) -> bool: 1012 if isinstance(username, User): 1013 username = username.username 1014 try: 1015 # returns 204 if its ok, 404 if its not 1016 self.allspice_client.requests_get( 1017 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1018 ) 1019 return True 1020 except Exception: 1021 return False
1023 def get_users_with_access(self) -> Sequence[User]: 1024 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1025 response = self.allspice_client.requests_get(url) 1026 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1027 if isinstance(self.owner, User): 1028 return [*collabs, self.owner] 1029 else: 1030 # owner must be org 1031 teams = self.owner.get_teams() 1032 for team in teams: 1033 team_repos = team.get_repos() 1034 if self.name in [n.name for n in team_repos]: 1035 collabs += team.get_members() 1036 return collabs
1042 def transfer_ownership( 1043 self, 1044 new_owner: Union[User, Organization], 1045 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1046 ): 1047 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1048 data: dict[str, Any] = {"new_owner": new_owner.username} 1049 if isinstance(new_owner, Organization): 1050 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1051 data["team_ids"] = new_team_ids 1052 self.allspice_client.requests_post(url, data=data) 1053 # TODO: make sure this instance is either updated or discarded
1055 def get_git_content( 1056 self, 1057 ref: Optional["Ref"] = None, 1058 commit: "Optional[Commit]" = None, 1059 ) -> List[Content]: 1060 """ 1061 Get the metadata for all files in the root directory. 1062 1063 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1064 1065 :param ref: branch or commit to get content from 1066 :param commit: commit to get content from (deprecated) 1067 """ 1068 url = f"/repos/{self.owner.username}/{self.name}/contents" 1069 data = Util.data_params_for_ref(ref or commit) 1070 1071 result = [ 1072 Content.parse_response(self.allspice_client, f) 1073 for f in self.allspice_client.requests_get(url, data) 1074 ] 1075 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1077 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1078 """ 1079 Get the repository's tree on a given ref. 1080 1081 By default, this will only return the top-level entries in the tree. If you want 1082 to get the entire tree, set `recursive` to True. 1083 1084 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1085 :param recursive: Whether to get the entire tree or just the top-level entries. 1086 """ 1087 1088 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1089 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1090 params = {"recursive": recursive} 1091 results = self.allspice_client.requests_get_paginated(url, params=params) 1092 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1094 def get_file_content( 1095 self, 1096 content: Content, 1097 ref: Optional[Ref] = None, 1098 commit: Optional[Commit] = None, 1099 ) -> Union[str, List["Content"]]: 1100 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1101 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1102 data = Util.data_params_for_ref(ref or commit) 1103 1104 if content.type == Content.FILE: 1105 return self.allspice_client.requests_get(url, data)["content"] 1106 else: 1107 return [ 1108 Content.parse_response(self.allspice_client, f) 1109 for f in self.allspice_client.requests_get(url, data) 1110 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1112 def get_raw_file( 1113 self, 1114 file_path: str, 1115 ref: Optional[Ref] = None, 1116 ) -> bytes: 1117 """ 1118 Get the raw, binary data of a single file. 1119 1120 Note 1: if the file you are requesting is a text file, you might want to 1121 use .decode() on the result to get a string. For example: 1122 1123 content = repo.get_raw_file("file.txt").decode("utf-8") 1124 1125 Note 2: this method will store the entire file in memory. If you want 1126 to download a large file, you might want to use `download_to_file` 1127 instead. 1128 1129 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1130 1131 :param file_path: The path to the file to get. 1132 :param ref: The branch or commit to get the file from. If not provided, 1133 the default branch is used. 1134 """ 1135 1136 url = self.REPO_GET_RAW_FILE.format( 1137 owner=self.owner.username, 1138 repo=self.name, 1139 path=file_path, 1140 ) 1141 params = Util.data_params_for_ref(ref) 1142 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1144 def download_to_file( 1145 self, 1146 file_path: str, 1147 io: IO, 1148 ref: Optional[Ref] = None, 1149 ) -> None: 1150 """ 1151 Download the binary data of a file to a file-like object. 1152 1153 Example: 1154 1155 with open("schematic.DSN", "wb") as f: 1156 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1157 1158 :param file_path: The path to the file in the repository from the root 1159 of the repository. 1160 :param io: The file-like object to write the data to. 1161 """ 1162 1163 url = self.allspice_client._AllSpice__get_url( 1164 self.REPO_GET_RAW_FILE.format( 1165 owner=self.owner.username, 1166 repo=self.name, 1167 path=file_path, 1168 ) 1169 ) 1170 params = Util.data_params_for_ref(ref) 1171 response = self.allspice_client.requests.get( 1172 url, 1173 params=params, 1174 headers=self.allspice_client.headers, 1175 stream=True, 1176 ) 1177 1178 for chunk in response.iter_content(chunk_size=4096): 1179 if chunk: 1180 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1182 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1183 """ 1184 Get the json blob for a cad file if it exists, otherwise enqueue 1185 a new job and return a 503 status. 1186 1187 WARNING: This is still experimental and not recommended for critical 1188 applications. The structure and content of the returned dictionary can 1189 change at any time. 1190 1191 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1192 """ 1193 1194 if isinstance(content, Content): 1195 content = content.path 1196 1197 url = self.REPO_GET_ALLSPICE_JSON.format( 1198 owner=self.owner.username, 1199 repo=self.name, 1200 content=content, 1201 ) 1202 data = Util.data_params_for_ref(ref) 1203 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1205 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1206 """ 1207 Get the svg blob for a cad file if it exists, otherwise enqueue 1208 a new job and return a 503 status. 1209 1210 WARNING: This is still experimental and not yet recommended for 1211 critical applications. The content of the returned svg can change 1212 at any time. 1213 1214 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1215 """ 1216 1217 if isinstance(content, Content): 1218 content = content.path 1219 1220 url = self.REPO_GET_ALLSPICE_SVG.format( 1221 owner=self.owner.username, 1222 repo=self.name, 1223 content=content, 1224 ) 1225 data = Util.data_params_for_ref(ref) 1226 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1228 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"content": content}) 1234 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1236 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha, "content": content}) 1242 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1244 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1245 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1246 if not data: 1247 data = {} 1248 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1249 data.update({"sha": file_sha}) 1250 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1252 def get_archive( 1253 self, 1254 ref: Ref = "main", 1255 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1256 ) -> bytes: 1257 """ 1258 Download all the files in a specific ref of a repository as a zip or tarball 1259 archive. 1260 1261 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1262 1263 :param ref: branch or commit to get content from, defaults to the "main" branch 1264 :param archive_format: zip or tar, defaults to zip 1265 """ 1266 1267 ref_string = Util.data_params_for_ref(ref)["ref"] 1268 url = self.REPO_GET_ARCHIVE.format( 1269 owner=self.owner.username, 1270 repo=self.name, 1271 ref=ref_string, 1272 format=archive_format.value, 1273 ) 1274 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1276 def get_topics(self) -> list[str]: 1277 """ 1278 Gets the list of topics on this repository. 1279 1280 See http://localhost:3000/api/swagger#/repository/repoListTopics 1281 """ 1282 1283 url = self.REPO_GET_TOPICS.format( 1284 owner=self.owner.username, 1285 repo=self.name, 1286 ) 1287 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1289 def add_topic(self, topic: str): 1290 """ 1291 Adds a topic to the repository. 1292 1293 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1294 1295 :param topic: The topic to add. Topic names must consist only of 1296 lowercase letters, numnbers and dashes (-), and cannot start with 1297 dashes. Topic names also must be under 35 characters long. 1298 """ 1299 1300 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1301 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1303 def create_release( 1304 self, 1305 tag_name: str, 1306 name: Optional[str] = None, 1307 body: Optional[str] = None, 1308 draft: bool = False, 1309 ): 1310 """ 1311 Create a release for this repository. The release will be created for 1312 the tag with the given name. If there is no tag with this name, create 1313 the tag first. 1314 1315 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1316 """ 1317 1318 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1319 data = { 1320 "tag_name": tag_name, 1321 "draft": draft, 1322 } 1323 if name is not None: 1324 data["name"] = name 1325 if body is not None: 1326 data["body"] = body 1327 response = self.allspice_client.requests_post(url, data) 1328 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1330 def get_releases( 1331 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1332 ) -> List[Release]: 1333 """ 1334 Get the list of releases for this repository. 1335 1336 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1337 """ 1338 1339 data = {} 1340 1341 if draft is not None: 1342 data["draft"] = draft 1343 if pre_release is not None: 1344 data["pre-release"] = pre_release 1345 1346 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1347 responses = self.allspice_client.requests_get_paginated(url, params=data) 1348 1349 return [ 1350 Release.parse_response(self.allspice_client, response, self) for response in responses 1351 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1353 def get_latest_release(self) -> Release: 1354 """ 1355 Get the latest release for this repository. 1356 1357 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1358 """ 1359 1360 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1361 response = self.allspice_client.requests_get(url) 1362 release = Release.parse_response(self.allspice_client, response, self) 1363 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1365 def get_release_by_tag(self, tag: str) -> Release: 1366 """ 1367 Get a release by its tag. 1368 1369 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1370 """ 1371 1372 url = self.REPO_GET_RELEASE_BY_TAG.format( 1373 owner=self.owner.username, repo=self.name, tag=tag 1374 ) 1375 response = self.allspice_client.requests_get(url) 1376 release = Release.parse_response(self.allspice_client, response, self) 1377 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1379 def get_commit_statuses( 1380 self, 1381 commit: Union[str, Commit], 1382 sort: Optional[CommitStatusSort] = None, 1383 state: Optional[CommitStatusState] = None, 1384 ) -> List[CommitStatus]: 1385 """ 1386 Get a list of statuses for a commit. 1387 1388 This is roughly equivalent to the Commit.get_statuses method, but this 1389 method allows you to sort and filter commits and is more convenient if 1390 you have a commit SHA and don't need to get the commit itself. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1393 """ 1394 1395 if isinstance(commit, Commit): 1396 commit = commit.sha 1397 1398 params = {} 1399 if sort is not None: 1400 params["sort"] = sort.value 1401 if state is not None: 1402 params["state"] = state.value 1403 1404 url = self.REPO_GET_COMMIT_STATUS.format( 1405 owner=self.owner.username, repo=self.name, sha=commit 1406 ) 1407 response = self.allspice_client.requests_get_paginated(url, params=params) 1408 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1410 def create_commit_status( 1411 self, 1412 commit: Union[str, Commit], 1413 context: Optional[str] = None, 1414 description: Optional[str] = None, 1415 state: Optional[CommitStatusState] = None, 1416 target_url: Optional[str] = None, 1417 ) -> CommitStatus: 1418 """ 1419 Create a status on a commit. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 data = {} 1428 if context is not None: 1429 data["context"] = context 1430 if description is not None: 1431 data["description"] = description 1432 if state is not None: 1433 data["state"] = state.value 1434 if target_url is not None: 1435 data["target_url"] = target_url 1436 1437 url = self.REPO_GET_COMMIT_STATUS.format( 1438 owner=self.owner.username, repo=self.name, sha=commit 1439 ) 1440 response = self.allspice_client.requests_post(url, data=data) 1441 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip"
Archive formats for Repository.get_archive
575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
1450class Milestone(ApiObject): 1451 allow_merge_commits: Any 1452 allow_rebase: Any 1453 allow_rebase_explicit: Any 1454 allow_squash_merge: Any 1455 archived: Any 1456 closed_at: Any 1457 closed_issues: int 1458 created_at: str 1459 default_branch: Any 1460 description: str 1461 due_on: Any 1462 has_issues: Any 1463 has_pull_requests: Any 1464 has_wiki: Any 1465 id: int 1466 ignore_whitespace_conflicts: Any 1467 name: Any 1468 open_issues: int 1469 private: Any 1470 state: str 1471 title: str 1472 updated_at: str 1473 website: Any 1474 1475 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1476 1477 def __init__(self, allspice_client): 1478 super().__init__(allspice_client) 1479 1480 def __eq__(self, other): 1481 if not isinstance(other, Milestone): 1482 return False 1483 return self.allspice_client == other.allspice_client and self.id == other.id 1484 1485 def __hash__(self): 1486 return hash(self.allspice_client) ^ hash(self.id) 1487 1488 _fields_to_parsers: ClassVar[dict] = { 1489 "closed_at": lambda _, t: Util.convert_time(t), 1490 "due_on": lambda _, t: Util.convert_time(t), 1491 } 1492 1493 _patchable_fields: ClassVar[set[str]] = { 1494 "allow_merge_commits", 1495 "allow_rebase", 1496 "allow_rebase_explicit", 1497 "allow_squash_merge", 1498 "archived", 1499 "default_branch", 1500 "description", 1501 "has_issues", 1502 "has_pull_requests", 1503 "has_wiki", 1504 "ignore_whitespace_conflicts", 1505 "name", 1506 "private", 1507 "website", 1508 } 1509 1510 @classmethod 1511 def request(cls, allspice_client, owner: str, repo: str, number: str): 1512 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1515class Attachment(ReadonlyApiObject): 1516 """ 1517 An asset attached to a comment. 1518 1519 You cannot edit or delete the attachment from this object - see the instance methods 1520 Comment.edit_attachment and delete_attachment for that. 1521 """ 1522 1523 browser_download_url: str 1524 created_at: str 1525 download_count: int 1526 id: int 1527 name: str 1528 size: int 1529 uuid: str 1530 1531 def __init__(self, allspice_client): 1532 super().__init__(allspice_client) 1533 1534 def __eq__(self, other): 1535 if not isinstance(other, Attachment): 1536 return False 1537 1538 return self.uuid == other.uuid 1539 1540 def __hash__(self): 1541 return hash(self.uuid) 1542 1543 def download_to_file(self, io: IO): 1544 """ 1545 Download the raw, binary data of this Attachment to a file-like object. 1546 1547 Example: 1548 1549 with open("my_file.zip", "wb") as f: 1550 attachment.download_to_file(f) 1551 1552 :param io: The file-like object to write the data to. 1553 """ 1554 1555 response = self.allspice_client.requests.get( 1556 self.browser_download_url, 1557 headers=self.allspice_client.headers, 1558 stream=True, 1559 ) 1560 # 4kb chunks 1561 for chunk in response.iter_content(chunk_size=4096): 1562 if chunk: 1563 io.write(chunk)
An asset attached to a comment.
You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.
1543 def download_to_file(self, io: IO): 1544 """ 1545 Download the raw, binary data of this Attachment to a file-like object. 1546 1547 Example: 1548 1549 with open("my_file.zip", "wb") as f: 1550 attachment.download_to_file(f) 1551 1552 :param io: The file-like object to write the data to. 1553 """ 1554 1555 response = self.allspice_client.requests.get( 1556 self.browser_download_url, 1557 headers=self.allspice_client.headers, 1558 stream=True, 1559 ) 1560 # 4kb chunks 1561 for chunk in response.iter_content(chunk_size=4096): 1562 if chunk: 1563 io.write(chunk)
Download the raw, binary data of this Attachment to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
attachment.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
Inherited Members
1566class Comment(ApiObject): 1567 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1568 body: str 1569 created_at: datetime 1570 html_url: str 1571 id: int 1572 issue_url: str 1573 original_author: str 1574 original_author_id: int 1575 pull_request_url: str 1576 updated_at: datetime 1577 user: User 1578 1579 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1580 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1581 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1582 1583 def __init__(self, allspice_client): 1584 super().__init__(allspice_client) 1585 1586 def __eq__(self, other): 1587 if not isinstance(other, Comment): 1588 return False 1589 return self.repository == other.repository and self.id == other.id 1590 1591 def __hash__(self): 1592 return hash(self.repository) ^ hash(self.id) 1593 1594 @classmethod 1595 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1596 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1597 1598 _fields_to_parsers: ClassVar[dict] = { 1599 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1600 "created_at": lambda _, t: Util.convert_time(t), 1601 "updated_at": lambda _, t: Util.convert_time(t), 1602 } 1603 1604 _patchable_fields: ClassVar[set[str]] = {"body"} 1605 1606 @property 1607 def parent_url(self) -> str: 1608 """URL of the parent of this comment (the issue or the pull request)""" 1609 1610 if self.issue_url is not None and self.issue_url != "": 1611 return self.issue_url 1612 else: 1613 return self.pull_request_url 1614 1615 @cached_property 1616 def repository(self) -> Repository: 1617 """The repository this comment was posted on.""" 1618 1619 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1620 return Repository.request(self.allspice_client, owner_name, repo_name) 1621 1622 def __fields_for_path(self): 1623 return { 1624 "owner": self.repository.owner.username, 1625 "repo": self.repository.name, 1626 "id": self.id, 1627 } 1628 1629 def commit(self): 1630 self._commit(self.__fields_for_path()) 1631 1632 def delete(self): 1633 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1634 self.deleted = True 1635 1636 def get_attachments(self) -> List[Attachment]: 1637 """ 1638 Get all attachments on this comment. This returns Attachment objects, which 1639 contain a link to download the attachment. 1640 1641 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1642 """ 1643 1644 results = self.allspice_client.requests_get( 1645 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1646 ) 1647 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1648 1649 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1650 """ 1651 Create an attachment on this comment. 1652 1653 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1654 1655 :param file: The file to attach. This should be a file-like object. 1656 :param name: The name of the file. If not provided, the name of the file will be 1657 used. 1658 :return: The created attachment. 1659 """ 1660 1661 args: dict[str, Any] = { 1662 "files": {"attachment": file}, 1663 } 1664 if name is not None: 1665 args["params"] = {"name": name} 1666 1667 result = self.allspice_client.requests_post( 1668 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1669 **args, 1670 ) 1671 return Attachment.parse_response(self.allspice_client, result) 1672 1673 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1674 """ 1675 Edit an attachment. 1676 1677 The list of params that can be edited is available at 1678 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1679 1680 :param attachment: The attachment to be edited 1681 :param data: The data parameter should be a dictionary of the fields to edit. 1682 :return: The edited attachment 1683 """ 1684 1685 args = { 1686 **self.__fields_for_path(), 1687 "attachment_id": attachment.id, 1688 } 1689 result = self.allspice_client.requests_patch( 1690 self.ATTACHMENT_PATH.format(**args), 1691 data=data, 1692 ) 1693 return Attachment.parse_response(self.allspice_client, result) 1694 1695 def delete_attachment(self, attachment: Attachment): 1696 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1697 1698 args = { 1699 **self.__fields_for_path(), 1700 "attachment_id": attachment.id, 1701 } 1702 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1703 attachment.deleted = True
1606 @property 1607 def parent_url(self) -> str: 1608 """URL of the parent of this comment (the issue or the pull request)""" 1609 1610 if self.issue_url is not None and self.issue_url != "": 1611 return self.issue_url 1612 else: 1613 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1615 @cached_property 1616 def repository(self) -> Repository: 1617 """The repository this comment was posted on.""" 1618 1619 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1620 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1636 def get_attachments(self) -> List[Attachment]: 1637 """ 1638 Get all attachments on this comment. This returns Attachment objects, which 1639 contain a link to download the attachment. 1640 1641 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1642 """ 1643 1644 results = self.allspice_client.requests_get( 1645 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1646 ) 1647 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1649 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1650 """ 1651 Create an attachment on this comment. 1652 1653 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1654 1655 :param file: The file to attach. This should be a file-like object. 1656 :param name: The name of the file. If not provided, the name of the file will be 1657 used. 1658 :return: The created attachment. 1659 """ 1660 1661 args: dict[str, Any] = { 1662 "files": {"attachment": file}, 1663 } 1664 if name is not None: 1665 args["params"] = {"name": name} 1666 1667 result = self.allspice_client.requests_post( 1668 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1669 **args, 1670 ) 1671 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1673 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1674 """ 1675 Edit an attachment. 1676 1677 The list of params that can be edited is available at 1678 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1679 1680 :param attachment: The attachment to be edited 1681 :param data: The data parameter should be a dictionary of the fields to edit. 1682 :return: The edited attachment 1683 """ 1684 1685 args = { 1686 **self.__fields_for_path(), 1687 "attachment_id": attachment.id, 1688 } 1689 result = self.allspice_client.requests_patch( 1690 self.ATTACHMENT_PATH.format(**args), 1691 data=data, 1692 ) 1693 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1695 def delete_attachment(self, attachment: Attachment): 1696 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1697 1698 args = { 1699 **self.__fields_for_path(), 1700 "attachment_id": attachment.id, 1701 } 1702 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1703 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1706class Commit(ReadonlyApiObject): 1707 author: User 1708 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1709 committer: Dict[str, Union[int, str, bool]] 1710 created: str 1711 files: List[Dict[str, str]] 1712 html_url: str 1713 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1714 parents: List[Union[Dict[str, str], Any]] 1715 sha: str 1716 stats: Dict[str, int] 1717 url: str 1718 1719 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1720 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1721 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1722 1723 # Regex to extract owner and repo names from the url property 1724 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1725 1726 def __init__(self, allspice_client): 1727 super().__init__(allspice_client) 1728 1729 _fields_to_parsers: ClassVar[dict] = { 1730 # NOTE: api may return None for commiters that are no allspice users 1731 "author": lambda allspice_client, u: ( 1732 User.parse_response(allspice_client, u) if u else None 1733 ) 1734 } 1735 1736 def __eq__(self, other): 1737 if not isinstance(other, Commit): 1738 return False 1739 return self.sha == other.sha 1740 1741 def __hash__(self): 1742 return hash(self.sha) 1743 1744 @classmethod 1745 def parse_response(cls, allspice_client, result) -> "Commit": 1746 commit_cache = result["commit"] 1747 api_object = cls(allspice_client) 1748 cls._initialize(allspice_client, api_object, result) 1749 # inner_commit for legacy reasons 1750 Commit._add_read_property("inner_commit", commit_cache, api_object) 1751 return api_object 1752 1753 def get_status(self) -> CommitCombinedStatus: 1754 """ 1755 Get a combined status consisting of all statues on this commit. 1756 1757 Note that the returned object is a CommitCombinedStatus object, which 1758 also contains a list of all statuses on the commit. 1759 1760 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1761 """ 1762 1763 result = self.allspice_client.requests_get( 1764 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1765 ) 1766 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1767 1768 def get_statuses(self) -> List[CommitStatus]: 1769 """ 1770 Get a list of all statuses on this commit. 1771 1772 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1773 """ 1774 1775 results = self.allspice_client.requests_get( 1776 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1777 ) 1778 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1779 1780 @cached_property 1781 def _fields_for_path(self) -> dict[str, str]: 1782 matches = self.URL_REGEXP.search(self.url) 1783 if not matches: 1784 raise ValueError(f"Invalid commit URL: {self.url}") 1785 1786 return { 1787 "owner": matches.group(1), 1788 "repo": matches.group(2), 1789 "sha": self.sha, 1790 }
1744 @classmethod 1745 def parse_response(cls, allspice_client, result) -> "Commit": 1746 commit_cache = result["commit"] 1747 api_object = cls(allspice_client) 1748 cls._initialize(allspice_client, api_object, result) 1749 # inner_commit for legacy reasons 1750 Commit._add_read_property("inner_commit", commit_cache, api_object) 1751 return api_object
1753 def get_status(self) -> CommitCombinedStatus: 1754 """ 1755 Get a combined status consisting of all statues on this commit. 1756 1757 Note that the returned object is a CommitCombinedStatus object, which 1758 also contains a list of all statuses on the commit. 1759 1760 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1761 """ 1762 1763 result = self.allspice_client.requests_get( 1764 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1765 ) 1766 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1768 def get_statuses(self) -> List[CommitStatus]: 1769 """ 1770 Get a list of all statuses on this commit. 1771 1772 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1773 """ 1774 1775 results = self.allspice_client.requests_get( 1776 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1777 ) 1778 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
1793class CommitStatusState(Enum): 1794 PENDING = "pending" 1795 SUCCESS = "success" 1796 ERROR = "error" 1797 FAILURE = "failure" 1798 WARNING = "warning" 1799 1800 @classmethod 1801 def try_init(cls, value: str) -> CommitStatusState | str: 1802 """ 1803 Try converting a string to the enum, and if that fails, return the 1804 string itself. 1805 """ 1806 1807 try: 1808 return cls(value) 1809 except ValueError: 1810 return value
1800 @classmethod 1801 def try_init(cls, value: str) -> CommitStatusState | str: 1802 """ 1803 Try converting a string to the enum, and if that fails, return the 1804 string itself. 1805 """ 1806 1807 try: 1808 return cls(value) 1809 except ValueError: 1810 return value
Try converting a string to the enum, and if that fails, return the string itself.
1813class CommitStatus(ReadonlyApiObject): 1814 context: str 1815 created_at: str 1816 creator: User 1817 description: str 1818 id: int 1819 status: CommitStatusState 1820 target_url: str 1821 updated_at: str 1822 url: str 1823 1824 def __init__(self, allspice_client): 1825 super().__init__(allspice_client) 1826 1827 _fields_to_parsers: ClassVar[dict] = { 1828 # Gitea/ASH doesn't actually validate that the status is a "valid" 1829 # status, so we can expect empty or unknown strings in the status field. 1830 "status": lambda _, s: CommitStatusState.try_init(s), 1831 "creator": lambda allspice_client, u: ( 1832 User.parse_response(allspice_client, u) if u else None 1833 ), 1834 } 1835 1836 def __eq__(self, other): 1837 if not isinstance(other, CommitStatus): 1838 return False 1839 return self.id == other.id 1840 1841 def __hash__(self): 1842 return hash(self.id)
Inherited Members
1845class CommitCombinedStatus(ReadonlyApiObject): 1846 commit_url: str 1847 repository: Repository 1848 sha: str 1849 state: CommitStatusState 1850 statuses: List["CommitStatus"] 1851 total_count: int 1852 url: str 1853 1854 def __init__(self, allspice_client): 1855 super().__init__(allspice_client) 1856 1857 _fields_to_parsers: ClassVar[dict] = { 1858 # See CommitStatus 1859 "state": lambda _, s: CommitStatusState.try_init(s), 1860 "statuses": lambda allspice_client, statuses: [ 1861 CommitStatus.parse_response(allspice_client, status) for status in statuses 1862 ], 1863 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1864 } 1865 1866 def __eq__(self, other): 1867 if not isinstance(other, CommitCombinedStatus): 1868 return False 1869 return self.sha == other.sha 1870 1871 def __hash__(self): 1872 return hash(self.sha) 1873 1874 @classmethod 1875 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1876 api_object = cls(allspice_client) 1877 cls._initialize(allspice_client, api_object, result) 1878 return api_object
Inherited Members
1881class Issue(ApiObject): 1882 """ 1883 An issue on a repository. 1884 1885 Note: `Issue.assets` may not have any entries even if the issue has 1886 attachments. This happens when an issue is fetched via a bulk method like 1887 `Repository.get_issues`. In most cases, prefer using 1888 `Issue.get_attachments` to get the attachments on an issue. 1889 """ 1890 1891 assets: List[Union[Any, "Attachment"]] 1892 assignee: Any 1893 assignees: Any 1894 body: str 1895 closed_at: Any 1896 comments: int 1897 created_at: str 1898 due_date: Any 1899 html_url: str 1900 id: int 1901 is_locked: bool 1902 labels: List[Any] 1903 milestone: Optional["Milestone"] 1904 number: int 1905 original_author: str 1906 original_author_id: int 1907 pin_order: int 1908 pull_request: Any 1909 ref: str 1910 repository: Dict[str, Union[int, str]] 1911 state: str 1912 title: str 1913 updated_at: str 1914 url: str 1915 user: User 1916 1917 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1918 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1919 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1920 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1921 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1922 1923 OPENED = "open" 1924 CLOSED = "closed" 1925 1926 def __init__(self, allspice_client): 1927 super().__init__(allspice_client) 1928 1929 def __eq__(self, other): 1930 if not isinstance(other, Issue): 1931 return False 1932 return self.repository == other.repository and self.id == other.id 1933 1934 def __hash__(self): 1935 return hash(self.repository) ^ hash(self.id) 1936 1937 _fields_to_parsers: ClassVar[dict] = { 1938 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1939 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1940 "assets": lambda allspice_client, assets: [ 1941 Attachment.parse_response(allspice_client, a) for a in assets 1942 ], 1943 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1944 "assignees": lambda allspice_client, us: [ 1945 User.parse_response(allspice_client, u) for u in us 1946 ], 1947 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1948 } 1949 1950 _parsers_to_fields: ClassVar[dict] = { 1951 "milestone": lambda m: m.id, 1952 } 1953 1954 _patchable_fields: ClassVar[set[str]] = { 1955 "assignee", 1956 "assignees", 1957 "body", 1958 "due_date", 1959 "milestone", 1960 "state", 1961 "title", 1962 } 1963 1964 def commit(self): 1965 args = { 1966 "owner": self.repository.owner.username, 1967 "repo": self.repository.name, 1968 "index": self.number, 1969 } 1970 self._commit(args) 1971 1972 @classmethod 1973 def request(cls, allspice_client, owner: str, repo: str, number: str): 1974 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1975 # The repository in the response is a RepositoryMeta object, so request 1976 # the full repository object and add it to the issue object. 1977 repository = Repository.request(allspice_client, owner, repo) 1978 setattr(api_object, "_repository", repository) 1979 # For legacy reasons 1980 cls._add_read_property("repo", repository, api_object) 1981 return api_object 1982 1983 @classmethod 1984 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1985 args = {"owner": repo.owner.username, "repo": repo.name} 1986 data = {"title": title, "body": body} 1987 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1988 issue = Issue.parse_response(allspice_client, result) 1989 setattr(issue, "_repository", repo) 1990 cls._add_read_property("repo", repo, issue) 1991 return issue 1992 1993 @property 1994 def owner(self) -> Organization | User: 1995 return self.repository.owner 1996 1997 def get_time_sum(self, user: User) -> int: 1998 results = self.allspice_client.requests_get( 1999 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2000 ) 2001 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2002 2003 def get_times(self) -> Optional[Dict]: 2004 return self.allspice_client.requests_get( 2005 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2006 ) 2007 2008 def delete_time(self, time_id: str): 2009 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2010 self.allspice_client.requests_delete(path) 2011 2012 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2013 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2014 self.allspice_client.requests_post( 2015 path, data={"created": created, "time": int(time), "user_name": user_name} 2016 ) 2017 2018 def get_comments(self) -> List[Comment]: 2019 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2020 2021 results = self.allspice_client.requests_get( 2022 self.GET_COMMENTS.format( 2023 owner=self.owner.username, repo=self.repository.name, index=self.number 2024 ) 2025 ) 2026 2027 return [Comment.parse_response(self.allspice_client, result) for result in results] 2028 2029 def create_comment(self, body: str) -> Comment: 2030 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2031 2032 path = self.GET_COMMENTS.format( 2033 owner=self.owner.username, repo=self.repository.name, index=self.number 2034 ) 2035 2036 response = self.allspice_client.requests_post(path, data={"body": body}) 2037 return Comment.parse_response(self.allspice_client, response) 2038 2039 def get_attachments(self) -> List[Attachment]: 2040 """ 2041 Fetch all attachments on this issue. 2042 2043 Unlike the assets field, this will always fetch all attachments from the 2044 API. 2045 2046 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2047 """ 2048 2049 path = self.GET_ATTACHMENTS.format( 2050 owner=self.owner.username, repo=self.repository.name, index=self.number 2051 ) 2052 response = self.allspice_client.requests_get(path) 2053 2054 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2055 2056 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2057 """ 2058 Create an attachment on this issue. 2059 2060 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2061 2062 :param file: The file to attach. This should be a file-like object. 2063 :param name: The name of the file. If not provided, the name of the file will be 2064 used. 2065 :return: The created attachment. 2066 """ 2067 2068 args: dict[str, Any] = { 2069 "files": {"attachment": file}, 2070 } 2071 if name is not None: 2072 args["params"] = {"name": name} 2073 2074 result = self.allspice_client.requests_post( 2075 self.GET_ATTACHMENTS.format( 2076 owner=self.owner.username, repo=self.repository.name, index=self.number 2077 ), 2078 **args, 2079 ) 2080 2081 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: Issue.assets
may not have any entries even if the issue has
attachments. This happens when an issue is fetched via a bulk method like
Repository.get_issues
. In most cases, prefer using
Issue.get_attachments
to get the attachments on an issue.
1972 @classmethod 1973 def request(cls, allspice_client, owner: str, repo: str, number: str): 1974 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1975 # The repository in the response is a RepositoryMeta object, so request 1976 # the full repository object and add it to the issue object. 1977 repository = Repository.request(allspice_client, owner, repo) 1978 setattr(api_object, "_repository", repository) 1979 # For legacy reasons 1980 cls._add_read_property("repo", repository, api_object) 1981 return api_object
1983 @classmethod 1984 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1985 args = {"owner": repo.owner.username, "repo": repo.name} 1986 data = {"title": title, "body": body} 1987 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1988 issue = Issue.parse_response(allspice_client, result) 1989 setattr(issue, "_repository", repo) 1990 cls._add_read_property("repo", repo, issue) 1991 return issue
2012 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2013 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2014 self.allspice_client.requests_post( 2015 path, data={"created": created, "time": int(time), "user_name": user_name} 2016 )
2018 def get_comments(self) -> List[Comment]: 2019 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2020 2021 results = self.allspice_client.requests_get( 2022 self.GET_COMMENTS.format( 2023 owner=self.owner.username, repo=self.repository.name, index=self.number 2024 ) 2025 ) 2026 2027 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2029 def create_comment(self, body: str) -> Comment: 2030 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2031 2032 path = self.GET_COMMENTS.format( 2033 owner=self.owner.username, repo=self.repository.name, index=self.number 2034 ) 2035 2036 response = self.allspice_client.requests_post(path, data={"body": body}) 2037 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2039 def get_attachments(self) -> List[Attachment]: 2040 """ 2041 Fetch all attachments on this issue. 2042 2043 Unlike the assets field, this will always fetch all attachments from the 2044 API. 2045 2046 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2047 """ 2048 2049 path = self.GET_ATTACHMENTS.format( 2050 owner=self.owner.username, repo=self.repository.name, index=self.number 2051 ) 2052 response = self.allspice_client.requests_get(path) 2053 2054 return [Attachment.parse_response(self.allspice_client, result) for result in response]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments
2056 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2057 """ 2058 Create an attachment on this issue. 2059 2060 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2061 2062 :param file: The file to attach. This should be a file-like object. 2063 :param name: The name of the file. If not provided, the name of the file will be 2064 used. 2065 :return: The created attachment. 2066 """ 2067 2068 args: dict[str, Any] = { 2069 "files": {"attachment": file}, 2070 } 2071 if name is not None: 2072 args["params"] = {"name": name} 2073 2074 result = self.allspice_client.requests_post( 2075 self.GET_ATTACHMENTS.format( 2076 owner=self.owner.username, repo=self.repository.name, index=self.number 2077 ), 2078 **args, 2079 ) 2080 2081 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this issue.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
2084class DesignReview(ApiObject): 2085 """ 2086 A Design Review. See 2087 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2088 2089 Note: The base and head fields are not `Branch` objects - they are plain strings 2090 referring to the branch names. This is because DRs can exist for branches that have 2091 been deleted, which don't have an associated `Branch` object from the API. You can use 2092 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2093 it exists. 2094 """ 2095 2096 additions: int 2097 allow_maintainer_edit: bool 2098 allow_maintainer_edits: Any 2099 assignee: User 2100 assignees: List["User"] 2101 base: str 2102 body: str 2103 changed_files: int 2104 closed_at: Optional[str] 2105 comments: int 2106 created_at: str 2107 deletions: int 2108 diff_url: str 2109 draft: bool 2110 due_date: Optional[str] 2111 head: str 2112 html_url: str 2113 id: int 2114 is_locked: bool 2115 labels: List[Any] 2116 merge_base: str 2117 merge_commit_sha: Optional[str] 2118 mergeable: bool 2119 merged: bool 2120 merged_at: Optional[str] 2121 merged_by: Optional["User"] 2122 milestone: Any 2123 number: int 2124 patch_url: str 2125 pin_order: int 2126 repository: Optional["Repository"] 2127 requested_reviewers: Any 2128 review_comments: int 2129 state: str 2130 title: str 2131 updated_at: str 2132 url: str 2133 user: User 2134 2135 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2136 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2137 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2138 2139 OPEN = "open" 2140 CLOSED = "closed" 2141 2142 class MergeType(Enum): 2143 MERGE = "merge" 2144 REBASE = "rebase" 2145 REBASE_MERGE = "rebase-merge" 2146 SQUASH = "squash" 2147 MANUALLY_MERGED = "manually-merged" 2148 2149 def __init__(self, allspice_client): 2150 super().__init__(allspice_client) 2151 2152 def __eq__(self, other): 2153 if not isinstance(other, DesignReview): 2154 return False 2155 return self.repository == other.repository and self.id == other.id 2156 2157 def __hash__(self): 2158 return hash(self.repository) ^ hash(self.id) 2159 2160 @classmethod 2161 def parse_response(cls, allspice_client, result) -> "DesignReview": 2162 api_object = super().parse_response(allspice_client, result) 2163 cls._add_read_property( 2164 "repository", 2165 Repository.parse_response(allspice_client, result["base"]["repo"]), 2166 api_object, 2167 ) 2168 2169 return api_object 2170 2171 @classmethod 2172 def request(cls, allspice_client, owner: str, repo: str, number: str): 2173 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2174 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2175 2176 _fields_to_parsers: ClassVar[dict] = { 2177 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2178 "assignees": lambda allspice_client, us: [ 2179 User.parse_response(allspice_client, u) for u in us 2180 ], 2181 "base": lambda _, b: b["ref"], 2182 "head": lambda _, h: h["ref"], 2183 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2184 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2185 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2186 } 2187 2188 _patchable_fields: ClassVar[set[str]] = { 2189 "allow_maintainer_edits", 2190 "assignee", 2191 "assignees", 2192 "base", 2193 "body", 2194 "due_date", 2195 "milestone", 2196 "state", 2197 "title", 2198 } 2199 2200 _parsers_to_fields: ClassVar[dict] = { 2201 "assignee": lambda u: u.username, 2202 "assignees": lambda us: [u.username for u in us], 2203 "base": lambda b: b.name if isinstance(b, Branch) else b, 2204 "milestone": lambda m: m.id, 2205 } 2206 2207 def commit(self): 2208 data = self.get_dirty_fields() 2209 if "due_date" in data and data["due_date"] is None: 2210 data["unset_due_date"] = True 2211 2212 args = { 2213 "owner": self.repository.owner.username, 2214 "repo": self.repository.name, 2215 "index": self.number, 2216 } 2217 self._commit(args, data) 2218 2219 def merge(self, merge_type: MergeType): 2220 """ 2221 Merge the pull request. See 2222 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2223 2224 :param merge_type: The type of merge to perform. See the MergeType enum. 2225 """ 2226 2227 self.allspice_client.requests_post( 2228 self.MERGE_DESIGN_REVIEW.format( 2229 owner=self.repository.owner.username, 2230 repo=self.repository.name, 2231 index=self.number, 2232 ), 2233 data={"Do": merge_type.value}, 2234 ) 2235 2236 def get_comments(self) -> List[Comment]: 2237 """ 2238 Get the comments on this pull request, but not specifically on a review. 2239 2240 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2241 2242 :return: A list of comments on this pull request. 2243 """ 2244 2245 results = self.allspice_client.requests_get( 2246 self.GET_COMMENTS.format( 2247 owner=self.repository.owner.username, 2248 repo=self.repository.name, 2249 index=self.number, 2250 ) 2251 ) 2252 return [Comment.parse_response(self.allspice_client, result) for result in results] 2253 2254 def create_comment(self, body: str): 2255 """ 2256 Create a comment on this pull request. This uses the same endpoint as the 2257 comments on issues, and will not be associated with any reviews. 2258 2259 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2260 2261 :param body: The body of the comment. 2262 :return: The comment that was created. 2263 """ 2264 2265 result = self.allspice_client.requests_post( 2266 self.GET_COMMENTS.format( 2267 owner=self.repository.owner.username, 2268 repo=self.repository.name, 2269 index=self.number, 2270 ), 2271 data={"body": body}, 2272 ) 2273 return Comment.parse_response(self.allspice_client, result)
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2160 @classmethod 2161 def parse_response(cls, allspice_client, result) -> "DesignReview": 2162 api_object = super().parse_response(allspice_client, result) 2163 cls._add_read_property( 2164 "repository", 2165 Repository.parse_response(allspice_client, result["base"]["repo"]), 2166 api_object, 2167 ) 2168 2169 return api_object
2171 @classmethod 2172 def request(cls, allspice_client, owner: str, repo: str, number: str): 2173 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2174 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2207 def commit(self): 2208 data = self.get_dirty_fields() 2209 if "due_date" in data and data["due_date"] is None: 2210 data["unset_due_date"] = True 2211 2212 args = { 2213 "owner": self.repository.owner.username, 2214 "repo": self.repository.name, 2215 "index": self.number, 2216 } 2217 self._commit(args, data)
2219 def merge(self, merge_type: MergeType): 2220 """ 2221 Merge the pull request. See 2222 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2223 2224 :param merge_type: The type of merge to perform. See the MergeType enum. 2225 """ 2226 2227 self.allspice_client.requests_post( 2228 self.MERGE_DESIGN_REVIEW.format( 2229 owner=self.repository.owner.username, 2230 repo=self.repository.name, 2231 index=self.number, 2232 ), 2233 data={"Do": merge_type.value}, 2234 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2236 def get_comments(self) -> List[Comment]: 2237 """ 2238 Get the comments on this pull request, but not specifically on a review. 2239 2240 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2241 2242 :return: A list of comments on this pull request. 2243 """ 2244 2245 results = self.allspice_client.requests_get( 2246 self.GET_COMMENTS.format( 2247 owner=self.repository.owner.username, 2248 repo=self.repository.name, 2249 index=self.number, 2250 ) 2251 ) 2252 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2254 def create_comment(self, body: str): 2255 """ 2256 Create a comment on this pull request. This uses the same endpoint as the 2257 comments on issues, and will not be associated with any reviews. 2258 2259 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2260 2261 :param body: The body of the comment. 2262 :return: The comment that was created. 2263 """ 2264 2265 result = self.allspice_client.requests_post( 2266 self.GET_COMMENTS.format( 2267 owner=self.repository.owner.username, 2268 repo=self.repository.name, 2269 index=self.number, 2270 ), 2271 data={"body": body}, 2272 ) 2273 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2276class Team(ApiObject): 2277 can_create_org_repo: bool 2278 description: str 2279 id: int 2280 includes_all_repositories: bool 2281 name: str 2282 organization: Optional["Organization"] 2283 permission: str 2284 units: List[str] 2285 units_map: Dict[str, str] 2286 2287 API_OBJECT = """/teams/{id}""" # <id> 2288 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2289 TEAM_DELETE = """/teams/%s""" # <id> 2290 GET_MEMBERS = """/teams/%s/members""" # <id> 2291 GET_REPOS = """/teams/%s/repos""" # <id> 2292 2293 def __init__(self, allspice_client): 2294 super().__init__(allspice_client) 2295 2296 def __eq__(self, other): 2297 if not isinstance(other, Team): 2298 return False 2299 return self.organization == other.organization and self.id == other.id 2300 2301 def __hash__(self): 2302 return hash(self.organization) ^ hash(self.id) 2303 2304 _fields_to_parsers: ClassVar[dict] = { 2305 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2306 } 2307 2308 _patchable_fields: ClassVar[set[str]] = { 2309 "can_create_org_repo", 2310 "description", 2311 "includes_all_repositories", 2312 "name", 2313 "permission", 2314 "units", 2315 "units_map", 2316 } 2317 2318 @classmethod 2319 def request(cls, allspice_client, id: int): 2320 return cls._request(allspice_client, {"id": id}) 2321 2322 def commit(self): 2323 args = {"id": self.id} 2324 self._commit(args) 2325 2326 def add_user(self, user: User): 2327 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2328 url = f"/teams/{self.id}/members/{user.login}" 2329 self.allspice_client.requests_put(url) 2330 2331 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2332 if isinstance(repo, Repository): 2333 repo_name = repo.name 2334 else: 2335 repo_name = repo 2336 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2337 2338 def get_members(self): 2339 """Get all users assigned to the team.""" 2340 results = self.allspice_client.requests_get_paginated( 2341 Team.GET_MEMBERS % self.id, 2342 ) 2343 return [User.parse_response(self.allspice_client, result) for result in results] 2344 2345 def get_repos(self): 2346 """Get all repos of this Team.""" 2347 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2348 return [Repository.parse_response(self.allspice_client, result) for result in results] 2349 2350 def delete(self): 2351 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2352 self.deleted = True 2353 2354 def remove_team_member(self, user_name: str): 2355 url = f"/teams/{self.id}/members/{user_name}" 2356 self.allspice_client.requests_delete(url)
2326 def add_user(self, user: User): 2327 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2328 url = f"/teams/{self.id}/members/{user.login}" 2329 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2338 def get_members(self): 2339 """Get all users assigned to the team.""" 2340 results = self.allspice_client.requests_get_paginated( 2341 Team.GET_MEMBERS % self.id, 2342 ) 2343 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2345 def get_repos(self): 2346 """Get all repos of this Team.""" 2347 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2348 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
2359class Release(ApiObject): 2360 """ 2361 A release on a repo. 2362 """ 2363 2364 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2365 author: User 2366 body: str 2367 created_at: str 2368 draft: bool 2369 html_url: str 2370 id: int 2371 name: str 2372 prerelease: bool 2373 published_at: str 2374 repo: Optional["Repository"] 2375 repository: Optional["Repository"] 2376 tag_name: str 2377 tarball_url: str 2378 target_commitish: str 2379 upload_url: str 2380 url: str 2381 zipball_url: str 2382 2383 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2384 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2385 # Note that we don't strictly need the get_assets route, as the release 2386 # object already contains the assets. 2387 2388 def __init__(self, allspice_client): 2389 super().__init__(allspice_client) 2390 2391 def __eq__(self, other): 2392 if not isinstance(other, Release): 2393 return False 2394 return self.repo == other.repo and self.id == other.id 2395 2396 def __hash__(self): 2397 return hash(self.repo) ^ hash(self.id) 2398 2399 _fields_to_parsers: ClassVar[dict] = { 2400 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2401 } 2402 _patchable_fields: ClassVar[set[str]] = { 2403 "body", 2404 "draft", 2405 "name", 2406 "prerelease", 2407 "tag_name", 2408 "target_commitish", 2409 } 2410 2411 @classmethod 2412 def parse_response(cls, allspice_client, result, repo) -> Release: 2413 release = super().parse_response(allspice_client, result) 2414 Release._add_read_property("repository", repo, release) 2415 # For legacy reasons 2416 Release._add_read_property("repo", repo, release) 2417 setattr( 2418 release, 2419 "_assets", 2420 [ 2421 ReleaseAsset.parse_response(allspice_client, asset, release) 2422 for asset in result["assets"] 2423 ], 2424 ) 2425 return release 2426 2427 @classmethod 2428 def request( 2429 cls, 2430 allspice_client, 2431 owner: str, 2432 repo: str, 2433 id: Optional[int] = None, 2434 ) -> Release: 2435 args = {"owner": owner, "repo": repo, "id": id} 2436 release_response = cls._get_gitea_api_object(allspice_client, args) 2437 repository = Repository.request(allspice_client, owner, repo) 2438 release = cls.parse_response(allspice_client, release_response, repository) 2439 return release 2440 2441 def commit(self): 2442 if self.repo is None: 2443 raise ValueError("Cannot commit a release without a repository.") 2444 2445 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2446 self._commit(args) 2447 2448 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2449 """ 2450 Create an asset for this release. 2451 2452 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2453 2454 :param file: The file to upload. This should be a file-like object. 2455 :param name: The name of the file. 2456 :return: The created asset. 2457 """ 2458 2459 if self.repo is None: 2460 raise ValueError("Cannot commit a release without a repository.") 2461 2462 args: dict[str, Any] = {"files": {"attachment": file}} 2463 if name is not None: 2464 args["params"] = {"name": name} 2465 2466 result = self.allspice_client.requests_post( 2467 self.RELEASE_CREATE_ASSET.format( 2468 owner=self.repo.owner.username, 2469 repo=self.repo.name, 2470 id=self.id, 2471 ), 2472 **args, 2473 ) 2474 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2475 2476 def delete(self): 2477 if self.repo is None: 2478 raise ValueError("Cannot commit a release without a repository.") 2479 2480 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2481 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2482 self.deleted = True
A release on a repo.
2411 @classmethod 2412 def parse_response(cls, allspice_client, result, repo) -> Release: 2413 release = super().parse_response(allspice_client, result) 2414 Release._add_read_property("repository", repo, release) 2415 # For legacy reasons 2416 Release._add_read_property("repo", repo, release) 2417 setattr( 2418 release, 2419 "_assets", 2420 [ 2421 ReleaseAsset.parse_response(allspice_client, asset, release) 2422 for asset in result["assets"] 2423 ], 2424 ) 2425 return release
2427 @classmethod 2428 def request( 2429 cls, 2430 allspice_client, 2431 owner: str, 2432 repo: str, 2433 id: Optional[int] = None, 2434 ) -> Release: 2435 args = {"owner": owner, "repo": repo, "id": id} 2436 release_response = cls._get_gitea_api_object(allspice_client, args) 2437 repository = Repository.request(allspice_client, owner, repo) 2438 release = cls.parse_response(allspice_client, release_response, repository) 2439 return release
2448 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2449 """ 2450 Create an asset for this release. 2451 2452 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2453 2454 :param file: The file to upload. This should be a file-like object. 2455 :param name: The name of the file. 2456 :return: The created asset. 2457 """ 2458 2459 if self.repo is None: 2460 raise ValueError("Cannot commit a release without a repository.") 2461 2462 args: dict[str, Any] = {"files": {"attachment": file}} 2463 if name is not None: 2464 args["params"] = {"name": name} 2465 2466 result = self.allspice_client.requests_post( 2467 self.RELEASE_CREATE_ASSET.format( 2468 owner=self.repo.owner.username, 2469 repo=self.repo.name, 2470 id=self.id, 2471 ), 2472 **args, 2473 ) 2474 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2476 def delete(self): 2477 if self.repo is None: 2478 raise ValueError("Cannot commit a release without a repository.") 2479 2480 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2481 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2482 self.deleted = True
2485class ReleaseAsset(ApiObject): 2486 browser_download_url: str 2487 created_at: str 2488 download_count: int 2489 id: int 2490 name: str 2491 release: Optional["Release"] 2492 size: int 2493 uuid: str 2494 2495 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2496 2497 def __init__(self, allspice_client): 2498 super().__init__(allspice_client) 2499 2500 def __eq__(self, other): 2501 if not isinstance(other, ReleaseAsset): 2502 return False 2503 return self.release == other.release and self.id == other.id 2504 2505 def __hash__(self): 2506 return hash(self.release) ^ hash(self.id) 2507 2508 _fields_to_parsers: ClassVar[dict] = {} 2509 _patchable_fields: ClassVar[set[str]] = { 2510 "name", 2511 } 2512 2513 @classmethod 2514 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2515 asset = super().parse_response(allspice_client, result) 2516 ReleaseAsset._add_read_property("release", release, asset) 2517 return asset 2518 2519 @classmethod 2520 def request( 2521 cls, 2522 allspice_client, 2523 owner: str, 2524 repo: str, 2525 release_id: int, 2526 id: int, 2527 ) -> ReleaseAsset: 2528 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2529 asset_response = cls._get_gitea_api_object(allspice_client, args) 2530 release = Release.request(allspice_client, owner, repo, release_id) 2531 asset = cls.parse_response(allspice_client, asset_response, release) 2532 return asset 2533 2534 def commit(self): 2535 if self.release is None or self.release.repo is None: 2536 raise ValueError("Cannot commit a release asset without a release or a repository.") 2537 2538 args = { 2539 "owner": self.release.repo.owner.username, 2540 "repo": self.release.repo.name, 2541 "release_id": self.release.id, 2542 "id": self.id, 2543 } 2544 self._commit(args) 2545 2546 def download(self) -> bytes: 2547 """ 2548 Download the raw, binary data of this asset. 2549 2550 Note 1: if the file you are requesting is a text file, you might want to 2551 use .decode() on the result to get a string. For example: 2552 2553 asset.download().decode("utf-8") 2554 2555 Note 2: this method will store the entire file in memory. If you are 2556 downloading a large file, you might want to use download_to_file instead. 2557 """ 2558 2559 return self.allspice_client.requests.get( 2560 self.browser_download_url, 2561 headers=self.allspice_client.headers, 2562 ).content 2563 2564 def download_to_file(self, io: IO): 2565 """ 2566 Download the raw, binary data of this asset to a file-like object. 2567 2568 Example: 2569 2570 with open("my_file.zip", "wb") as f: 2571 asset.download_to_file(f) 2572 2573 :param io: The file-like object to write the data to. 2574 """ 2575 2576 response = self.allspice_client.requests.get( 2577 self.browser_download_url, 2578 headers=self.allspice_client.headers, 2579 stream=True, 2580 ) 2581 # 4kb chunks 2582 for chunk in response.iter_content(chunk_size=4096): 2583 if chunk: 2584 io.write(chunk) 2585 2586 def delete(self): 2587 if self.release is None or self.release.repo is None: 2588 raise ValueError("Cannot commit a release asset without a release or a repository.") 2589 2590 args = { 2591 "owner": self.release.repo.owner.username, 2592 "repo": self.release.repo.name, 2593 "release_id": self.release.id, 2594 "id": self.id, 2595 } 2596 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2597 self.deleted = True
2519 @classmethod 2520 def request( 2521 cls, 2522 allspice_client, 2523 owner: str, 2524 repo: str, 2525 release_id: int, 2526 id: int, 2527 ) -> ReleaseAsset: 2528 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2529 asset_response = cls._get_gitea_api_object(allspice_client, args) 2530 release = Release.request(allspice_client, owner, repo, release_id) 2531 asset = cls.parse_response(allspice_client, asset_response, release) 2532 return asset
2534 def commit(self): 2535 if self.release is None or self.release.repo is None: 2536 raise ValueError("Cannot commit a release asset without a release or a repository.") 2537 2538 args = { 2539 "owner": self.release.repo.owner.username, 2540 "repo": self.release.repo.name, 2541 "release_id": self.release.id, 2542 "id": self.id, 2543 } 2544 self._commit(args)
2546 def download(self) -> bytes: 2547 """ 2548 Download the raw, binary data of this asset. 2549 2550 Note 1: if the file you are requesting is a text file, you might want to 2551 use .decode() on the result to get a string. For example: 2552 2553 asset.download().decode("utf-8") 2554 2555 Note 2: this method will store the entire file in memory. If you are 2556 downloading a large file, you might want to use download_to_file instead. 2557 """ 2558 2559 return self.allspice_client.requests.get( 2560 self.browser_download_url, 2561 headers=self.allspice_client.headers, 2562 ).content
Download the raw, binary data of this asset.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
asset.download().decode("utf-8")
Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.
2564 def download_to_file(self, io: IO): 2565 """ 2566 Download the raw, binary data of this asset to a file-like object. 2567 2568 Example: 2569 2570 with open("my_file.zip", "wb") as f: 2571 asset.download_to_file(f) 2572 2573 :param io: The file-like object to write the data to. 2574 """ 2575 2576 response = self.allspice_client.requests.get( 2577 self.browser_download_url, 2578 headers=self.allspice_client.headers, 2579 stream=True, 2580 ) 2581 # 4kb chunks 2582 for chunk in response.iter_content(chunk_size=4096): 2583 if chunk: 2584 io.write(chunk)
Download the raw, binary data of this asset to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
asset.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
2586 def delete(self): 2587 if self.release is None or self.release.repo is None: 2588 raise ValueError("Cannot commit a release asset without a release or a repository.") 2589 2590 args = { 2591 "owner": self.release.repo.owner.username, 2592 "repo": self.release.repo.name, 2593 "release_id": self.release.id, 2594 "id": self.id, 2595 } 2596 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2597 self.deleted = True
2600class Content(ReadonlyApiObject): 2601 content: Any 2602 download_url: str 2603 encoding: Any 2604 git_url: str 2605 html_url: str 2606 last_commit_sha: str 2607 name: str 2608 path: str 2609 sha: str 2610 size: int 2611 submodule_git_url: Any 2612 target: Any 2613 type: str 2614 url: str 2615 2616 FILE = "file" 2617 2618 def __init__(self, allspice_client): 2619 super().__init__(allspice_client) 2620 2621 def __eq__(self, other): 2622 if not isinstance(other, Content): 2623 return False 2624 2625 return self.sha == other.sha and self.name == other.name 2626 2627 def __hash__(self): 2628 return hash(self.sha) ^ hash(self.name)
Inherited Members
2634class Util: 2635 @staticmethod 2636 def convert_time(time: str) -> datetime: 2637 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2638 try: 2639 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2640 except ValueError: 2641 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2642 2643 @staticmethod 2644 def format_time(time: datetime) -> str: 2645 """ 2646 Format a datetime object to Gitea's time format. 2647 2648 :param time: The time to format 2649 :return: Formatted time 2650 """ 2651 2652 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2653 2654 @staticmethod 2655 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2656 """ 2657 Given a "ref", returns a dict with the ref parameter for the API call. 2658 2659 If the ref is None, returns an empty dict. You can pass this to the API 2660 directly. 2661 """ 2662 2663 if isinstance(ref, Branch): 2664 return {"ref": ref.name} 2665 elif isinstance(ref, Commit): 2666 return {"ref": ref.sha} 2667 elif ref: 2668 return {"ref": ref} 2669 else: 2670 return {}
2635 @staticmethod 2636 def convert_time(time: str) -> datetime: 2637 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2638 try: 2639 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2640 except ValueError: 2641 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)
2643 @staticmethod 2644 def format_time(time: datetime) -> str: 2645 """ 2646 Format a datetime object to Gitea's time format. 2647 2648 :param time: The time to format 2649 :return: Formatted time 2650 """ 2651 2652 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
Format a datetime object to Gitea's time format.
Parameters
- time: The time to format
Returns
Formatted time
2654 @staticmethod 2655 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2656 """ 2657 Given a "ref", returns a dict with the ref parameter for the API call. 2658 2659 If the ref is None, returns an empty dict. You can pass this to the API 2660 directly. 2661 """ 2662 2663 if isinstance(ref, Branch): 2664 return {"ref": ref.name} 2665 elif isinstance(ref, Commit): 2666 return {"ref": ref.sha} 2667 elif ref: 2668 return {"ref": ref} 2669 else: 2670 return {}
Given a "ref", returns a dict with the ref parameter for the API call.
If the ref is None, returns an empty dict. You can pass this to the API directly.