allspice.apiobject
1from __future__ import annotations 2 3import logging 4import re 5from datetime import datetime, timezone 6from enum import Enum 7from functools import cached_property 8from typing import ( 9 IO, 10 Any, 11 ClassVar, 12 Dict, 13 FrozenSet, 14 List, 15 Literal, 16 Optional, 17 Sequence, 18 Set, 19 Tuple, 20 Union, 21) 22 23try: 24 from typing_extensions import Self 25except ImportError: 26 from typing import Self 27 28from .baseapiobject import ApiObject, ReadonlyApiObject 29from .exceptions import ConflictException, NotFoundException 30 31 32class Organization(ApiObject): 33 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 34 35 active: Optional[bool] 36 avatar_url: str 37 created: Optional[str] 38 description: str 39 email: str 40 followers_count: Optional[int] 41 following_count: Optional[int] 42 full_name: str 43 id: int 44 is_admin: Optional[bool] 45 language: Optional[str] 46 last_login: Optional[str] 47 location: str 48 login: Optional[str] 49 login_name: Optional[str] 50 name: Optional[str] 51 prohibit_login: Optional[bool] 52 repo_admin_change_team_access: Optional[bool] 53 restricted: Optional[bool] 54 starred_repos_count: Optional[int] 55 username: str 56 visibility: str 57 website: str 58 59 API_OBJECT = """/orgs/{name}""" # <org> 60 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 61 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 62 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 63 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 64 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 65 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 66 67 def __init__(self, allspice_client): 68 super().__init__(allspice_client) 69 70 def __eq__(self, other): 71 if not isinstance(other, Organization): 72 return False 73 return self.allspice_client == other.allspice_client and self.name == other.name 74 75 def __hash__(self): 76 return hash(self.allspice_client) ^ hash(self.name) 77 78 @classmethod 79 def request(cls, allspice_client, name: str) -> Self: 80 return cls._request(allspice_client, {"name": name}) 81 82 @classmethod 83 def parse_response(cls, allspice_client, result) -> "Organization": 84 api_object = super().parse_response(allspice_client, result) 85 # add "name" field to make this behave similar to users for gitea < 1.18 86 # also necessary for repository-owner when org is repo owner 87 if not hasattr(api_object, "name"): 88 Organization._add_read_property("name", result["username"], api_object) 89 return api_object 90 91 _patchable_fields: ClassVar[set[str]] = { 92 "description", 93 "full_name", 94 "location", 95 "visibility", 96 "website", 97 } 98 99 def commit(self): 100 args = {"name": self.name} 101 self._commit(args) 102 103 def create_repo( 104 self, 105 repoName: str, 106 description: str = "", 107 private: bool = False, 108 autoInit=True, 109 gitignores: Optional[str] = None, 110 license: Optional[str] = None, 111 readme: str = "Default", 112 issue_labels: Optional[str] = None, 113 default_branch="master", 114 ): 115 """Create an organization Repository 116 117 Throws: 118 AlreadyExistsException: If the Repository exists already. 119 Exception: If something else went wrong. 120 """ 121 result = self.allspice_client.requests_post( 122 f"/orgs/{self.name}/repos", 123 data={ 124 "name": repoName, 125 "description": description, 126 "private": private, 127 "auto_init": autoInit, 128 "gitignores": gitignores, 129 "license": license, 130 "issue_labels": issue_labels, 131 "readme": readme, 132 "default_branch": default_branch, 133 }, 134 ) 135 if "id" in result: 136 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 137 else: 138 self.allspice_client.logger.error(result["message"]) 139 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 140 return Repository.parse_response(self.allspice_client, result) 141 142 def get_repositories(self) -> List["Repository"]: 143 results = self.allspice_client.requests_get_paginated( 144 Organization.ORG_REPOS_REQUEST % self.username 145 ) 146 return [Repository.parse_response(self.allspice_client, result) for result in results] 147 148 def get_repository(self, name) -> "Repository": 149 repos = self.get_repositories() 150 for repo in repos: 151 if repo.name == name: 152 return repo 153 raise NotFoundException("Repository %s not existent in organization." % name) 154 155 def get_teams(self) -> List["Team"]: 156 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 157 teams = [Team.parse_response(self.allspice_client, result) for result in results] 158 # organisation seems to be missing using this request, so we add org manually 159 for t in teams: 160 setattr(t, "_organization", self) 161 return teams 162 163 def get_team(self, name) -> "Team": 164 teams = self.get_teams() 165 for team in teams: 166 if team.name == name: 167 return team 168 raise NotFoundException("Team not existent in organization.") 169 170 def create_team( 171 self, 172 name: str, 173 description: str = "", 174 permission: str = "read", 175 can_create_org_repo: bool = False, 176 includes_all_repositories: bool = False, 177 units=( 178 "repo.code", 179 "repo.issues", 180 "repo.ext_issues", 181 "repo.wiki", 182 "repo.pulls", 183 "repo.releases", 184 "repo.ext_wiki", 185 ), 186 units_map={}, 187 ) -> "Team": 188 """Alias for AllSpice#create_team""" 189 # TODO: Move AllSpice#create_team to Organization#create_team and 190 # deprecate AllSpice#create_team. 191 return self.allspice_client.create_team( 192 org=self, 193 name=name, 194 description=description, 195 permission=permission, 196 can_create_org_repo=can_create_org_repo, 197 includes_all_repositories=includes_all_repositories, 198 units=units, 199 units_map=units_map, 200 ) 201 202 def get_members(self) -> List["User"]: 203 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 204 return [User.parse_response(self.allspice_client, result) for result in results] 205 206 def is_member(self, username) -> bool: 207 if isinstance(username, User): 208 username = username.username 209 try: 210 # returns 204 if its ok, 404 if its not 211 self.allspice_client.requests_get( 212 Organization.ORG_IS_MEMBER % (self.username, username) 213 ) 214 return True 215 except Exception: 216 return False 217 218 def remove_member(self, user: "User"): 219 path = f"/orgs/{self.username}/members/{user.username}" 220 self.allspice_client.requests_delete(path) 221 222 def delete(self): 223 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 224 for repo in self.get_repositories(): 225 repo.delete() 226 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 227 self.deleted = True 228 229 def get_heatmap(self) -> List[Tuple[datetime, int]]: 230 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 231 results = [ 232 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 233 for result in results 234 ] 235 return results 236 237 238class User(ApiObject): 239 active: bool 240 admin: Any 241 allow_create_organization: Any 242 allow_git_hook: Any 243 allow_import_local: Any 244 avatar_url: str 245 created: str 246 description: str 247 email: str 248 emails: List[Any] 249 followers_count: int 250 following_count: int 251 full_name: str 252 id: int 253 is_admin: bool 254 language: str 255 last_login: str 256 location: str 257 login: str 258 login_name: str 259 max_repo_creation: Any 260 must_change_password: Any 261 password: Any 262 prohibit_login: bool 263 restricted: bool 264 starred_repos_count: int 265 username: str 266 visibility: str 267 website: str 268 269 API_OBJECT = """/users/{name}""" # <org> 270 USER_MAIL = """/user/emails?sudo=%s""" # <name> 271 USER_PATCH = """/admin/users/%s""" # <username> 272 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 273 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 274 USER_HEATMAP = """/users/%s/heatmap""" # <username> 275 276 def __init__(self, allspice_client): 277 super().__init__(allspice_client) 278 self._emails = [] 279 280 def __eq__(self, other): 281 if not isinstance(other, User): 282 return False 283 return self.allspice_client == other.allspice_client and self.id == other.id 284 285 def __hash__(self): 286 return hash(self.allspice_client) ^ hash(self.id) 287 288 @property 289 def emails(self): 290 self.__request_emails() 291 return self._emails 292 293 @classmethod 294 def request(cls, allspice_client, name: str) -> "User": 295 api_object = cls._request(allspice_client, {"name": name}) 296 return api_object 297 298 _patchable_fields: ClassVar[set[str]] = { 299 "active", 300 "admin", 301 "allow_create_organization", 302 "allow_git_hook", 303 "allow_import_local", 304 "email", 305 "full_name", 306 "location", 307 "login_name", 308 "max_repo_creation", 309 "must_change_password", 310 "password", 311 "prohibit_login", 312 "website", 313 } 314 315 def commit(self, login_name: str, source_id: int = 0): 316 """ 317 Unfortunately it is necessary to require the login name 318 as well as the login source (that is not supplied when getting a user) for 319 changing a user. 320 Usually source_id is 0 and the login_name is equal to the username. 321 """ 322 values = self.get_dirty_fields() 323 values.update( 324 # api-doc says that the "source_id" is necessary; works without though 325 {"login_name": login_name, "source_id": source_id} 326 ) 327 args = {"username": self.username} 328 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 329 self._dirty_fields = {} 330 331 def create_repo( 332 self, 333 repoName: str, 334 description: str = "", 335 private: bool = False, 336 autoInit=True, 337 gitignores: Optional[str] = None, 338 license: Optional[str] = None, 339 readme: str = "Default", 340 issue_labels: Optional[str] = None, 341 default_branch="master", 342 ): 343 """Create a user Repository 344 345 Throws: 346 AlreadyExistsException: If the Repository exists already. 347 Exception: If something else went wrong. 348 """ 349 result = self.allspice_client.requests_post( 350 "/user/repos", 351 data={ 352 "name": repoName, 353 "description": description, 354 "private": private, 355 "auto_init": autoInit, 356 "gitignores": gitignores, 357 "license": license, 358 "issue_labels": issue_labels, 359 "readme": readme, 360 "default_branch": default_branch, 361 }, 362 ) 363 if "id" in result: 364 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 365 else: 366 self.allspice_client.logger.error(result["message"]) 367 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 368 return Repository.parse_response(self.allspice_client, result) 369 370 def get_repositories(self) -> List["Repository"]: 371 """Get all Repositories owned by this User.""" 372 url = f"/users/{self.username}/repos" 373 results = self.allspice_client.requests_get_paginated(url) 374 return [Repository.parse_response(self.allspice_client, result) for result in results] 375 376 def get_orgs(self) -> List[Organization]: 377 """Get all Organizations this user is a member of.""" 378 url = f"/users/{self.username}/orgs" 379 results = self.allspice_client.requests_get_paginated(url) 380 return [Organization.parse_response(self.allspice_client, result) for result in results] 381 382 def get_teams(self) -> List["Team"]: 383 url = "/user/teams" 384 results = self.allspice_client.requests_get_paginated(url, sudo=self) 385 return [Team.parse_response(self.allspice_client, result) for result in results] 386 387 def get_accessible_repos(self) -> List["Repository"]: 388 """Get all Repositories accessible by the logged in User.""" 389 results = self.allspice_client.requests_get("/user/repos", sudo=self) 390 return [Repository.parse_response(self.allspice_client, result) for result in results] 391 392 def __request_emails(self): 393 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 394 # report if the adress changed by this 395 for mail in result: 396 self._emails.append(mail["email"]) 397 if mail["primary"]: 398 self._email = mail["email"] 399 400 def delete(self): 401 """Deletes this User. Also deletes all Repositories he owns.""" 402 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 403 self.deleted = True 404 405 def get_heatmap(self) -> List[Tuple[datetime, int]]: 406 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 407 results = [ 408 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 409 for result in results 410 ] 411 return results 412 413 414class Branch(ReadonlyApiObject): 415 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 416 effective_branch_protection_name: str 417 enable_status_check: bool 418 name: str 419 protected: bool 420 required_approvals: int 421 status_check_contexts: List[Any] 422 user_can_merge: bool 423 user_can_push: bool 424 425 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 426 427 def __init__(self, allspice_client): 428 super().__init__(allspice_client) 429 430 def __eq__(self, other): 431 if not isinstance(other, Branch): 432 return False 433 return self.commit == other.commit and self.name == other.name 434 435 def __hash__(self): 436 return hash(self.commit["id"]) ^ hash(self.name) 437 438 _fields_to_parsers: ClassVar[dict] = { 439 # This is not a commit object 440 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 441 } 442 443 @classmethod 444 def request(cls, allspice_client, owner: str, repo: str, branch: str): 445 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch}) 446 447 448class GitEntry(ReadonlyApiObject): 449 """ 450 An object representing a file or directory in the Git tree. 451 """ 452 453 mode: str 454 path: str 455 sha: str 456 size: int 457 type: str 458 url: str 459 460 def __init__(self, allspice_client): 461 super().__init__(allspice_client) 462 463 def __eq__(self, other) -> bool: 464 if not isinstance(other, GitEntry): 465 return False 466 return self.sha == other.sha 467 468 def __hash__(self) -> int: 469 return hash(self.sha) 470 471 472class Repository(ApiObject): 473 allow_manual_merge: Any 474 allow_merge_commits: bool 475 allow_rebase: bool 476 allow_rebase_explicit: bool 477 allow_rebase_update: bool 478 allow_squash_merge: bool 479 archived: bool 480 archived_at: str 481 autodetect_manual_merge: Any 482 avatar_url: str 483 clone_url: str 484 created_at: str 485 default_allow_maintainer_edit: bool 486 default_branch: str 487 default_delete_branch_after_merge: bool 488 default_merge_style: str 489 description: str 490 empty: bool 491 enable_prune: Any 492 external_tracker: Any 493 external_wiki: Any 494 fork: bool 495 forks_count: int 496 full_name: str 497 has_actions: bool 498 has_issues: bool 499 has_packages: bool 500 has_projects: bool 501 has_pull_requests: bool 502 has_releases: bool 503 has_wiki: bool 504 html_url: str 505 id: int 506 ignore_whitespace_conflicts: bool 507 internal: bool 508 internal_tracker: Dict[str, bool] 509 language: str 510 languages_url: str 511 link: str 512 mirror: bool 513 mirror_interval: str 514 mirror_updated: str 515 name: str 516 open_issues_count: int 517 open_pr_counter: int 518 original_url: str 519 owner: Union["User", "Organization"] 520 parent: Any 521 permissions: Dict[str, bool] 522 private: bool 523 release_counter: int 524 repo_transfer: Any 525 size: int 526 ssh_url: str 527 stars_count: int 528 template: bool 529 updated_at: datetime 530 url: str 531 watchers_count: int 532 website: str 533 534 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 535 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 536 REPO_SEARCH = """/repos/search/""" 537 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 538 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 539 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 540 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 541 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 542 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 543 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 544 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 545 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 546 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 547 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 548 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 549 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 550 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 551 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 552 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 553 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 554 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 555 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 556 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 557 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 558 559 class ArchiveFormat(Enum): 560 """ 561 Archive formats for Repository.get_archive 562 """ 563 564 TAR = "tar.gz" 565 ZIP = "zip" 566 567 class CommitStatusSort(Enum): 568 """ 569 Sort order for Repository.get_commit_status 570 """ 571 572 OLDEST = "oldest" 573 RECENT_UPDATE = "recentupdate" 574 LEAST_UPDATE = "leastupdate" 575 LEAST_INDEX = "leastindex" 576 HIGHEST_INDEX = "highestindex" 577 578 def __init__(self, allspice_client): 579 super().__init__(allspice_client) 580 581 def __eq__(self, other): 582 if not isinstance(other, Repository): 583 return False 584 return self.owner == other.owner and self.name == other.name 585 586 def __hash__(self): 587 return hash(self.owner) ^ hash(self.name) 588 589 _fields_to_parsers: ClassVar[dict] = { 590 # dont know how to tell apart user and org as owner except form email being empty. 591 "owner": lambda allspice_client, r: ( 592 Organization.parse_response(allspice_client, r) 593 if r["email"] == "" 594 else User.parse_response(allspice_client, r) 595 ), 596 "updated_at": lambda _, t: Util.convert_time(t), 597 } 598 599 @classmethod 600 def request( 601 cls, 602 allspice_client, 603 owner: str, 604 name: str, 605 ) -> Repository: 606 return cls._request(allspice_client, {"owner": owner, "name": name}) 607 608 @classmethod 609 def search( 610 cls, 611 allspice_client, 612 query: Optional[str] = None, 613 topic: bool = False, 614 include_description: bool = False, 615 user: Optional[User] = None, 616 owner_to_prioritize: Union[User, Organization, None] = None, 617 ) -> list[Repository]: 618 """ 619 Search for repositories. 620 621 See https://hub.allspice.io/api/swagger#/repository/repoSearch 622 623 :param query: The query string to search for 624 :param topic: If true, the query string will only be matched against the 625 repository's topic. 626 :param include_description: If true, the query string will be matched 627 against the repository's description as well. 628 :param user: If specified, only repositories that this user owns or 629 contributes to will be searched. 630 :param owner_to_prioritize: If specified, repositories owned by the 631 given entity will be prioritized in the search. 632 :returns: All repositories matching the query. If there are many 633 repositories matching this query, this may take some time. 634 """ 635 636 params = {} 637 638 if query is not None: 639 params["q"] = query 640 if topic: 641 params["topic"] = topic 642 if include_description: 643 params["include_description"] = include_description 644 if user is not None: 645 params["user"] = user.id 646 if owner_to_prioritize is not None: 647 params["owner_to_prioritize"] = owner_to_prioritize.id 648 649 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 650 651 return [Repository.parse_response(allspice_client, response) for response in responses] 652 653 _patchable_fields: ClassVar[set[str]] = { 654 "allow_manual_merge", 655 "allow_merge_commits", 656 "allow_rebase", 657 "allow_rebase_explicit", 658 "allow_rebase_update", 659 "allow_squash_merge", 660 "archived", 661 "autodetect_manual_merge", 662 "default_branch", 663 "default_delete_branch_after_merge", 664 "default_merge_style", 665 "description", 666 "enable_prune", 667 "external_tracker", 668 "external_wiki", 669 "has_issues", 670 "has_projects", 671 "has_pull_requests", 672 "has_wiki", 673 "ignore_whitespace_conflicts", 674 "internal_tracker", 675 "mirror_interval", 676 "name", 677 "private", 678 "template", 679 "website", 680 } 681 682 def commit(self): 683 args = {"owner": self.owner.username, "name": self.name} 684 self._commit(args) 685 686 def get_branches(self) -> List["Branch"]: 687 """Get all the Branches of this Repository.""" 688 689 results = self.allspice_client.requests_get_paginated( 690 Repository.REPO_BRANCHES % (self.owner.username, self.name) 691 ) 692 return [Branch.parse_response(self.allspice_client, result) for result in results] 693 694 def get_branch(self, name: str) -> "Branch": 695 """Get a specific Branch of this Repository.""" 696 result = self.allspice_client.requests_get( 697 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 698 ) 699 return Branch.parse_response(self.allspice_client, result) 700 701 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 702 """Add a branch to the repository""" 703 # Note: will only work with gitea 1.13 or higher! 704 705 ref_name = Util.data_params_for_ref(create_from) 706 if "ref" not in ref_name: 707 raise ValueError("create_from must be a Branch, Commit or string") 708 ref_name = ref_name["ref"] 709 710 data = {"new_branch_name": newname, "old_ref_name": ref_name} 711 result = self.allspice_client.requests_post( 712 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 713 ) 714 return Branch.parse_response(self.allspice_client, result) 715 716 def get_issues( 717 self, 718 state: Literal["open", "closed", "all"] = "all", 719 search_query: Optional[str] = None, 720 labels: Optional[List[str]] = None, 721 milestones: Optional[List[Union[Milestone, str]]] = None, 722 assignee: Optional[Union[User, str]] = None, 723 since: Optional[datetime] = None, 724 before: Optional[datetime] = None, 725 ) -> List["Issue"]: 726 """ 727 Get all Issues of this Repository (open and closed) 728 729 https://hub.allspice.io/api/swagger#/repository/repoListIssues 730 731 All params of this method are optional filters. If you don't specify a filter, it 732 will not be applied. 733 734 :param state: The state of the Issues to get. If None, all Issues are returned. 735 :param search_query: Filter issues by text. This is equivalent to searching for 736 `search_query` in the Issues on the web interface. 737 :param labels: Filter issues by labels. 738 :param milestones: Filter issues by milestones. 739 :param assignee: Filter issues by the assigned user. 740 :param since: Filter issues by the date they were created. 741 :param before: Filter issues by the date they were created. 742 :return: A list of Issues. 743 """ 744 745 data = { 746 "state": state, 747 } 748 if search_query: 749 data["q"] = search_query 750 if labels: 751 data["labels"] = ",".join(labels) 752 if milestones: 753 data["milestone"] = ",".join( 754 [ 755 milestone.name if isinstance(milestone, Milestone) else milestone 756 for milestone in milestones 757 ] 758 ) 759 if assignee: 760 if isinstance(assignee, User): 761 data["assignee"] = assignee.username 762 else: 763 data["assignee"] = assignee 764 if since: 765 data["since"] = Util.format_time(since) 766 if before: 767 data["before"] = Util.format_time(before) 768 769 results = self.allspice_client.requests_get_paginated( 770 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 771 params=data, 772 ) 773 774 issues = [] 775 for result in results: 776 issue = Issue.parse_response(self.allspice_client, result) 777 # See Issue.request 778 setattr(issue, "_repository", self) 779 # This is mostly for compatibility with an older implementation 780 Issue._add_read_property("repo", self, issue) 781 issues.append(issue) 782 783 return issues 784 785 def get_design_reviews( 786 self, 787 state: Literal["open", "closed", "all"] = "all", 788 milestone: Optional[Union[Milestone, str]] = None, 789 labels: Optional[List[str]] = None, 790 ) -> List["DesignReview"]: 791 """ 792 Get all Design Reviews of this Repository. 793 794 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 795 796 :param state: The state of the Design Reviews to get. If None, all Design Reviews 797 are returned. 798 :param milestone: The milestone of the Design Reviews to get. 799 :param labels: A list of label IDs to filter DRs by. 800 :return: A list of Design Reviews. 801 """ 802 803 params = { 804 "state": state, 805 } 806 if milestone: 807 if isinstance(milestone, Milestone): 808 params["milestone"] = milestone.name 809 else: 810 params["milestone"] = milestone 811 if labels: 812 params["labels"] = ",".join(labels) 813 814 results = self.allspice_client.requests_get_paginated( 815 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 816 params=params, 817 ) 818 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 819 820 def get_commits( 821 self, 822 sha: Optional[str] = None, 823 path: Optional[str] = None, 824 stat: bool = True, 825 ) -> List["Commit"]: 826 """ 827 Get all the Commits of this Repository. 828 829 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 830 831 :param sha: The SHA of the commit to start listing commits from. 832 :param path: filepath of a file/dir. 833 :param stat: Include the number of additions and deletions in the response. 834 Disable for speedup. 835 :return: A list of Commits. 836 """ 837 838 data = {} 839 if sha: 840 data["sha"] = sha 841 if path: 842 data["path"] = path 843 if not stat: 844 data["stat"] = False 845 846 try: 847 results = self.allspice_client.requests_get_paginated( 848 Repository.REPO_COMMITS % (self.owner.username, self.name), 849 params=data, 850 ) 851 except ConflictException as err: 852 logging.warning(err) 853 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 854 results = [] 855 return [Commit.parse_response(self.allspice_client, result) for result in results] 856 857 def get_issues_state(self, state) -> List["Issue"]: 858 """ 859 DEPRECATED: Use get_issues() instead. 860 861 Get issues of state Issue.open or Issue.closed of a repository. 862 """ 863 864 assert state in [Issue.OPENED, Issue.CLOSED] 865 issues = [] 866 data = {"state": state} 867 results = self.allspice_client.requests_get_paginated( 868 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 869 params=data, 870 ) 871 for result in results: 872 issue = Issue.parse_response(self.allspice_client, result) 873 # adding data not contained in the issue response 874 # See Issue.request() 875 setattr(issue, "_repository", self) 876 Issue._add_read_property("repo", self, issue) 877 Issue._add_read_property("owner", self.owner, issue) 878 issues.append(issue) 879 return issues 880 881 def get_times(self): 882 results = self.allspice_client.requests_get( 883 Repository.REPO_TIMES % (self.owner.username, self.name) 884 ) 885 return results 886 887 def get_user_time(self, username) -> float: 888 if isinstance(username, User): 889 username = username.username 890 results = self.allspice_client.requests_get( 891 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 892 ) 893 time = sum(r["time"] for r in results) 894 return time 895 896 def get_full_name(self) -> str: 897 return self.owner.username + "/" + self.name 898 899 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 900 data = { 901 "assignees": assignees, 902 "body": description, 903 "closed": False, 904 "title": title, 905 } 906 result = self.allspice_client.requests_post( 907 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 908 data=data, 909 ) 910 911 issue = Issue.parse_response(self.allspice_client, result) 912 setattr(issue, "_repository", self) 913 Issue._add_read_property("repo", self, issue) 914 return issue 915 916 def create_design_review( 917 self, 918 title: str, 919 head: Union[Branch, str], 920 base: Union[Branch, str], 921 assignees: Optional[Set[Union[User, str]]] = None, 922 body: Optional[str] = None, 923 due_date: Optional[datetime] = None, 924 milestone: Optional["Milestone"] = None, 925 ) -> "DesignReview": 926 """ 927 Create a new Design Review. 928 929 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 930 931 :param title: Title of the Design Review 932 :param head: Branch or name of the branch to merge into the base branch 933 :param base: Branch or name of the branch to merge into 934 :param assignees: Optional. A list of users to assign this review. List can be of 935 User objects or of usernames. 936 :param body: An Optional Description for the Design Review. 937 :param due_date: An Optional Due date for the Design Review. 938 :param milestone: An Optional Milestone for the Design Review 939 :return: The created Design Review 940 """ 941 942 data: dict[str, Any] = { 943 "title": title, 944 } 945 946 if isinstance(head, Branch): 947 data["head"] = head.name 948 else: 949 data["head"] = head 950 if isinstance(base, Branch): 951 data["base"] = base.name 952 else: 953 data["base"] = base 954 if assignees: 955 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 956 if body: 957 data["body"] = body 958 if due_date: 959 data["due_date"] = Util.format_time(due_date) 960 if milestone: 961 data["milestone"] = milestone.id 962 963 result = self.allspice_client.requests_post( 964 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 965 data=data, 966 ) 967 968 return DesignReview.parse_response(self.allspice_client, result) 969 970 def create_milestone( 971 self, 972 title: str, 973 description: str, 974 due_date: Optional[str] = None, 975 state: str = "open", 976 ) -> "Milestone": 977 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 978 data = {"title": title, "description": description, "state": state} 979 if due_date: 980 data["due_date"] = due_date 981 result = self.allspice_client.requests_post(url, data=data) 982 return Milestone.parse_response(self.allspice_client, result) 983 984 def create_gitea_hook(self, hook_url: str, events: List[str]): 985 url = f"/repos/{self.owner.username}/{self.name}/hooks" 986 data = { 987 "type": "gitea", 988 "config": {"content_type": "json", "url": hook_url}, 989 "events": events, 990 "active": True, 991 } 992 return self.allspice_client.requests_post(url, data=data) 993 994 def list_hooks(self): 995 url = f"/repos/{self.owner.username}/{self.name}/hooks" 996 return self.allspice_client.requests_get(url) 997 998 def delete_hook(self, id: str): 999 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1000 self.allspice_client.requests_delete(url) 1001 1002 def is_collaborator(self, username) -> bool: 1003 if isinstance(username, User): 1004 username = username.username 1005 try: 1006 # returns 204 if its ok, 404 if its not 1007 self.allspice_client.requests_get( 1008 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1009 ) 1010 return True 1011 except Exception: 1012 return False 1013 1014 def get_users_with_access(self) -> Sequence[User]: 1015 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1016 response = self.allspice_client.requests_get(url) 1017 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1018 if isinstance(self.owner, User): 1019 return [*collabs, self.owner] 1020 else: 1021 # owner must be org 1022 teams = self.owner.get_teams() 1023 for team in teams: 1024 team_repos = team.get_repos() 1025 if self.name in [n.name for n in team_repos]: 1026 collabs += team.get_members() 1027 return collabs 1028 1029 def remove_collaborator(self, user_name: str): 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1031 self.allspice_client.requests_delete(url) 1032 1033 def transfer_ownership( 1034 self, 1035 new_owner: Union[User, Organization], 1036 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1037 ): 1038 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1039 data: dict[str, Any] = {"new_owner": new_owner.username} 1040 if isinstance(new_owner, Organization): 1041 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1042 data["team_ids"] = new_team_ids 1043 self.allspice_client.requests_post(url, data=data) 1044 # TODO: make sure this instance is either updated or discarded 1045 1046 def get_git_content( 1047 self, 1048 ref: Optional["Ref"] = None, 1049 commit: "Optional[Commit]" = None, 1050 ) -> List[Content]: 1051 """ 1052 Get the metadata for all files in the root directory. 1053 1054 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1055 1056 :param ref: branch or commit to get content from 1057 :param commit: commit to get content from (deprecated) 1058 """ 1059 url = f"/repos/{self.owner.username}/{self.name}/contents" 1060 data = Util.data_params_for_ref(ref or commit) 1061 1062 result = [ 1063 Content.parse_response(self.allspice_client, f) 1064 for f in self.allspice_client.requests_get(url, data) 1065 ] 1066 return result 1067 1068 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1069 """ 1070 Get the repository's tree on a given ref. 1071 1072 By default, this will only return the top-level entries in the tree. If you want 1073 to get the entire tree, set `recursive` to True. 1074 1075 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1076 :param recursive: Whether to get the entire tree or just the top-level entries. 1077 """ 1078 1079 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1080 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1081 params = {"recursive": recursive} 1082 results = self.allspice_client.requests_get_paginated(url, params=params) 1083 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1084 1085 def get_file_content( 1086 self, 1087 content: Content, 1088 ref: Optional[Ref] = None, 1089 commit: Optional[Commit] = None, 1090 ) -> Union[str, List["Content"]]: 1091 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1092 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1093 data = Util.data_params_for_ref(ref or commit) 1094 1095 if content.type == Content.FILE: 1096 return self.allspice_client.requests_get(url, data)["content"] 1097 else: 1098 return [ 1099 Content.parse_response(self.allspice_client, f) 1100 for f in self.allspice_client.requests_get(url, data) 1101 ] 1102 1103 def get_raw_file( 1104 self, 1105 file_path: str, 1106 ref: Optional[Ref] = None, 1107 ) -> bytes: 1108 """ 1109 Get the raw, binary data of a single file. 1110 1111 Note 1: if the file you are requesting is a text file, you might want to 1112 use .decode() on the result to get a string. For example: 1113 1114 content = repo.get_raw_file("file.txt").decode("utf-8") 1115 1116 Note 2: this method will store the entire file in memory. If you want 1117 to download a large file, you might want to use `download_to_file` 1118 instead. 1119 1120 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1121 1122 :param file_path: The path to the file to get. 1123 :param ref: The branch or commit to get the file from. If not provided, 1124 the default branch is used. 1125 """ 1126 1127 url = self.REPO_GET_RAW_FILE.format( 1128 owner=self.owner.username, 1129 repo=self.name, 1130 path=file_path, 1131 ) 1132 params = Util.data_params_for_ref(ref) 1133 return self.allspice_client.requests_get_raw(url, params=params) 1134 1135 def download_to_file( 1136 self, 1137 file_path: str, 1138 io: IO, 1139 ref: Optional[Ref] = None, 1140 ) -> None: 1141 """ 1142 Download the binary data of a file to a file-like object. 1143 1144 Example: 1145 1146 with open("schematic.DSN", "wb") as f: 1147 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1148 1149 :param file_path: The path to the file in the repository from the root 1150 of the repository. 1151 :param io: The file-like object to write the data to. 1152 """ 1153 1154 url = self.allspice_client._AllSpice__get_url( 1155 self.REPO_GET_RAW_FILE.format( 1156 owner=self.owner.username, 1157 repo=self.name, 1158 path=file_path, 1159 ) 1160 ) 1161 params = Util.data_params_for_ref(ref) 1162 response = self.allspice_client.requests.get( 1163 url, 1164 params=params, 1165 headers=self.allspice_client.headers, 1166 stream=True, 1167 ) 1168 1169 for chunk in response.iter_content(chunk_size=4096): 1170 if chunk: 1171 io.write(chunk) 1172 1173 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1174 """ 1175 Get the json blob for a cad file if it exists, otherwise enqueue 1176 a new job and return a 503 status. 1177 1178 WARNING: This is still experimental and not recommended for critical 1179 applications. The structure and content of the returned dictionary can 1180 change at any time. 1181 1182 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1183 """ 1184 1185 if isinstance(content, Content): 1186 content = content.path 1187 1188 url = self.REPO_GET_ALLSPICE_JSON.format( 1189 owner=self.owner.username, 1190 repo=self.name, 1191 content=content, 1192 ) 1193 data = Util.data_params_for_ref(ref) 1194 return self.allspice_client.requests_get(url, data) 1195 1196 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1197 """ 1198 Get the svg blob for a cad file if it exists, otherwise enqueue 1199 a new job and return a 503 status. 1200 1201 WARNING: This is still experimental and not yet recommended for 1202 critical applications. The content of the returned svg can change 1203 at any time. 1204 1205 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1206 """ 1207 1208 if isinstance(content, Content): 1209 content = content.path 1210 1211 url = self.REPO_GET_ALLSPICE_SVG.format( 1212 owner=self.owner.username, 1213 repo=self.name, 1214 content=content, 1215 ) 1216 data = Util.data_params_for_ref(ref) 1217 return self.allspice_client.requests_get_raw(url, data) 1218 1219 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1220 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1221 if not data: 1222 data = {} 1223 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1224 data.update({"content": content}) 1225 return self.allspice_client.requests_post(url, data) 1226 1227 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1228 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1229 if not data: 1230 data = {} 1231 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1232 data.update({"sha": file_sha, "content": content}) 1233 return self.allspice_client.requests_put(url, data) 1234 1235 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1236 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1237 if not data: 1238 data = {} 1239 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1240 data.update({"sha": file_sha}) 1241 return self.allspice_client.requests_delete(url, data) 1242 1243 def get_archive( 1244 self, 1245 ref: Ref = "main", 1246 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1247 ) -> bytes: 1248 """ 1249 Download all the files in a specific ref of a repository as a zip or tarball 1250 archive. 1251 1252 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1253 1254 :param ref: branch or commit to get content from, defaults to the "main" branch 1255 :param archive_format: zip or tar, defaults to zip 1256 """ 1257 1258 ref_string = Util.data_params_for_ref(ref)["ref"] 1259 url = self.REPO_GET_ARCHIVE.format( 1260 owner=self.owner.username, 1261 repo=self.name, 1262 ref=ref_string, 1263 format=archive_format.value, 1264 ) 1265 return self.allspice_client.requests_get_raw(url) 1266 1267 def get_topics(self) -> list[str]: 1268 """ 1269 Gets the list of topics on this repository. 1270 1271 See http://localhost:3000/api/swagger#/repository/repoListTopics 1272 """ 1273 1274 url = self.REPO_GET_TOPICS.format( 1275 owner=self.owner.username, 1276 repo=self.name, 1277 ) 1278 return self.allspice_client.requests_get(url)["topics"] 1279 1280 def add_topic(self, topic: str): 1281 """ 1282 Adds a topic to the repository. 1283 1284 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1285 1286 :param topic: The topic to add. Topic names must consist only of 1287 lowercase letters, numnbers and dashes (-), and cannot start with 1288 dashes. Topic names also must be under 35 characters long. 1289 """ 1290 1291 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1292 self.allspice_client.requests_put(url) 1293 1294 def create_release( 1295 self, 1296 tag_name: str, 1297 name: Optional[str] = None, 1298 body: Optional[str] = None, 1299 draft: bool = False, 1300 ): 1301 """ 1302 Create a release for this repository. The release will be created for 1303 the tag with the given name. If there is no tag with this name, create 1304 the tag first. 1305 1306 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1307 """ 1308 1309 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1310 data = { 1311 "tag_name": tag_name, 1312 "draft": draft, 1313 } 1314 if name is not None: 1315 data["name"] = name 1316 if body is not None: 1317 data["body"] = body 1318 response = self.allspice_client.requests_post(url, data) 1319 return Release.parse_response(self.allspice_client, response, self) 1320 1321 def get_releases( 1322 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1323 ) -> List[Release]: 1324 """ 1325 Get the list of releases for this repository. 1326 1327 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1328 """ 1329 1330 data = {} 1331 1332 if draft is not None: 1333 data["draft"] = draft 1334 if pre_release is not None: 1335 data["pre-release"] = pre_release 1336 1337 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1338 responses = self.allspice_client.requests_get_paginated(url, params=data) 1339 1340 return [ 1341 Release.parse_response(self.allspice_client, response, self) for response in responses 1342 ] 1343 1344 def get_latest_release(self) -> Release: 1345 """ 1346 Get the latest release for this repository. 1347 1348 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1349 """ 1350 1351 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1352 response = self.allspice_client.requests_get(url) 1353 release = Release.parse_response(self.allspice_client, response, self) 1354 return release 1355 1356 def get_release_by_tag(self, tag: str) -> Release: 1357 """ 1358 Get a release by its tag. 1359 1360 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1361 """ 1362 1363 url = self.REPO_GET_RELEASE_BY_TAG.format( 1364 owner=self.owner.username, repo=self.name, tag=tag 1365 ) 1366 response = self.allspice_client.requests_get(url) 1367 release = Release.parse_response(self.allspice_client, response, self) 1368 return release 1369 1370 def get_commit_statuses( 1371 self, 1372 commit: Union[str, Commit], 1373 sort: Optional[CommitStatusSort] = None, 1374 state: Optional[CommitStatusState] = None, 1375 ) -> List[CommitStatus]: 1376 """ 1377 Get a list of statuses for a commit. 1378 1379 This is roughly equivalent to the Commit.get_statuses method, but this 1380 method allows you to sort and filter commits and is more convenient if 1381 you have a commit SHA and don't need to get the commit itself. 1382 1383 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1384 """ 1385 1386 if isinstance(commit, Commit): 1387 commit = commit.sha 1388 1389 params = {} 1390 if sort is not None: 1391 params["sort"] = sort.value 1392 if state is not None: 1393 params["state"] = state.value 1394 1395 url = self.REPO_GET_COMMIT_STATUS.format( 1396 owner=self.owner.username, repo=self.name, sha=commit 1397 ) 1398 response = self.allspice_client.requests_get_paginated(url, params=params) 1399 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1400 1401 def create_commit_status( 1402 self, 1403 commit: Union[str, Commit], 1404 context: Optional[str] = None, 1405 description: Optional[str] = None, 1406 state: Optional[CommitStatusState] = None, 1407 target_url: Optional[str] = None, 1408 ) -> CommitStatus: 1409 """ 1410 Create a status on a commit. 1411 1412 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1413 """ 1414 1415 if isinstance(commit, Commit): 1416 commit = commit.sha 1417 1418 data = {} 1419 if context is not None: 1420 data["context"] = context 1421 if description is not None: 1422 data["description"] = description 1423 if state is not None: 1424 data["state"] = state.value 1425 if target_url is not None: 1426 data["target_url"] = target_url 1427 1428 url = self.REPO_GET_COMMIT_STATUS.format( 1429 owner=self.owner.username, repo=self.name, sha=commit 1430 ) 1431 response = self.allspice_client.requests_post(url, data=data) 1432 return CommitStatus.parse_response(self.allspice_client, response) 1433 1434 def delete(self): 1435 self.allspice_client.requests_delete( 1436 Repository.REPO_DELETE % (self.owner.username, self.name) 1437 ) 1438 self.deleted = True 1439 1440 1441class Milestone(ApiObject): 1442 allow_merge_commits: Any 1443 allow_rebase: Any 1444 allow_rebase_explicit: Any 1445 allow_squash_merge: Any 1446 archived: Any 1447 closed_at: Any 1448 closed_issues: int 1449 created_at: str 1450 default_branch: Any 1451 description: str 1452 due_on: Any 1453 has_issues: Any 1454 has_pull_requests: Any 1455 has_wiki: Any 1456 id: int 1457 ignore_whitespace_conflicts: Any 1458 name: Any 1459 open_issues: int 1460 private: Any 1461 state: str 1462 title: str 1463 updated_at: str 1464 website: Any 1465 1466 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1467 1468 def __init__(self, allspice_client): 1469 super().__init__(allspice_client) 1470 1471 def __eq__(self, other): 1472 if not isinstance(other, Milestone): 1473 return False 1474 return self.allspice_client == other.allspice_client and self.id == other.id 1475 1476 def __hash__(self): 1477 return hash(self.allspice_client) ^ hash(self.id) 1478 1479 _fields_to_parsers: ClassVar[dict] = { 1480 "closed_at": lambda _, t: Util.convert_time(t), 1481 "due_on": lambda _, t: Util.convert_time(t), 1482 } 1483 1484 _patchable_fields: ClassVar[set[str]] = { 1485 "allow_merge_commits", 1486 "allow_rebase", 1487 "allow_rebase_explicit", 1488 "allow_squash_merge", 1489 "archived", 1490 "default_branch", 1491 "description", 1492 "has_issues", 1493 "has_pull_requests", 1494 "has_wiki", 1495 "ignore_whitespace_conflicts", 1496 "name", 1497 "private", 1498 "website", 1499 } 1500 1501 @classmethod 1502 def request(cls, allspice_client, owner: str, repo: str, number: str): 1503 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number}) 1504 1505 1506class Attachment(ReadonlyApiObject): 1507 """ 1508 An asset attached to a comment. 1509 1510 You cannot edit or delete the attachment from this object - see the instance methods 1511 Comment.edit_attachment and delete_attachment for that. 1512 """ 1513 1514 browser_download_url: str 1515 created_at: str 1516 download_count: int 1517 id: int 1518 name: str 1519 size: int 1520 uuid: str 1521 1522 def __init__(self, allspice_client): 1523 super().__init__(allspice_client) 1524 1525 def __eq__(self, other): 1526 if not isinstance(other, Attachment): 1527 return False 1528 1529 return self.uuid == other.uuid 1530 1531 def __hash__(self): 1532 return hash(self.uuid) 1533 1534 def download_to_file(self, io: IO): 1535 """ 1536 Download the raw, binary data of this Attachment to a file-like object. 1537 1538 Example: 1539 1540 with open("my_file.zip", "wb") as f: 1541 attachment.download_to_file(f) 1542 1543 :param io: The file-like object to write the data to. 1544 """ 1545 1546 response = self.allspice_client.requests.get( 1547 self.browser_download_url, 1548 headers=self.allspice_client.headers, 1549 stream=True, 1550 ) 1551 # 4kb chunks 1552 for chunk in response.iter_content(chunk_size=4096): 1553 if chunk: 1554 io.write(chunk) 1555 1556 1557class Comment(ApiObject): 1558 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1559 body: str 1560 created_at: datetime 1561 html_url: str 1562 id: int 1563 issue_url: str 1564 original_author: str 1565 original_author_id: int 1566 pull_request_url: str 1567 updated_at: datetime 1568 user: User 1569 1570 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1571 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1572 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1573 1574 def __init__(self, allspice_client): 1575 super().__init__(allspice_client) 1576 1577 def __eq__(self, other): 1578 if not isinstance(other, Comment): 1579 return False 1580 return self.repository == other.repository and self.id == other.id 1581 1582 def __hash__(self): 1583 return hash(self.repository) ^ hash(self.id) 1584 1585 @classmethod 1586 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1587 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1588 1589 _fields_to_parsers: ClassVar[dict] = { 1590 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1591 "created_at": lambda _, t: Util.convert_time(t), 1592 "updated_at": lambda _, t: Util.convert_time(t), 1593 } 1594 1595 _patchable_fields: ClassVar[set[str]] = {"body"} 1596 1597 @property 1598 def parent_url(self) -> str: 1599 """URL of the parent of this comment (the issue or the pull request)""" 1600 1601 if self.issue_url is not None and self.issue_url != "": 1602 return self.issue_url 1603 else: 1604 return self.pull_request_url 1605 1606 @cached_property 1607 def repository(self) -> Repository: 1608 """The repository this comment was posted on.""" 1609 1610 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1611 return Repository.request(self.allspice_client, owner_name, repo_name) 1612 1613 def __fields_for_path(self): 1614 return { 1615 "owner": self.repository.owner.username, 1616 "repo": self.repository.name, 1617 "id": self.id, 1618 } 1619 1620 def commit(self): 1621 self._commit(self.__fields_for_path()) 1622 1623 def delete(self): 1624 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1625 self.deleted = True 1626 1627 def get_attachments(self) -> List[Attachment]: 1628 """ 1629 Get all attachments on this comment. This returns Attachment objects, which 1630 contain a link to download the attachment. 1631 1632 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1633 """ 1634 1635 results = self.allspice_client.requests_get( 1636 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1637 ) 1638 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1639 1640 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1641 """ 1642 Create an attachment on this comment. 1643 1644 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1645 1646 :param file: The file to attach. This should be a file-like object. 1647 :param name: The name of the file. If not provided, the name of the file will be 1648 used. 1649 :return: The created attachment. 1650 """ 1651 1652 args: dict[str, Any] = { 1653 "files": {"attachment": file}, 1654 } 1655 if name is not None: 1656 args["params"] = {"name": name} 1657 1658 result = self.allspice_client.requests_post( 1659 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1660 **args, 1661 ) 1662 return Attachment.parse_response(self.allspice_client, result) 1663 1664 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1665 """ 1666 Edit an attachment. 1667 1668 The list of params that can be edited is available at 1669 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1670 1671 :param attachment: The attachment to be edited 1672 :param data: The data parameter should be a dictionary of the fields to edit. 1673 :return: The edited attachment 1674 """ 1675 1676 args = { 1677 **self.__fields_for_path(), 1678 "attachment_id": attachment.id, 1679 } 1680 result = self.allspice_client.requests_patch( 1681 self.ATTACHMENT_PATH.format(**args), 1682 data=data, 1683 ) 1684 return Attachment.parse_response(self.allspice_client, result) 1685 1686 def delete_attachment(self, attachment: Attachment): 1687 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1688 1689 args = { 1690 **self.__fields_for_path(), 1691 "attachment_id": attachment.id, 1692 } 1693 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1694 attachment.deleted = True 1695 1696 1697class Commit(ReadonlyApiObject): 1698 author: User 1699 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1700 committer: Dict[str, Union[int, str, bool]] 1701 created: str 1702 files: List[Dict[str, str]] 1703 html_url: str 1704 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1705 parents: List[Union[Dict[str, str], Any]] 1706 sha: str 1707 stats: Dict[str, int] 1708 url: str 1709 1710 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1711 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1712 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1713 1714 # Regex to extract owner and repo names from the url property 1715 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1716 1717 def __init__(self, allspice_client): 1718 super().__init__(allspice_client) 1719 1720 _fields_to_parsers: ClassVar[dict] = { 1721 # NOTE: api may return None for commiters that are no allspice users 1722 "author": lambda allspice_client, u: ( 1723 User.parse_response(allspice_client, u) if u else None 1724 ) 1725 } 1726 1727 def __eq__(self, other): 1728 if not isinstance(other, Commit): 1729 return False 1730 return self.sha == other.sha 1731 1732 def __hash__(self): 1733 return hash(self.sha) 1734 1735 @classmethod 1736 def parse_response(cls, allspice_client, result) -> "Commit": 1737 commit_cache = result["commit"] 1738 api_object = cls(allspice_client) 1739 cls._initialize(allspice_client, api_object, result) 1740 # inner_commit for legacy reasons 1741 Commit._add_read_property("inner_commit", commit_cache, api_object) 1742 return api_object 1743 1744 def get_status(self) -> CommitCombinedStatus: 1745 """ 1746 Get a combined status consisting of all statues on this commit. 1747 1748 Note that the returned object is a CommitCombinedStatus object, which 1749 also contains a list of all statuses on the commit. 1750 1751 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1752 """ 1753 1754 result = self.allspice_client.requests_get( 1755 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1756 ) 1757 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1758 1759 def get_statuses(self) -> List[CommitStatus]: 1760 """ 1761 Get a list of all statuses on this commit. 1762 1763 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1764 """ 1765 1766 results = self.allspice_client.requests_get( 1767 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1768 ) 1769 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1770 1771 @cached_property 1772 def _fields_for_path(self) -> dict[str, str]: 1773 matches = self.URL_REGEXP.search(self.url) 1774 if not matches: 1775 raise ValueError(f"Invalid commit URL: {self.url}") 1776 1777 return { 1778 "owner": matches.group(1), 1779 "repo": matches.group(2), 1780 "sha": self.sha, 1781 } 1782 1783 1784class CommitStatusState(Enum): 1785 PENDING = "pending" 1786 SUCCESS = "success" 1787 ERROR = "error" 1788 FAILURE = "failure" 1789 WARNING = "warning" 1790 1791 @classmethod 1792 def try_init(cls, value: str) -> CommitStatusState | str: 1793 """ 1794 Try converting a string to the enum, and if that fails, return the 1795 string itself. 1796 """ 1797 1798 try: 1799 return cls(value) 1800 except ValueError: 1801 return value 1802 1803 1804class CommitStatus(ReadonlyApiObject): 1805 context: str 1806 created_at: str 1807 creator: User 1808 description: str 1809 id: int 1810 status: CommitStatusState 1811 target_url: str 1812 updated_at: str 1813 url: str 1814 1815 def __init__(self, allspice_client): 1816 super().__init__(allspice_client) 1817 1818 _fields_to_parsers: ClassVar[dict] = { 1819 # Gitea/ASH doesn't actually validate that the status is a "valid" 1820 # status, so we can expect empty or unknown strings in the status field. 1821 "status": lambda _, s: CommitStatusState.try_init(s), 1822 "creator": lambda allspice_client, u: ( 1823 User.parse_response(allspice_client, u) if u else None 1824 ), 1825 } 1826 1827 def __eq__(self, other): 1828 if not isinstance(other, CommitStatus): 1829 return False 1830 return self.id == other.id 1831 1832 def __hash__(self): 1833 return hash(self.id) 1834 1835 1836class CommitCombinedStatus(ReadonlyApiObject): 1837 commit_url: str 1838 repository: Repository 1839 sha: str 1840 state: CommitStatusState 1841 statuses: List["CommitStatus"] 1842 total_count: int 1843 url: str 1844 1845 def __init__(self, allspice_client): 1846 super().__init__(allspice_client) 1847 1848 _fields_to_parsers: ClassVar[dict] = { 1849 # See CommitStatus 1850 "state": lambda _, s: CommitStatusState.try_init(s), 1851 "statuses": lambda allspice_client, statuses: [ 1852 CommitStatus.parse_response(allspice_client, status) for status in statuses 1853 ], 1854 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1855 } 1856 1857 def __eq__(self, other): 1858 if not isinstance(other, CommitCombinedStatus): 1859 return False 1860 return self.sha == other.sha 1861 1862 def __hash__(self): 1863 return hash(self.sha) 1864 1865 @classmethod 1866 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1867 api_object = cls(allspice_client) 1868 cls._initialize(allspice_client, api_object, result) 1869 return api_object 1870 1871 1872class Issue(ApiObject): 1873 assets: List[Any] 1874 assignee: Any 1875 assignees: Any 1876 body: str 1877 closed_at: Any 1878 comments: int 1879 created_at: str 1880 due_date: Any 1881 html_url: str 1882 id: int 1883 is_locked: bool 1884 labels: List[Any] 1885 milestone: Optional["Milestone"] 1886 number: int 1887 original_author: str 1888 original_author_id: int 1889 pin_order: int 1890 pull_request: Any 1891 ref: str 1892 repository: Dict[str, Union[int, str]] 1893 state: str 1894 title: str 1895 updated_at: str 1896 url: str 1897 user: User 1898 1899 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1900 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1901 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1902 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1903 1904 OPENED = "open" 1905 CLOSED = "closed" 1906 1907 def __init__(self, allspice_client): 1908 super().__init__(allspice_client) 1909 1910 def __eq__(self, other): 1911 if not isinstance(other, Issue): 1912 return False 1913 return self.repository == other.repository and self.id == other.id 1914 1915 def __hash__(self): 1916 return hash(self.repository) ^ hash(self.id) 1917 1918 _fields_to_parsers: ClassVar[dict] = { 1919 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1920 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1921 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1922 "assignees": lambda allspice_client, us: [ 1923 User.parse_response(allspice_client, u) for u in us 1924 ], 1925 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1926 } 1927 1928 _parsers_to_fields: ClassVar[dict] = { 1929 "milestone": lambda m: m.id, 1930 } 1931 1932 _patchable_fields: ClassVar[set[str]] = { 1933 "assignee", 1934 "assignees", 1935 "body", 1936 "due_date", 1937 "milestone", 1938 "state", 1939 "title", 1940 } 1941 1942 def commit(self): 1943 args = { 1944 "owner": self.repository.owner.username, 1945 "repo": self.repository.name, 1946 "index": self.number, 1947 } 1948 self._commit(args) 1949 1950 @classmethod 1951 def request(cls, allspice_client, owner: str, repo: str, number: str): 1952 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1953 # The repository in the response is a RepositoryMeta object, so request 1954 # the full repository object and add it to the issue object. 1955 repository = Repository.request(allspice_client, owner, repo) 1956 setattr(api_object, "_repository", repository) 1957 # For legacy reasons 1958 cls._add_read_property("repo", repository, api_object) 1959 return api_object 1960 1961 @classmethod 1962 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1963 args = {"owner": repo.owner.username, "repo": repo.name} 1964 data = {"title": title, "body": body} 1965 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1966 issue = Issue.parse_response(allspice_client, result) 1967 setattr(issue, "_repository", repo) 1968 cls._add_read_property("repo", repo, issue) 1969 return issue 1970 1971 @property 1972 def owner(self) -> Organization | User: 1973 return self.repository.owner 1974 1975 def get_time_sum(self, user: User) -> int: 1976 results = self.allspice_client.requests_get( 1977 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1978 ) 1979 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 1980 1981 def get_times(self) -> Optional[Dict]: 1982 return self.allspice_client.requests_get( 1983 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1984 ) 1985 1986 def delete_time(self, time_id: str): 1987 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 1988 self.allspice_client.requests_delete(path) 1989 1990 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1991 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 1992 self.allspice_client.requests_post( 1993 path, data={"created": created, "time": int(time), "user_name": user_name} 1994 ) 1995 1996 def get_comments(self) -> List[Comment]: 1997 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 1998 1999 results = self.allspice_client.requests_get( 2000 self.GET_COMMENTS.format( 2001 owner=self.owner.username, repo=self.repository.name, index=self.number 2002 ) 2003 ) 2004 2005 return [Comment.parse_response(self.allspice_client, result) for result in results] 2006 2007 def create_comment(self, body: str) -> Comment: 2008 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2009 2010 path = self.GET_COMMENTS.format( 2011 owner=self.owner.username, repo=self.repository.name, index=self.number 2012 ) 2013 2014 response = self.allspice_client.requests_post(path, data={"body": body}) 2015 return Comment.parse_response(self.allspice_client, response) 2016 2017 2018class DesignReview(ApiObject): 2019 """ 2020 A Design Review. See 2021 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2022 2023 Note: The base and head fields are not `Branch` objects - they are plain strings 2024 referring to the branch names. This is because DRs can exist for branches that have 2025 been deleted, which don't have an associated `Branch` object from the API. You can use 2026 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2027 it exists. 2028 """ 2029 2030 allow_maintainer_edit: bool 2031 allow_maintainer_edits: Any 2032 assignee: User 2033 assignees: List["User"] 2034 base: str 2035 body: str 2036 closed_at: Any 2037 comments: int 2038 created_at: str 2039 diff_url: str 2040 due_date: Optional[str] 2041 head: str 2042 html_url: str 2043 id: int 2044 is_locked: bool 2045 labels: List[Any] 2046 merge_base: str 2047 merge_commit_sha: Any 2048 mergeable: bool 2049 merged: bool 2050 merged_at: Any 2051 merged_by: Any 2052 milestone: Any 2053 number: int 2054 patch_url: str 2055 pin_order: int 2056 repository: Optional["Repository"] 2057 requested_reviewers: Any 2058 state: str 2059 title: str 2060 updated_at: str 2061 url: str 2062 user: User 2063 2064 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2065 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2066 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2067 2068 OPEN = "open" 2069 CLOSED = "closed" 2070 2071 class MergeType(Enum): 2072 MERGE = "merge" 2073 REBASE = "rebase" 2074 REBASE_MERGE = "rebase-merge" 2075 SQUASH = "squash" 2076 MANUALLY_MERGED = "manually-merged" 2077 2078 def __init__(self, allspice_client): 2079 super().__init__(allspice_client) 2080 2081 def __eq__(self, other): 2082 if not isinstance(other, DesignReview): 2083 return False 2084 return self.repository == other.repository and self.id == other.id 2085 2086 def __hash__(self): 2087 return hash(self.repository) ^ hash(self.id) 2088 2089 @classmethod 2090 def parse_response(cls, allspice_client, result) -> "DesignReview": 2091 api_object = super().parse_response(allspice_client, result) 2092 cls._add_read_property( 2093 "repository", 2094 Repository.parse_response(allspice_client, result["base"]["repo"]), 2095 api_object, 2096 ) 2097 2098 return api_object 2099 2100 @classmethod 2101 def request(cls, allspice_client, owner: str, repo: str, number: str): 2102 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2103 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2104 2105 _fields_to_parsers: ClassVar[dict] = { 2106 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2107 "assignees": lambda allspice_client, us: [ 2108 User.parse_response(allspice_client, u) for u in us 2109 ], 2110 "base": lambda _, b: b["ref"], 2111 "head": lambda _, h: h["ref"], 2112 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2113 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2114 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2115 } 2116 2117 _patchable_fields: ClassVar[set[str]] = { 2118 "allow_maintainer_edits", 2119 "assignee", 2120 "assignees", 2121 "base", 2122 "body", 2123 "due_date", 2124 "milestone", 2125 "state", 2126 "title", 2127 } 2128 2129 _parsers_to_fields: ClassVar[dict] = { 2130 "assignee": lambda u: u.username, 2131 "assignees": lambda us: [u.username for u in us], 2132 "base": lambda b: b.name if isinstance(b, Branch) else b, 2133 "milestone": lambda m: m.id, 2134 } 2135 2136 def commit(self): 2137 data = self.get_dirty_fields() 2138 if "due_date" in data and data["due_date"] is None: 2139 data["unset_due_date"] = True 2140 2141 args = { 2142 "owner": self.repository.owner.username, 2143 "repo": self.repository.name, 2144 "index": self.number, 2145 } 2146 self._commit(args, data) 2147 2148 def merge(self, merge_type: MergeType): 2149 """ 2150 Merge the pull request. See 2151 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2152 2153 :param merge_type: The type of merge to perform. See the MergeType enum. 2154 """ 2155 2156 self.allspice_client.requests_put( 2157 self.MERGE_DESIGN_REVIEW.format( 2158 owner=self.repository.owner.username, 2159 repo=self.repository.name, 2160 index=self.number, 2161 ), 2162 data={"Do": merge_type.value}, 2163 ) 2164 2165 def get_comments(self) -> List[Comment]: 2166 """ 2167 Get the comments on this pull request, but not specifically on a review. 2168 2169 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2170 2171 :return: A list of comments on this pull request. 2172 """ 2173 2174 results = self.allspice_client.requests_get( 2175 self.GET_COMMENTS.format( 2176 owner=self.repository.owner.username, 2177 repo=self.repository.name, 2178 index=self.number, 2179 ) 2180 ) 2181 return [Comment.parse_response(self.allspice_client, result) for result in results] 2182 2183 def create_comment(self, body: str): 2184 """ 2185 Create a comment on this pull request. This uses the same endpoint as the 2186 comments on issues, and will not be associated with any reviews. 2187 2188 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2189 2190 :param body: The body of the comment. 2191 :return: The comment that was created. 2192 """ 2193 2194 result = self.allspice_client.requests_post( 2195 self.GET_COMMENTS.format( 2196 owner=self.repository.owner.username, 2197 repo=self.repository.name, 2198 index=self.number, 2199 ), 2200 data={"body": body}, 2201 ) 2202 return Comment.parse_response(self.allspice_client, result) 2203 2204 2205class Team(ApiObject): 2206 can_create_org_repo: bool 2207 description: str 2208 id: int 2209 includes_all_repositories: bool 2210 name: str 2211 organization: Optional["Organization"] 2212 permission: str 2213 units: List[str] 2214 units_map: Dict[str, str] 2215 2216 API_OBJECT = """/teams/{id}""" # <id> 2217 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2218 TEAM_DELETE = """/teams/%s""" # <id> 2219 GET_MEMBERS = """/teams/%s/members""" # <id> 2220 GET_REPOS = """/teams/%s/repos""" # <id> 2221 2222 def __init__(self, allspice_client): 2223 super().__init__(allspice_client) 2224 2225 def __eq__(self, other): 2226 if not isinstance(other, Team): 2227 return False 2228 return self.organization == other.organization and self.id == other.id 2229 2230 def __hash__(self): 2231 return hash(self.organization) ^ hash(self.id) 2232 2233 _fields_to_parsers: ClassVar[dict] = { 2234 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2235 } 2236 2237 _patchable_fields: ClassVar[set[str]] = { 2238 "can_create_org_repo", 2239 "description", 2240 "includes_all_repositories", 2241 "name", 2242 "permission", 2243 "units", 2244 "units_map", 2245 } 2246 2247 @classmethod 2248 def request(cls, allspice_client, id: int): 2249 return cls._request(allspice_client, {"id": id}) 2250 2251 def commit(self): 2252 args = {"id": self.id} 2253 self._commit(args) 2254 2255 def add_user(self, user: User): 2256 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2257 url = f"/teams/{self.id}/members/{user.login}" 2258 self.allspice_client.requests_put(url) 2259 2260 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2261 if isinstance(repo, Repository): 2262 repo_name = repo.name 2263 else: 2264 repo_name = repo 2265 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2266 2267 def get_members(self): 2268 """Get all users assigned to the team.""" 2269 results = self.allspice_client.requests_get_paginated( 2270 Team.GET_MEMBERS % self.id, 2271 ) 2272 return [User.parse_response(self.allspice_client, result) for result in results] 2273 2274 def get_repos(self): 2275 """Get all repos of this Team.""" 2276 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2277 return [Repository.parse_response(self.allspice_client, result) for result in results] 2278 2279 def delete(self): 2280 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2281 self.deleted = True 2282 2283 def remove_team_member(self, user_name: str): 2284 url = f"/teams/{self.id}/members/{user_name}" 2285 self.allspice_client.requests_delete(url) 2286 2287 2288class Release(ApiObject): 2289 """ 2290 A release on a repo. 2291 """ 2292 2293 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2294 author: User 2295 body: str 2296 created_at: str 2297 draft: bool 2298 html_url: str 2299 id: int 2300 name: str 2301 prerelease: bool 2302 published_at: str 2303 repo: Optional["Repository"] 2304 repository: Optional["Repository"] 2305 tag_name: str 2306 tarball_url: str 2307 target_commitish: str 2308 upload_url: str 2309 url: str 2310 zipball_url: str 2311 2312 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2313 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2314 # Note that we don't strictly need the get_assets route, as the release 2315 # object already contains the assets. 2316 2317 def __init__(self, allspice_client): 2318 super().__init__(allspice_client) 2319 2320 def __eq__(self, other): 2321 if not isinstance(other, Release): 2322 return False 2323 return self.repo == other.repo and self.id == other.id 2324 2325 def __hash__(self): 2326 return hash(self.repo) ^ hash(self.id) 2327 2328 _fields_to_parsers: ClassVar[dict] = { 2329 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2330 } 2331 _patchable_fields: ClassVar[set[str]] = { 2332 "body", 2333 "draft", 2334 "name", 2335 "prerelease", 2336 "tag_name", 2337 "target_commitish", 2338 } 2339 2340 @classmethod 2341 def parse_response(cls, allspice_client, result, repo) -> Release: 2342 release = super().parse_response(allspice_client, result) 2343 Release._add_read_property("repository", repo, release) 2344 # For legacy reasons 2345 Release._add_read_property("repo", repo, release) 2346 setattr( 2347 release, 2348 "_assets", 2349 [ 2350 ReleaseAsset.parse_response(allspice_client, asset, release) 2351 for asset in result["assets"] 2352 ], 2353 ) 2354 return release 2355 2356 @classmethod 2357 def request( 2358 cls, 2359 allspice_client, 2360 owner: str, 2361 repo: str, 2362 id: Optional[int] = None, 2363 ) -> Release: 2364 args = {"owner": owner, "repo": repo, "id": id} 2365 release_response = cls._get_gitea_api_object(allspice_client, args) 2366 repository = Repository.request(allspice_client, owner, repo) 2367 release = cls.parse_response(allspice_client, release_response, repository) 2368 return release 2369 2370 def commit(self): 2371 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2372 self._commit(args) 2373 2374 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2375 """ 2376 Create an asset for this release. 2377 2378 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2379 2380 :param file: The file to upload. This should be a file-like object. 2381 :param name: The name of the file. 2382 :return: The created asset. 2383 """ 2384 2385 args: dict[str, Any] = {"files": {"attachment": file}} 2386 if name is not None: 2387 args["params"] = {"name": name} 2388 2389 result = self.allspice_client.requests_post( 2390 self.RELEASE_CREATE_ASSET.format( 2391 owner=self.repo.owner.username, 2392 repo=self.repo.name, 2393 id=self.id, 2394 ), 2395 **args, 2396 ) 2397 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2398 2399 def delete(self): 2400 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2401 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2402 self.deleted = True 2403 2404 2405class ReleaseAsset(ApiObject): 2406 browser_download_url: str 2407 created_at: str 2408 download_count: int 2409 id: int 2410 name: str 2411 release: Optional["Release"] 2412 size: int 2413 uuid: str 2414 2415 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2416 2417 def __init__(self, allspice_client): 2418 super().__init__(allspice_client) 2419 2420 def __eq__(self, other): 2421 if not isinstance(other, ReleaseAsset): 2422 return False 2423 return self.release == other.release and self.id == other.id 2424 2425 def __hash__(self): 2426 return hash(self.release) ^ hash(self.id) 2427 2428 _fields_to_parsers: ClassVar[dict] = {} 2429 _patchable_fields: ClassVar[set[str]] = { 2430 "name", 2431 } 2432 2433 @classmethod 2434 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2435 asset = super().parse_response(allspice_client, result) 2436 ReleaseAsset._add_read_property("release", release, asset) 2437 return asset 2438 2439 @classmethod 2440 def request( 2441 cls, 2442 allspice_client, 2443 owner: str, 2444 repo: str, 2445 release_id: int, 2446 id: int, 2447 ) -> ReleaseAsset: 2448 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2449 asset_response = cls._get_gitea_api_object(allspice_client, args) 2450 release = Release.request(allspice_client, owner, repo, release_id) 2451 asset = cls.parse_response(allspice_client, asset_response, release) 2452 return asset 2453 2454 def commit(self): 2455 args = { 2456 "owner": self.release.repo.owner, 2457 "repo": self.release.repo.name, 2458 "release_id": self.release.id, 2459 "id": self.id, 2460 } 2461 self._commit(args) 2462 2463 def download(self) -> bytes: 2464 """ 2465 Download the raw, binary data of this asset. 2466 2467 Note 1: if the file you are requesting is a text file, you might want to 2468 use .decode() on the result to get a string. For example: 2469 2470 asset.download().decode("utf-8") 2471 2472 Note 2: this method will store the entire file in memory. If you are 2473 downloading a large file, you might want to use download_to_file instead. 2474 """ 2475 2476 return self.allspice_client.requests.get( 2477 self.browser_download_url, 2478 headers=self.allspice_client.headers, 2479 ).content 2480 2481 def download_to_file(self, io: IO): 2482 """ 2483 Download the raw, binary data of this asset to a file-like object. 2484 2485 Example: 2486 2487 with open("my_file.zip", "wb") as f: 2488 asset.download_to_file(f) 2489 2490 :param io: The file-like object to write the data to. 2491 """ 2492 2493 response = self.allspice_client.requests.get( 2494 self.browser_download_url, 2495 headers=self.allspice_client.headers, 2496 stream=True, 2497 ) 2498 # 4kb chunks 2499 for chunk in response.iter_content(chunk_size=4096): 2500 if chunk: 2501 io.write(chunk) 2502 2503 def delete(self): 2504 args = { 2505 "owner": self.release.repo.owner.name, 2506 "repo": self.release.repo.name, 2507 "release_id": self.release.id, 2508 "id": self.id, 2509 } 2510 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2511 self.deleted = True 2512 2513 2514class Content(ReadonlyApiObject): 2515 content: Any 2516 download_url: str 2517 encoding: Any 2518 git_url: str 2519 html_url: str 2520 last_commit_sha: str 2521 name: str 2522 path: str 2523 sha: str 2524 size: int 2525 submodule_git_url: Any 2526 target: Any 2527 type: str 2528 url: str 2529 2530 FILE = "file" 2531 2532 def __init__(self, allspice_client): 2533 super().__init__(allspice_client) 2534 2535 def __eq__(self, other): 2536 if not isinstance(other, Content): 2537 return False 2538 2539 return self.sha == other.sha and self.name == other.name 2540 2541 def __hash__(self): 2542 return hash(self.sha) ^ hash(self.name) 2543 2544 2545Ref = Union[Branch, Commit, str] 2546 2547 2548class Util: 2549 @staticmethod 2550 def convert_time(time: str) -> datetime: 2551 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2552 try: 2553 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2554 except ValueError: 2555 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2556 2557 @staticmethod 2558 def format_time(time: datetime) -> str: 2559 """ 2560 Format a datetime object to Gitea's time format. 2561 2562 :param time: The time to format 2563 :return: Formatted time 2564 """ 2565 2566 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2567 2568 @staticmethod 2569 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2570 """ 2571 Given a "ref", returns a dict with the ref parameter for the API call. 2572 2573 If the ref is None, returns an empty dict. You can pass this to the API 2574 directly. 2575 """ 2576 2577 if isinstance(ref, Branch): 2578 return {"ref": ref.name} 2579 elif isinstance(ref, Commit): 2580 return {"ref": ref.sha} 2581 elif ref: 2582 return {"ref": ref} 2583 else: 2584 return {}
33class Organization(ApiObject): 34 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 35 36 active: Optional[bool] 37 avatar_url: str 38 created: Optional[str] 39 description: str 40 email: str 41 followers_count: Optional[int] 42 following_count: Optional[int] 43 full_name: str 44 id: int 45 is_admin: Optional[bool] 46 language: Optional[str] 47 last_login: Optional[str] 48 location: str 49 login: Optional[str] 50 login_name: Optional[str] 51 name: Optional[str] 52 prohibit_login: Optional[bool] 53 repo_admin_change_team_access: Optional[bool] 54 restricted: Optional[bool] 55 starred_repos_count: Optional[int] 56 username: str 57 visibility: str 58 website: str 59 60 API_OBJECT = """/orgs/{name}""" # <org> 61 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 62 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 63 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 64 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 65 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 66 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 67 68 def __init__(self, allspice_client): 69 super().__init__(allspice_client) 70 71 def __eq__(self, other): 72 if not isinstance(other, Organization): 73 return False 74 return self.allspice_client == other.allspice_client and self.name == other.name 75 76 def __hash__(self): 77 return hash(self.allspice_client) ^ hash(self.name) 78 79 @classmethod 80 def request(cls, allspice_client, name: str) -> Self: 81 return cls._request(allspice_client, {"name": name}) 82 83 @classmethod 84 def parse_response(cls, allspice_client, result) -> "Organization": 85 api_object = super().parse_response(allspice_client, result) 86 # add "name" field to make this behave similar to users for gitea < 1.18 87 # also necessary for repository-owner when org is repo owner 88 if not hasattr(api_object, "name"): 89 Organization._add_read_property("name", result["username"], api_object) 90 return api_object 91 92 _patchable_fields: ClassVar[set[str]] = { 93 "description", 94 "full_name", 95 "location", 96 "visibility", 97 "website", 98 } 99 100 def commit(self): 101 args = {"name": self.name} 102 self._commit(args) 103 104 def create_repo( 105 self, 106 repoName: str, 107 description: str = "", 108 private: bool = False, 109 autoInit=True, 110 gitignores: Optional[str] = None, 111 license: Optional[str] = None, 112 readme: str = "Default", 113 issue_labels: Optional[str] = None, 114 default_branch="master", 115 ): 116 """Create an organization Repository 117 118 Throws: 119 AlreadyExistsException: If the Repository exists already. 120 Exception: If something else went wrong. 121 """ 122 result = self.allspice_client.requests_post( 123 f"/orgs/{self.name}/repos", 124 data={ 125 "name": repoName, 126 "description": description, 127 "private": private, 128 "auto_init": autoInit, 129 "gitignores": gitignores, 130 "license": license, 131 "issue_labels": issue_labels, 132 "readme": readme, 133 "default_branch": default_branch, 134 }, 135 ) 136 if "id" in result: 137 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 138 else: 139 self.allspice_client.logger.error(result["message"]) 140 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 141 return Repository.parse_response(self.allspice_client, result) 142 143 def get_repositories(self) -> List["Repository"]: 144 results = self.allspice_client.requests_get_paginated( 145 Organization.ORG_REPOS_REQUEST % self.username 146 ) 147 return [Repository.parse_response(self.allspice_client, result) for result in results] 148 149 def get_repository(self, name) -> "Repository": 150 repos = self.get_repositories() 151 for repo in repos: 152 if repo.name == name: 153 return repo 154 raise NotFoundException("Repository %s not existent in organization." % name) 155 156 def get_teams(self) -> List["Team"]: 157 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 158 teams = [Team.parse_response(self.allspice_client, result) for result in results] 159 # organisation seems to be missing using this request, so we add org manually 160 for t in teams: 161 setattr(t, "_organization", self) 162 return teams 163 164 def get_team(self, name) -> "Team": 165 teams = self.get_teams() 166 for team in teams: 167 if team.name == name: 168 return team 169 raise NotFoundException("Team not existent in organization.") 170 171 def create_team( 172 self, 173 name: str, 174 description: str = "", 175 permission: str = "read", 176 can_create_org_repo: bool = False, 177 includes_all_repositories: bool = False, 178 units=( 179 "repo.code", 180 "repo.issues", 181 "repo.ext_issues", 182 "repo.wiki", 183 "repo.pulls", 184 "repo.releases", 185 "repo.ext_wiki", 186 ), 187 units_map={}, 188 ) -> "Team": 189 """Alias for AllSpice#create_team""" 190 # TODO: Move AllSpice#create_team to Organization#create_team and 191 # deprecate AllSpice#create_team. 192 return self.allspice_client.create_team( 193 org=self, 194 name=name, 195 description=description, 196 permission=permission, 197 can_create_org_repo=can_create_org_repo, 198 includes_all_repositories=includes_all_repositories, 199 units=units, 200 units_map=units_map, 201 ) 202 203 def get_members(self) -> List["User"]: 204 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 205 return [User.parse_response(self.allspice_client, result) for result in results] 206 207 def is_member(self, username) -> bool: 208 if isinstance(username, User): 209 username = username.username 210 try: 211 # returns 204 if its ok, 404 if its not 212 self.allspice_client.requests_get( 213 Organization.ORG_IS_MEMBER % (self.username, username) 214 ) 215 return True 216 except Exception: 217 return False 218 219 def remove_member(self, user: "User"): 220 path = f"/orgs/{self.username}/members/{user.username}" 221 self.allspice_client.requests_delete(path) 222 223 def delete(self): 224 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 225 for repo in self.get_repositories(): 226 repo.delete() 227 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 228 self.deleted = True 229 230 def get_heatmap(self) -> List[Tuple[datetime, int]]: 231 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 232 results = [ 233 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 234 for result in results 235 ] 236 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
83 @classmethod 84 def parse_response(cls, allspice_client, result) -> "Organization": 85 api_object = super().parse_response(allspice_client, result) 86 # add "name" field to make this behave similar to users for gitea < 1.18 87 # also necessary for repository-owner when org is repo owner 88 if not hasattr(api_object, "name"): 89 Organization._add_read_property("name", result["username"], api_object) 90 return api_object
104 def create_repo( 105 self, 106 repoName: str, 107 description: str = "", 108 private: bool = False, 109 autoInit=True, 110 gitignores: Optional[str] = None, 111 license: Optional[str] = None, 112 readme: str = "Default", 113 issue_labels: Optional[str] = None, 114 default_branch="master", 115 ): 116 """Create an organization Repository 117 118 Throws: 119 AlreadyExistsException: If the Repository exists already. 120 Exception: If something else went wrong. 121 """ 122 result = self.allspice_client.requests_post( 123 f"/orgs/{self.name}/repos", 124 data={ 125 "name": repoName, 126 "description": description, 127 "private": private, 128 "auto_init": autoInit, 129 "gitignores": gitignores, 130 "license": license, 131 "issue_labels": issue_labels, 132 "readme": readme, 133 "default_branch": default_branch, 134 }, 135 ) 136 if "id" in result: 137 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 138 else: 139 self.allspice_client.logger.error(result["message"]) 140 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 141 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
156 def get_teams(self) -> List["Team"]: 157 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 158 teams = [Team.parse_response(self.allspice_client, result) for result in results] 159 # organisation seems to be missing using this request, so we add org manually 160 for t in teams: 161 setattr(t, "_organization", self) 162 return teams
171 def create_team( 172 self, 173 name: str, 174 description: str = "", 175 permission: str = "read", 176 can_create_org_repo: bool = False, 177 includes_all_repositories: bool = False, 178 units=( 179 "repo.code", 180 "repo.issues", 181 "repo.ext_issues", 182 "repo.wiki", 183 "repo.pulls", 184 "repo.releases", 185 "repo.ext_wiki", 186 ), 187 units_map={}, 188 ) -> "Team": 189 """Alias for AllSpice#create_team""" 190 # TODO: Move AllSpice#create_team to Organization#create_team and 191 # deprecate AllSpice#create_team. 192 return self.allspice_client.create_team( 193 org=self, 194 name=name, 195 description=description, 196 permission=permission, 197 can_create_org_repo=can_create_org_repo, 198 includes_all_repositories=includes_all_repositories, 199 units=units, 200 units_map=units_map, 201 )
Alias for AllSpice#create_team
207 def is_member(self, username) -> bool: 208 if isinstance(username, User): 209 username = username.username 210 try: 211 # returns 204 if its ok, 404 if its not 212 self.allspice_client.requests_get( 213 Organization.ORG_IS_MEMBER % (self.username, username) 214 ) 215 return True 216 except Exception: 217 return False
223 def delete(self): 224 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 225 for repo in self.get_repositories(): 226 repo.delete() 227 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 228 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
239class User(ApiObject): 240 active: bool 241 admin: Any 242 allow_create_organization: Any 243 allow_git_hook: Any 244 allow_import_local: Any 245 avatar_url: str 246 created: str 247 description: str 248 email: str 249 emails: List[Any] 250 followers_count: int 251 following_count: int 252 full_name: str 253 id: int 254 is_admin: bool 255 language: str 256 last_login: str 257 location: str 258 login: str 259 login_name: str 260 max_repo_creation: Any 261 must_change_password: Any 262 password: Any 263 prohibit_login: bool 264 restricted: bool 265 starred_repos_count: int 266 username: str 267 visibility: str 268 website: str 269 270 API_OBJECT = """/users/{name}""" # <org> 271 USER_MAIL = """/user/emails?sudo=%s""" # <name> 272 USER_PATCH = """/admin/users/%s""" # <username> 273 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 274 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 275 USER_HEATMAP = """/users/%s/heatmap""" # <username> 276 277 def __init__(self, allspice_client): 278 super().__init__(allspice_client) 279 self._emails = [] 280 281 def __eq__(self, other): 282 if not isinstance(other, User): 283 return False 284 return self.allspice_client == other.allspice_client and self.id == other.id 285 286 def __hash__(self): 287 return hash(self.allspice_client) ^ hash(self.id) 288 289 @property 290 def emails(self): 291 self.__request_emails() 292 return self._emails 293 294 @classmethod 295 def request(cls, allspice_client, name: str) -> "User": 296 api_object = cls._request(allspice_client, {"name": name}) 297 return api_object 298 299 _patchable_fields: ClassVar[set[str]] = { 300 "active", 301 "admin", 302 "allow_create_organization", 303 "allow_git_hook", 304 "allow_import_local", 305 "email", 306 "full_name", 307 "location", 308 "login_name", 309 "max_repo_creation", 310 "must_change_password", 311 "password", 312 "prohibit_login", 313 "website", 314 } 315 316 def commit(self, login_name: str, source_id: int = 0): 317 """ 318 Unfortunately it is necessary to require the login name 319 as well as the login source (that is not supplied when getting a user) for 320 changing a user. 321 Usually source_id is 0 and the login_name is equal to the username. 322 """ 323 values = self.get_dirty_fields() 324 values.update( 325 # api-doc says that the "source_id" is necessary; works without though 326 {"login_name": login_name, "source_id": source_id} 327 ) 328 args = {"username": self.username} 329 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 330 self._dirty_fields = {} 331 332 def create_repo( 333 self, 334 repoName: str, 335 description: str = "", 336 private: bool = False, 337 autoInit=True, 338 gitignores: Optional[str] = None, 339 license: Optional[str] = None, 340 readme: str = "Default", 341 issue_labels: Optional[str] = None, 342 default_branch="master", 343 ): 344 """Create a user Repository 345 346 Throws: 347 AlreadyExistsException: If the Repository exists already. 348 Exception: If something else went wrong. 349 """ 350 result = self.allspice_client.requests_post( 351 "/user/repos", 352 data={ 353 "name": repoName, 354 "description": description, 355 "private": private, 356 "auto_init": autoInit, 357 "gitignores": gitignores, 358 "license": license, 359 "issue_labels": issue_labels, 360 "readme": readme, 361 "default_branch": default_branch, 362 }, 363 ) 364 if "id" in result: 365 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 366 else: 367 self.allspice_client.logger.error(result["message"]) 368 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 369 return Repository.parse_response(self.allspice_client, result) 370 371 def get_repositories(self) -> List["Repository"]: 372 """Get all Repositories owned by this User.""" 373 url = f"/users/{self.username}/repos" 374 results = self.allspice_client.requests_get_paginated(url) 375 return [Repository.parse_response(self.allspice_client, result) for result in results] 376 377 def get_orgs(self) -> List[Organization]: 378 """Get all Organizations this user is a member of.""" 379 url = f"/users/{self.username}/orgs" 380 results = self.allspice_client.requests_get_paginated(url) 381 return [Organization.parse_response(self.allspice_client, result) for result in results] 382 383 def get_teams(self) -> List["Team"]: 384 url = "/user/teams" 385 results = self.allspice_client.requests_get_paginated(url, sudo=self) 386 return [Team.parse_response(self.allspice_client, result) for result in results] 387 388 def get_accessible_repos(self) -> List["Repository"]: 389 """Get all Repositories accessible by the logged in User.""" 390 results = self.allspice_client.requests_get("/user/repos", sudo=self) 391 return [Repository.parse_response(self.allspice_client, result) for result in results] 392 393 def __request_emails(self): 394 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 395 # report if the adress changed by this 396 for mail in result: 397 self._emails.append(mail["email"]) 398 if mail["primary"]: 399 self._email = mail["email"] 400 401 def delete(self): 402 """Deletes this User. Also deletes all Repositories he owns.""" 403 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 404 self.deleted = True 405 406 def get_heatmap(self) -> List[Tuple[datetime, int]]: 407 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 408 results = [ 409 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 410 for result in results 411 ] 412 return results
316 def commit(self, login_name: str, source_id: int = 0): 317 """ 318 Unfortunately it is necessary to require the login name 319 as well as the login source (that is not supplied when getting a user) for 320 changing a user. 321 Usually source_id is 0 and the login_name is equal to the username. 322 """ 323 values = self.get_dirty_fields() 324 values.update( 325 # api-doc says that the "source_id" is necessary; works without though 326 {"login_name": login_name, "source_id": source_id} 327 ) 328 args = {"username": self.username} 329 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 330 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
332 def create_repo( 333 self, 334 repoName: str, 335 description: str = "", 336 private: bool = False, 337 autoInit=True, 338 gitignores: Optional[str] = None, 339 license: Optional[str] = None, 340 readme: str = "Default", 341 issue_labels: Optional[str] = None, 342 default_branch="master", 343 ): 344 """Create a user Repository 345 346 Throws: 347 AlreadyExistsException: If the Repository exists already. 348 Exception: If something else went wrong. 349 """ 350 result = self.allspice_client.requests_post( 351 "/user/repos", 352 data={ 353 "name": repoName, 354 "description": description, 355 "private": private, 356 "auto_init": autoInit, 357 "gitignores": gitignores, 358 "license": license, 359 "issue_labels": issue_labels, 360 "readme": readme, 361 "default_branch": default_branch, 362 }, 363 ) 364 if "id" in result: 365 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 366 else: 367 self.allspice_client.logger.error(result["message"]) 368 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 369 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
371 def get_repositories(self) -> List["Repository"]: 372 """Get all Repositories owned by this User.""" 373 url = f"/users/{self.username}/repos" 374 results = self.allspice_client.requests_get_paginated(url) 375 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
377 def get_orgs(self) -> List[Organization]: 378 """Get all Organizations this user is a member of.""" 379 url = f"/users/{self.username}/orgs" 380 results = self.allspice_client.requests_get_paginated(url) 381 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
388 def get_accessible_repos(self) -> List["Repository"]: 389 """Get all Repositories accessible by the logged in User.""" 390 results = self.allspice_client.requests_get("/user/repos", sudo=self) 391 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
401 def delete(self): 402 """Deletes this User. Also deletes all Repositories he owns.""" 403 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 404 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.
415class Branch(ReadonlyApiObject): 416 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 417 effective_branch_protection_name: str 418 enable_status_check: bool 419 name: str 420 protected: bool 421 required_approvals: int 422 status_check_contexts: List[Any] 423 user_can_merge: bool 424 user_can_push: bool 425 426 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 427 428 def __init__(self, allspice_client): 429 super().__init__(allspice_client) 430 431 def __eq__(self, other): 432 if not isinstance(other, Branch): 433 return False 434 return self.commit == other.commit and self.name == other.name 435 436 def __hash__(self): 437 return hash(self.commit["id"]) ^ hash(self.name) 438 439 _fields_to_parsers: ClassVar[dict] = { 440 # This is not a commit object 441 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 442 } 443 444 @classmethod 445 def request(cls, allspice_client, owner: str, repo: str, branch: str): 446 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
449class GitEntry(ReadonlyApiObject): 450 """ 451 An object representing a file or directory in the Git tree. 452 """ 453 454 mode: str 455 path: str 456 sha: str 457 size: int 458 type: str 459 url: str 460 461 def __init__(self, allspice_client): 462 super().__init__(allspice_client) 463 464 def __eq__(self, other) -> bool: 465 if not isinstance(other, GitEntry): 466 return False 467 return self.sha == other.sha 468 469 def __hash__(self) -> int: 470 return hash(self.sha)
An object representing a file or directory in the Git tree.
Inherited Members
473class Repository(ApiObject): 474 allow_manual_merge: Any 475 allow_merge_commits: bool 476 allow_rebase: bool 477 allow_rebase_explicit: bool 478 allow_rebase_update: bool 479 allow_squash_merge: bool 480 archived: bool 481 archived_at: str 482 autodetect_manual_merge: Any 483 avatar_url: str 484 clone_url: str 485 created_at: str 486 default_allow_maintainer_edit: bool 487 default_branch: str 488 default_delete_branch_after_merge: bool 489 default_merge_style: str 490 description: str 491 empty: bool 492 enable_prune: Any 493 external_tracker: Any 494 external_wiki: Any 495 fork: bool 496 forks_count: int 497 full_name: str 498 has_actions: bool 499 has_issues: bool 500 has_packages: bool 501 has_projects: bool 502 has_pull_requests: bool 503 has_releases: bool 504 has_wiki: bool 505 html_url: str 506 id: int 507 ignore_whitespace_conflicts: bool 508 internal: bool 509 internal_tracker: Dict[str, bool] 510 language: str 511 languages_url: str 512 link: str 513 mirror: bool 514 mirror_interval: str 515 mirror_updated: str 516 name: str 517 open_issues_count: int 518 open_pr_counter: int 519 original_url: str 520 owner: Union["User", "Organization"] 521 parent: Any 522 permissions: Dict[str, bool] 523 private: bool 524 release_counter: int 525 repo_transfer: Any 526 size: int 527 ssh_url: str 528 stars_count: int 529 template: bool 530 updated_at: datetime 531 url: str 532 watchers_count: int 533 website: str 534 535 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 536 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 537 REPO_SEARCH = """/repos/search/""" 538 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 539 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 540 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 541 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 542 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 543 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 544 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 545 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 546 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 547 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 548 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 549 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 550 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 551 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 552 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 553 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 554 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 555 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 556 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 557 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 558 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 559 560 class ArchiveFormat(Enum): 561 """ 562 Archive formats for Repository.get_archive 563 """ 564 565 TAR = "tar.gz" 566 ZIP = "zip" 567 568 class CommitStatusSort(Enum): 569 """ 570 Sort order for Repository.get_commit_status 571 """ 572 573 OLDEST = "oldest" 574 RECENT_UPDATE = "recentupdate" 575 LEAST_UPDATE = "leastupdate" 576 LEAST_INDEX = "leastindex" 577 HIGHEST_INDEX = "highestindex" 578 579 def __init__(self, allspice_client): 580 super().__init__(allspice_client) 581 582 def __eq__(self, other): 583 if not isinstance(other, Repository): 584 return False 585 return self.owner == other.owner and self.name == other.name 586 587 def __hash__(self): 588 return hash(self.owner) ^ hash(self.name) 589 590 _fields_to_parsers: ClassVar[dict] = { 591 # dont know how to tell apart user and org as owner except form email being empty. 592 "owner": lambda allspice_client, r: ( 593 Organization.parse_response(allspice_client, r) 594 if r["email"] == "" 595 else User.parse_response(allspice_client, r) 596 ), 597 "updated_at": lambda _, t: Util.convert_time(t), 598 } 599 600 @classmethod 601 def request( 602 cls, 603 allspice_client, 604 owner: str, 605 name: str, 606 ) -> Repository: 607 return cls._request(allspice_client, {"owner": owner, "name": name}) 608 609 @classmethod 610 def search( 611 cls, 612 allspice_client, 613 query: Optional[str] = None, 614 topic: bool = False, 615 include_description: bool = False, 616 user: Optional[User] = None, 617 owner_to_prioritize: Union[User, Organization, None] = None, 618 ) -> list[Repository]: 619 """ 620 Search for repositories. 621 622 See https://hub.allspice.io/api/swagger#/repository/repoSearch 623 624 :param query: The query string to search for 625 :param topic: If true, the query string will only be matched against the 626 repository's topic. 627 :param include_description: If true, the query string will be matched 628 against the repository's description as well. 629 :param user: If specified, only repositories that this user owns or 630 contributes to will be searched. 631 :param owner_to_prioritize: If specified, repositories owned by the 632 given entity will be prioritized in the search. 633 :returns: All repositories matching the query. If there are many 634 repositories matching this query, this may take some time. 635 """ 636 637 params = {} 638 639 if query is not None: 640 params["q"] = query 641 if topic: 642 params["topic"] = topic 643 if include_description: 644 params["include_description"] = include_description 645 if user is not None: 646 params["user"] = user.id 647 if owner_to_prioritize is not None: 648 params["owner_to_prioritize"] = owner_to_prioritize.id 649 650 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 651 652 return [Repository.parse_response(allspice_client, response) for response in responses] 653 654 _patchable_fields: ClassVar[set[str]] = { 655 "allow_manual_merge", 656 "allow_merge_commits", 657 "allow_rebase", 658 "allow_rebase_explicit", 659 "allow_rebase_update", 660 "allow_squash_merge", 661 "archived", 662 "autodetect_manual_merge", 663 "default_branch", 664 "default_delete_branch_after_merge", 665 "default_merge_style", 666 "description", 667 "enable_prune", 668 "external_tracker", 669 "external_wiki", 670 "has_issues", 671 "has_projects", 672 "has_pull_requests", 673 "has_wiki", 674 "ignore_whitespace_conflicts", 675 "internal_tracker", 676 "mirror_interval", 677 "name", 678 "private", 679 "template", 680 "website", 681 } 682 683 def commit(self): 684 args = {"owner": self.owner.username, "name": self.name} 685 self._commit(args) 686 687 def get_branches(self) -> List["Branch"]: 688 """Get all the Branches of this Repository.""" 689 690 results = self.allspice_client.requests_get_paginated( 691 Repository.REPO_BRANCHES % (self.owner.username, self.name) 692 ) 693 return [Branch.parse_response(self.allspice_client, result) for result in results] 694 695 def get_branch(self, name: str) -> "Branch": 696 """Get a specific Branch of this Repository.""" 697 result = self.allspice_client.requests_get( 698 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 699 ) 700 return Branch.parse_response(self.allspice_client, result) 701 702 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 703 """Add a branch to the repository""" 704 # Note: will only work with gitea 1.13 or higher! 705 706 ref_name = Util.data_params_for_ref(create_from) 707 if "ref" not in ref_name: 708 raise ValueError("create_from must be a Branch, Commit or string") 709 ref_name = ref_name["ref"] 710 711 data = {"new_branch_name": newname, "old_ref_name": ref_name} 712 result = self.allspice_client.requests_post( 713 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 714 ) 715 return Branch.parse_response(self.allspice_client, result) 716 717 def get_issues( 718 self, 719 state: Literal["open", "closed", "all"] = "all", 720 search_query: Optional[str] = None, 721 labels: Optional[List[str]] = None, 722 milestones: Optional[List[Union[Milestone, str]]] = None, 723 assignee: Optional[Union[User, str]] = None, 724 since: Optional[datetime] = None, 725 before: Optional[datetime] = None, 726 ) -> List["Issue"]: 727 """ 728 Get all Issues of this Repository (open and closed) 729 730 https://hub.allspice.io/api/swagger#/repository/repoListIssues 731 732 All params of this method are optional filters. If you don't specify a filter, it 733 will not be applied. 734 735 :param state: The state of the Issues to get. If None, all Issues are returned. 736 :param search_query: Filter issues by text. This is equivalent to searching for 737 `search_query` in the Issues on the web interface. 738 :param labels: Filter issues by labels. 739 :param milestones: Filter issues by milestones. 740 :param assignee: Filter issues by the assigned user. 741 :param since: Filter issues by the date they were created. 742 :param before: Filter issues by the date they were created. 743 :return: A list of Issues. 744 """ 745 746 data = { 747 "state": state, 748 } 749 if search_query: 750 data["q"] = search_query 751 if labels: 752 data["labels"] = ",".join(labels) 753 if milestones: 754 data["milestone"] = ",".join( 755 [ 756 milestone.name if isinstance(milestone, Milestone) else milestone 757 for milestone in milestones 758 ] 759 ) 760 if assignee: 761 if isinstance(assignee, User): 762 data["assignee"] = assignee.username 763 else: 764 data["assignee"] = assignee 765 if since: 766 data["since"] = Util.format_time(since) 767 if before: 768 data["before"] = Util.format_time(before) 769 770 results = self.allspice_client.requests_get_paginated( 771 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 772 params=data, 773 ) 774 775 issues = [] 776 for result in results: 777 issue = Issue.parse_response(self.allspice_client, result) 778 # See Issue.request 779 setattr(issue, "_repository", self) 780 # This is mostly for compatibility with an older implementation 781 Issue._add_read_property("repo", self, issue) 782 issues.append(issue) 783 784 return issues 785 786 def get_design_reviews( 787 self, 788 state: Literal["open", "closed", "all"] = "all", 789 milestone: Optional[Union[Milestone, str]] = None, 790 labels: Optional[List[str]] = None, 791 ) -> List["DesignReview"]: 792 """ 793 Get all Design Reviews of this Repository. 794 795 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 796 797 :param state: The state of the Design Reviews to get. If None, all Design Reviews 798 are returned. 799 :param milestone: The milestone of the Design Reviews to get. 800 :param labels: A list of label IDs to filter DRs by. 801 :return: A list of Design Reviews. 802 """ 803 804 params = { 805 "state": state, 806 } 807 if milestone: 808 if isinstance(milestone, Milestone): 809 params["milestone"] = milestone.name 810 else: 811 params["milestone"] = milestone 812 if labels: 813 params["labels"] = ",".join(labels) 814 815 results = self.allspice_client.requests_get_paginated( 816 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 817 params=params, 818 ) 819 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 820 821 def get_commits( 822 self, 823 sha: Optional[str] = None, 824 path: Optional[str] = None, 825 stat: bool = True, 826 ) -> List["Commit"]: 827 """ 828 Get all the Commits of this Repository. 829 830 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 831 832 :param sha: The SHA of the commit to start listing commits from. 833 :param path: filepath of a file/dir. 834 :param stat: Include the number of additions and deletions in the response. 835 Disable for speedup. 836 :return: A list of Commits. 837 """ 838 839 data = {} 840 if sha: 841 data["sha"] = sha 842 if path: 843 data["path"] = path 844 if not stat: 845 data["stat"] = False 846 847 try: 848 results = self.allspice_client.requests_get_paginated( 849 Repository.REPO_COMMITS % (self.owner.username, self.name), 850 params=data, 851 ) 852 except ConflictException as err: 853 logging.warning(err) 854 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 855 results = [] 856 return [Commit.parse_response(self.allspice_client, result) for result in results] 857 858 def get_issues_state(self, state) -> List["Issue"]: 859 """ 860 DEPRECATED: Use get_issues() instead. 861 862 Get issues of state Issue.open or Issue.closed of a repository. 863 """ 864 865 assert state in [Issue.OPENED, Issue.CLOSED] 866 issues = [] 867 data = {"state": state} 868 results = self.allspice_client.requests_get_paginated( 869 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 870 params=data, 871 ) 872 for result in results: 873 issue = Issue.parse_response(self.allspice_client, result) 874 # adding data not contained in the issue response 875 # See Issue.request() 876 setattr(issue, "_repository", self) 877 Issue._add_read_property("repo", self, issue) 878 Issue._add_read_property("owner", self.owner, issue) 879 issues.append(issue) 880 return issues 881 882 def get_times(self): 883 results = self.allspice_client.requests_get( 884 Repository.REPO_TIMES % (self.owner.username, self.name) 885 ) 886 return results 887 888 def get_user_time(self, username) -> float: 889 if isinstance(username, User): 890 username = username.username 891 results = self.allspice_client.requests_get( 892 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 893 ) 894 time = sum(r["time"] for r in results) 895 return time 896 897 def get_full_name(self) -> str: 898 return self.owner.username + "/" + self.name 899 900 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 901 data = { 902 "assignees": assignees, 903 "body": description, 904 "closed": False, 905 "title": title, 906 } 907 result = self.allspice_client.requests_post( 908 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 909 data=data, 910 ) 911 912 issue = Issue.parse_response(self.allspice_client, result) 913 setattr(issue, "_repository", self) 914 Issue._add_read_property("repo", self, issue) 915 return issue 916 917 def create_design_review( 918 self, 919 title: str, 920 head: Union[Branch, str], 921 base: Union[Branch, str], 922 assignees: Optional[Set[Union[User, str]]] = None, 923 body: Optional[str] = None, 924 due_date: Optional[datetime] = None, 925 milestone: Optional["Milestone"] = None, 926 ) -> "DesignReview": 927 """ 928 Create a new Design Review. 929 930 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 931 932 :param title: Title of the Design Review 933 :param head: Branch or name of the branch to merge into the base branch 934 :param base: Branch or name of the branch to merge into 935 :param assignees: Optional. A list of users to assign this review. List can be of 936 User objects or of usernames. 937 :param body: An Optional Description for the Design Review. 938 :param due_date: An Optional Due date for the Design Review. 939 :param milestone: An Optional Milestone for the Design Review 940 :return: The created Design Review 941 """ 942 943 data: dict[str, Any] = { 944 "title": title, 945 } 946 947 if isinstance(head, Branch): 948 data["head"] = head.name 949 else: 950 data["head"] = head 951 if isinstance(base, Branch): 952 data["base"] = base.name 953 else: 954 data["base"] = base 955 if assignees: 956 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 957 if body: 958 data["body"] = body 959 if due_date: 960 data["due_date"] = Util.format_time(due_date) 961 if milestone: 962 data["milestone"] = milestone.id 963 964 result = self.allspice_client.requests_post( 965 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 966 data=data, 967 ) 968 969 return DesignReview.parse_response(self.allspice_client, result) 970 971 def create_milestone( 972 self, 973 title: str, 974 description: str, 975 due_date: Optional[str] = None, 976 state: str = "open", 977 ) -> "Milestone": 978 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 979 data = {"title": title, "description": description, "state": state} 980 if due_date: 981 data["due_date"] = due_date 982 result = self.allspice_client.requests_post(url, data=data) 983 return Milestone.parse_response(self.allspice_client, result) 984 985 def create_gitea_hook(self, hook_url: str, events: List[str]): 986 url = f"/repos/{self.owner.username}/{self.name}/hooks" 987 data = { 988 "type": "gitea", 989 "config": {"content_type": "json", "url": hook_url}, 990 "events": events, 991 "active": True, 992 } 993 return self.allspice_client.requests_post(url, data=data) 994 995 def list_hooks(self): 996 url = f"/repos/{self.owner.username}/{self.name}/hooks" 997 return self.allspice_client.requests_get(url) 998 999 def delete_hook(self, id: str): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1001 self.allspice_client.requests_delete(url) 1002 1003 def is_collaborator(self, username) -> bool: 1004 if isinstance(username, User): 1005 username = username.username 1006 try: 1007 # returns 204 if its ok, 404 if its not 1008 self.allspice_client.requests_get( 1009 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1010 ) 1011 return True 1012 except Exception: 1013 return False 1014 1015 def get_users_with_access(self) -> Sequence[User]: 1016 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1017 response = self.allspice_client.requests_get(url) 1018 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1019 if isinstance(self.owner, User): 1020 return [*collabs, self.owner] 1021 else: 1022 # owner must be org 1023 teams = self.owner.get_teams() 1024 for team in teams: 1025 team_repos = team.get_repos() 1026 if self.name in [n.name for n in team_repos]: 1027 collabs += team.get_members() 1028 return collabs 1029 1030 def remove_collaborator(self, user_name: str): 1031 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1032 self.allspice_client.requests_delete(url) 1033 1034 def transfer_ownership( 1035 self, 1036 new_owner: Union[User, Organization], 1037 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1038 ): 1039 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1040 data: dict[str, Any] = {"new_owner": new_owner.username} 1041 if isinstance(new_owner, Organization): 1042 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1043 data["team_ids"] = new_team_ids 1044 self.allspice_client.requests_post(url, data=data) 1045 # TODO: make sure this instance is either updated or discarded 1046 1047 def get_git_content( 1048 self, 1049 ref: Optional["Ref"] = None, 1050 commit: "Optional[Commit]" = None, 1051 ) -> List[Content]: 1052 """ 1053 Get the metadata for all files in the root directory. 1054 1055 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1056 1057 :param ref: branch or commit to get content from 1058 :param commit: commit to get content from (deprecated) 1059 """ 1060 url = f"/repos/{self.owner.username}/{self.name}/contents" 1061 data = Util.data_params_for_ref(ref or commit) 1062 1063 result = [ 1064 Content.parse_response(self.allspice_client, f) 1065 for f in self.allspice_client.requests_get(url, data) 1066 ] 1067 return result 1068 1069 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1070 """ 1071 Get the repository's tree on a given ref. 1072 1073 By default, this will only return the top-level entries in the tree. If you want 1074 to get the entire tree, set `recursive` to True. 1075 1076 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1077 :param recursive: Whether to get the entire tree or just the top-level entries. 1078 """ 1079 1080 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1081 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1082 params = {"recursive": recursive} 1083 results = self.allspice_client.requests_get_paginated(url, params=params) 1084 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1085 1086 def get_file_content( 1087 self, 1088 content: Content, 1089 ref: Optional[Ref] = None, 1090 commit: Optional[Commit] = None, 1091 ) -> Union[str, List["Content"]]: 1092 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1093 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1094 data = Util.data_params_for_ref(ref or commit) 1095 1096 if content.type == Content.FILE: 1097 return self.allspice_client.requests_get(url, data)["content"] 1098 else: 1099 return [ 1100 Content.parse_response(self.allspice_client, f) 1101 for f in self.allspice_client.requests_get(url, data) 1102 ] 1103 1104 def get_raw_file( 1105 self, 1106 file_path: str, 1107 ref: Optional[Ref] = None, 1108 ) -> bytes: 1109 """ 1110 Get the raw, binary data of a single file. 1111 1112 Note 1: if the file you are requesting is a text file, you might want to 1113 use .decode() on the result to get a string. For example: 1114 1115 content = repo.get_raw_file("file.txt").decode("utf-8") 1116 1117 Note 2: this method will store the entire file in memory. If you want 1118 to download a large file, you might want to use `download_to_file` 1119 instead. 1120 1121 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1122 1123 :param file_path: The path to the file to get. 1124 :param ref: The branch or commit to get the file from. If not provided, 1125 the default branch is used. 1126 """ 1127 1128 url = self.REPO_GET_RAW_FILE.format( 1129 owner=self.owner.username, 1130 repo=self.name, 1131 path=file_path, 1132 ) 1133 params = Util.data_params_for_ref(ref) 1134 return self.allspice_client.requests_get_raw(url, params=params) 1135 1136 def download_to_file( 1137 self, 1138 file_path: str, 1139 io: IO, 1140 ref: Optional[Ref] = None, 1141 ) -> None: 1142 """ 1143 Download the binary data of a file to a file-like object. 1144 1145 Example: 1146 1147 with open("schematic.DSN", "wb") as f: 1148 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1149 1150 :param file_path: The path to the file in the repository from the root 1151 of the repository. 1152 :param io: The file-like object to write the data to. 1153 """ 1154 1155 url = self.allspice_client._AllSpice__get_url( 1156 self.REPO_GET_RAW_FILE.format( 1157 owner=self.owner.username, 1158 repo=self.name, 1159 path=file_path, 1160 ) 1161 ) 1162 params = Util.data_params_for_ref(ref) 1163 response = self.allspice_client.requests.get( 1164 url, 1165 params=params, 1166 headers=self.allspice_client.headers, 1167 stream=True, 1168 ) 1169 1170 for chunk in response.iter_content(chunk_size=4096): 1171 if chunk: 1172 io.write(chunk) 1173 1174 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1175 """ 1176 Get the json blob for a cad file if it exists, otherwise enqueue 1177 a new job and return a 503 status. 1178 1179 WARNING: This is still experimental and not recommended for critical 1180 applications. The structure and content of the returned dictionary can 1181 change at any time. 1182 1183 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1184 """ 1185 1186 if isinstance(content, Content): 1187 content = content.path 1188 1189 url = self.REPO_GET_ALLSPICE_JSON.format( 1190 owner=self.owner.username, 1191 repo=self.name, 1192 content=content, 1193 ) 1194 data = Util.data_params_for_ref(ref) 1195 return self.allspice_client.requests_get(url, data) 1196 1197 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1198 """ 1199 Get the svg blob for a cad file if it exists, otherwise enqueue 1200 a new job and return a 503 status. 1201 1202 WARNING: This is still experimental and not yet recommended for 1203 critical applications. The content of the returned svg can change 1204 at any time. 1205 1206 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1207 """ 1208 1209 if isinstance(content, Content): 1210 content = content.path 1211 1212 url = self.REPO_GET_ALLSPICE_SVG.format( 1213 owner=self.owner.username, 1214 repo=self.name, 1215 content=content, 1216 ) 1217 data = Util.data_params_for_ref(ref) 1218 return self.allspice_client.requests_get_raw(url, data) 1219 1220 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1221 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1222 if not data: 1223 data = {} 1224 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1225 data.update({"content": content}) 1226 return self.allspice_client.requests_post(url, data) 1227 1228 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"sha": file_sha, "content": content}) 1234 return self.allspice_client.requests_put(url, data) 1235 1236 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha}) 1242 return self.allspice_client.requests_delete(url, data) 1243 1244 def get_archive( 1245 self, 1246 ref: Ref = "main", 1247 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1248 ) -> bytes: 1249 """ 1250 Download all the files in a specific ref of a repository as a zip or tarball 1251 archive. 1252 1253 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1254 1255 :param ref: branch or commit to get content from, defaults to the "main" branch 1256 :param archive_format: zip or tar, defaults to zip 1257 """ 1258 1259 ref_string = Util.data_params_for_ref(ref)["ref"] 1260 url = self.REPO_GET_ARCHIVE.format( 1261 owner=self.owner.username, 1262 repo=self.name, 1263 ref=ref_string, 1264 format=archive_format.value, 1265 ) 1266 return self.allspice_client.requests_get_raw(url) 1267 1268 def get_topics(self) -> list[str]: 1269 """ 1270 Gets the list of topics on this repository. 1271 1272 See http://localhost:3000/api/swagger#/repository/repoListTopics 1273 """ 1274 1275 url = self.REPO_GET_TOPICS.format( 1276 owner=self.owner.username, 1277 repo=self.name, 1278 ) 1279 return self.allspice_client.requests_get(url)["topics"] 1280 1281 def add_topic(self, topic: str): 1282 """ 1283 Adds a topic to the repository. 1284 1285 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1286 1287 :param topic: The topic to add. Topic names must consist only of 1288 lowercase letters, numnbers and dashes (-), and cannot start with 1289 dashes. Topic names also must be under 35 characters long. 1290 """ 1291 1292 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1293 self.allspice_client.requests_put(url) 1294 1295 def create_release( 1296 self, 1297 tag_name: str, 1298 name: Optional[str] = None, 1299 body: Optional[str] = None, 1300 draft: bool = False, 1301 ): 1302 """ 1303 Create a release for this repository. The release will be created for 1304 the tag with the given name. If there is no tag with this name, create 1305 the tag first. 1306 1307 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1308 """ 1309 1310 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1311 data = { 1312 "tag_name": tag_name, 1313 "draft": draft, 1314 } 1315 if name is not None: 1316 data["name"] = name 1317 if body is not None: 1318 data["body"] = body 1319 response = self.allspice_client.requests_post(url, data) 1320 return Release.parse_response(self.allspice_client, response, self) 1321 1322 def get_releases( 1323 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1324 ) -> List[Release]: 1325 """ 1326 Get the list of releases for this repository. 1327 1328 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1329 """ 1330 1331 data = {} 1332 1333 if draft is not None: 1334 data["draft"] = draft 1335 if pre_release is not None: 1336 data["pre-release"] = pre_release 1337 1338 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1339 responses = self.allspice_client.requests_get_paginated(url, params=data) 1340 1341 return [ 1342 Release.parse_response(self.allspice_client, response, self) for response in responses 1343 ] 1344 1345 def get_latest_release(self) -> Release: 1346 """ 1347 Get the latest release for this repository. 1348 1349 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1350 """ 1351 1352 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1353 response = self.allspice_client.requests_get(url) 1354 release = Release.parse_response(self.allspice_client, response, self) 1355 return release 1356 1357 def get_release_by_tag(self, tag: str) -> Release: 1358 """ 1359 Get a release by its tag. 1360 1361 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1362 """ 1363 1364 url = self.REPO_GET_RELEASE_BY_TAG.format( 1365 owner=self.owner.username, repo=self.name, tag=tag 1366 ) 1367 response = self.allspice_client.requests_get(url) 1368 release = Release.parse_response(self.allspice_client, response, self) 1369 return release 1370 1371 def get_commit_statuses( 1372 self, 1373 commit: Union[str, Commit], 1374 sort: Optional[CommitStatusSort] = None, 1375 state: Optional[CommitStatusState] = None, 1376 ) -> List[CommitStatus]: 1377 """ 1378 Get a list of statuses for a commit. 1379 1380 This is roughly equivalent to the Commit.get_statuses method, but this 1381 method allows you to sort and filter commits and is more convenient if 1382 you have a commit SHA and don't need to get the commit itself. 1383 1384 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1385 """ 1386 1387 if isinstance(commit, Commit): 1388 commit = commit.sha 1389 1390 params = {} 1391 if sort is not None: 1392 params["sort"] = sort.value 1393 if state is not None: 1394 params["state"] = state.value 1395 1396 url = self.REPO_GET_COMMIT_STATUS.format( 1397 owner=self.owner.username, repo=self.name, sha=commit 1398 ) 1399 response = self.allspice_client.requests_get_paginated(url, params=params) 1400 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1401 1402 def create_commit_status( 1403 self, 1404 commit: Union[str, Commit], 1405 context: Optional[str] = None, 1406 description: Optional[str] = None, 1407 state: Optional[CommitStatusState] = None, 1408 target_url: Optional[str] = None, 1409 ) -> CommitStatus: 1410 """ 1411 Create a status on a commit. 1412 1413 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1414 """ 1415 1416 if isinstance(commit, Commit): 1417 commit = commit.sha 1418 1419 data = {} 1420 if context is not None: 1421 data["context"] = context 1422 if description is not None: 1423 data["description"] = description 1424 if state is not None: 1425 data["state"] = state.value 1426 if target_url is not None: 1427 data["target_url"] = target_url 1428 1429 url = self.REPO_GET_COMMIT_STATUS.format( 1430 owner=self.owner.username, repo=self.name, sha=commit 1431 ) 1432 response = self.allspice_client.requests_post(url, data=data) 1433 return CommitStatus.parse_response(self.allspice_client, response) 1434 1435 def delete(self): 1436 self.allspice_client.requests_delete( 1437 Repository.REPO_DELETE % (self.owner.username, self.name) 1438 ) 1439 self.deleted = True
609 @classmethod 610 def search( 611 cls, 612 allspice_client, 613 query: Optional[str] = None, 614 topic: bool = False, 615 include_description: bool = False, 616 user: Optional[User] = None, 617 owner_to_prioritize: Union[User, Organization, None] = None, 618 ) -> list[Repository]: 619 """ 620 Search for repositories. 621 622 See https://hub.allspice.io/api/swagger#/repository/repoSearch 623 624 :param query: The query string to search for 625 :param topic: If true, the query string will only be matched against the 626 repository's topic. 627 :param include_description: If true, the query string will be matched 628 against the repository's description as well. 629 :param user: If specified, only repositories that this user owns or 630 contributes to will be searched. 631 :param owner_to_prioritize: If specified, repositories owned by the 632 given entity will be prioritized in the search. 633 :returns: All repositories matching the query. If there are many 634 repositories matching this query, this may take some time. 635 """ 636 637 params = {} 638 639 if query is not None: 640 params["q"] = query 641 if topic: 642 params["topic"] = topic 643 if include_description: 644 params["include_description"] = include_description 645 if user is not None: 646 params["user"] = user.id 647 if owner_to_prioritize is not None: 648 params["owner_to_prioritize"] = owner_to_prioritize.id 649 650 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 651 652 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
687 def get_branches(self) -> List["Branch"]: 688 """Get all the Branches of this Repository.""" 689 690 results = self.allspice_client.requests_get_paginated( 691 Repository.REPO_BRANCHES % (self.owner.username, self.name) 692 ) 693 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
695 def get_branch(self, name: str) -> "Branch": 696 """Get a specific Branch of this Repository.""" 697 result = self.allspice_client.requests_get( 698 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 699 ) 700 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
702 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 703 """Add a branch to the repository""" 704 # Note: will only work with gitea 1.13 or higher! 705 706 ref_name = Util.data_params_for_ref(create_from) 707 if "ref" not in ref_name: 708 raise ValueError("create_from must be a Branch, Commit or string") 709 ref_name = ref_name["ref"] 710 711 data = {"new_branch_name": newname, "old_ref_name": ref_name} 712 result = self.allspice_client.requests_post( 713 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 714 ) 715 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
717 def get_issues( 718 self, 719 state: Literal["open", "closed", "all"] = "all", 720 search_query: Optional[str] = None, 721 labels: Optional[List[str]] = None, 722 milestones: Optional[List[Union[Milestone, str]]] = None, 723 assignee: Optional[Union[User, str]] = None, 724 since: Optional[datetime] = None, 725 before: Optional[datetime] = None, 726 ) -> List["Issue"]: 727 """ 728 Get all Issues of this Repository (open and closed) 729 730 https://hub.allspice.io/api/swagger#/repository/repoListIssues 731 732 All params of this method are optional filters. If you don't specify a filter, it 733 will not be applied. 734 735 :param state: The state of the Issues to get. If None, all Issues are returned. 736 :param search_query: Filter issues by text. This is equivalent to searching for 737 `search_query` in the Issues on the web interface. 738 :param labels: Filter issues by labels. 739 :param milestones: Filter issues by milestones. 740 :param assignee: Filter issues by the assigned user. 741 :param since: Filter issues by the date they were created. 742 :param before: Filter issues by the date they were created. 743 :return: A list of Issues. 744 """ 745 746 data = { 747 "state": state, 748 } 749 if search_query: 750 data["q"] = search_query 751 if labels: 752 data["labels"] = ",".join(labels) 753 if milestones: 754 data["milestone"] = ",".join( 755 [ 756 milestone.name if isinstance(milestone, Milestone) else milestone 757 for milestone in milestones 758 ] 759 ) 760 if assignee: 761 if isinstance(assignee, User): 762 data["assignee"] = assignee.username 763 else: 764 data["assignee"] = assignee 765 if since: 766 data["since"] = Util.format_time(since) 767 if before: 768 data["before"] = Util.format_time(before) 769 770 results = self.allspice_client.requests_get_paginated( 771 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 772 params=data, 773 ) 774 775 issues = [] 776 for result in results: 777 issue = Issue.parse_response(self.allspice_client, result) 778 # See Issue.request 779 setattr(issue, "_repository", self) 780 # This is mostly for compatibility with an older implementation 781 Issue._add_read_property("repo", self, issue) 782 issues.append(issue) 783 784 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_query
in the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
786 def get_design_reviews( 787 self, 788 state: Literal["open", "closed", "all"] = "all", 789 milestone: Optional[Union[Milestone, str]] = None, 790 labels: Optional[List[str]] = None, 791 ) -> List["DesignReview"]: 792 """ 793 Get all Design Reviews of this Repository. 794 795 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 796 797 :param state: The state of the Design Reviews to get. If None, all Design Reviews 798 are returned. 799 :param milestone: The milestone of the Design Reviews to get. 800 :param labels: A list of label IDs to filter DRs by. 801 :return: A list of Design Reviews. 802 """ 803 804 params = { 805 "state": state, 806 } 807 if milestone: 808 if isinstance(milestone, Milestone): 809 params["milestone"] = milestone.name 810 else: 811 params["milestone"] = milestone 812 if labels: 813 params["labels"] = ",".join(labels) 814 815 results = self.allspice_client.requests_get_paginated( 816 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 817 params=params, 818 ) 819 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
821 def get_commits( 822 self, 823 sha: Optional[str] = None, 824 path: Optional[str] = None, 825 stat: bool = True, 826 ) -> List["Commit"]: 827 """ 828 Get all the Commits of this Repository. 829 830 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 831 832 :param sha: The SHA of the commit to start listing commits from. 833 :param path: filepath of a file/dir. 834 :param stat: Include the number of additions and deletions in the response. 835 Disable for speedup. 836 :return: A list of Commits. 837 """ 838 839 data = {} 840 if sha: 841 data["sha"] = sha 842 if path: 843 data["path"] = path 844 if not stat: 845 data["stat"] = False 846 847 try: 848 results = self.allspice_client.requests_get_paginated( 849 Repository.REPO_COMMITS % (self.owner.username, self.name), 850 params=data, 851 ) 852 except ConflictException as err: 853 logging.warning(err) 854 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 855 results = [] 856 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
858 def get_issues_state(self, state) -> List["Issue"]: 859 """ 860 DEPRECATED: Use get_issues() instead. 861 862 Get issues of state Issue.open or Issue.closed of a repository. 863 """ 864 865 assert state in [Issue.OPENED, Issue.CLOSED] 866 issues = [] 867 data = {"state": state} 868 results = self.allspice_client.requests_get_paginated( 869 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 870 params=data, 871 ) 872 for result in results: 873 issue = Issue.parse_response(self.allspice_client, result) 874 # adding data not contained in the issue response 875 # See Issue.request() 876 setattr(issue, "_repository", self) 877 Issue._add_read_property("repo", self, issue) 878 Issue._add_read_property("owner", self.owner, issue) 879 issues.append(issue) 880 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
888 def get_user_time(self, username) -> float: 889 if isinstance(username, User): 890 username = username.username 891 results = self.allspice_client.requests_get( 892 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 893 ) 894 time = sum(r["time"] for r in results) 895 return time
900 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 901 data = { 902 "assignees": assignees, 903 "body": description, 904 "closed": False, 905 "title": title, 906 } 907 result = self.allspice_client.requests_post( 908 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 909 data=data, 910 ) 911 912 issue = Issue.parse_response(self.allspice_client, result) 913 setattr(issue, "_repository", self) 914 Issue._add_read_property("repo", self, issue) 915 return issue
917 def create_design_review( 918 self, 919 title: str, 920 head: Union[Branch, str], 921 base: Union[Branch, str], 922 assignees: Optional[Set[Union[User, str]]] = None, 923 body: Optional[str] = None, 924 due_date: Optional[datetime] = None, 925 milestone: Optional["Milestone"] = None, 926 ) -> "DesignReview": 927 """ 928 Create a new Design Review. 929 930 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 931 932 :param title: Title of the Design Review 933 :param head: Branch or name of the branch to merge into the base branch 934 :param base: Branch or name of the branch to merge into 935 :param assignees: Optional. A list of users to assign this review. List can be of 936 User objects or of usernames. 937 :param body: An Optional Description for the Design Review. 938 :param due_date: An Optional Due date for the Design Review. 939 :param milestone: An Optional Milestone for the Design Review 940 :return: The created Design Review 941 """ 942 943 data: dict[str, Any] = { 944 "title": title, 945 } 946 947 if isinstance(head, Branch): 948 data["head"] = head.name 949 else: 950 data["head"] = head 951 if isinstance(base, Branch): 952 data["base"] = base.name 953 else: 954 data["base"] = base 955 if assignees: 956 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 957 if body: 958 data["body"] = body 959 if due_date: 960 data["due_date"] = Util.format_time(due_date) 961 if milestone: 962 data["milestone"] = milestone.id 963 964 result = self.allspice_client.requests_post( 965 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 966 data=data, 967 ) 968 969 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
971 def create_milestone( 972 self, 973 title: str, 974 description: str, 975 due_date: Optional[str] = None, 976 state: str = "open", 977 ) -> "Milestone": 978 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 979 data = {"title": title, "description": description, "state": state} 980 if due_date: 981 data["due_date"] = due_date 982 result = self.allspice_client.requests_post(url, data=data) 983 return Milestone.parse_response(self.allspice_client, result)
985 def create_gitea_hook(self, hook_url: str, events: List[str]): 986 url = f"/repos/{self.owner.username}/{self.name}/hooks" 987 data = { 988 "type": "gitea", 989 "config": {"content_type": "json", "url": hook_url}, 990 "events": events, 991 "active": True, 992 } 993 return self.allspice_client.requests_post(url, data=data)
1003 def is_collaborator(self, username) -> bool: 1004 if isinstance(username, User): 1005 username = username.username 1006 try: 1007 # returns 204 if its ok, 404 if its not 1008 self.allspice_client.requests_get( 1009 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1010 ) 1011 return True 1012 except Exception: 1013 return False
1015 def get_users_with_access(self) -> Sequence[User]: 1016 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1017 response = self.allspice_client.requests_get(url) 1018 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1019 if isinstance(self.owner, User): 1020 return [*collabs, self.owner] 1021 else: 1022 # owner must be org 1023 teams = self.owner.get_teams() 1024 for team in teams: 1025 team_repos = team.get_repos() 1026 if self.name in [n.name for n in team_repos]: 1027 collabs += team.get_members() 1028 return collabs
1034 def transfer_ownership( 1035 self, 1036 new_owner: Union[User, Organization], 1037 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1038 ): 1039 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1040 data: dict[str, Any] = {"new_owner": new_owner.username} 1041 if isinstance(new_owner, Organization): 1042 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1043 data["team_ids"] = new_team_ids 1044 self.allspice_client.requests_post(url, data=data) 1045 # TODO: make sure this instance is either updated or discarded
1047 def get_git_content( 1048 self, 1049 ref: Optional["Ref"] = None, 1050 commit: "Optional[Commit]" = None, 1051 ) -> List[Content]: 1052 """ 1053 Get the metadata for all files in the root directory. 1054 1055 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1056 1057 :param ref: branch or commit to get content from 1058 :param commit: commit to get content from (deprecated) 1059 """ 1060 url = f"/repos/{self.owner.username}/{self.name}/contents" 1061 data = Util.data_params_for_ref(ref or commit) 1062 1063 result = [ 1064 Content.parse_response(self.allspice_client, f) 1065 for f in self.allspice_client.requests_get(url, data) 1066 ] 1067 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1069 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1070 """ 1071 Get the repository's tree on a given ref. 1072 1073 By default, this will only return the top-level entries in the tree. If you want 1074 to get the entire tree, set `recursive` to True. 1075 1076 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1077 :param recursive: Whether to get the entire tree or just the top-level entries. 1078 """ 1079 1080 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1081 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1082 params = {"recursive": recursive} 1083 results = self.allspice_client.requests_get_paginated(url, params=params) 1084 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1086 def get_file_content( 1087 self, 1088 content: Content, 1089 ref: Optional[Ref] = None, 1090 commit: Optional[Commit] = None, 1091 ) -> Union[str, List["Content"]]: 1092 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1093 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1094 data = Util.data_params_for_ref(ref or commit) 1095 1096 if content.type == Content.FILE: 1097 return self.allspice_client.requests_get(url, data)["content"] 1098 else: 1099 return [ 1100 Content.parse_response(self.allspice_client, f) 1101 for f in self.allspice_client.requests_get(url, data) 1102 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1104 def get_raw_file( 1105 self, 1106 file_path: str, 1107 ref: Optional[Ref] = None, 1108 ) -> bytes: 1109 """ 1110 Get the raw, binary data of a single file. 1111 1112 Note 1: if the file you are requesting is a text file, you might want to 1113 use .decode() on the result to get a string. For example: 1114 1115 content = repo.get_raw_file("file.txt").decode("utf-8") 1116 1117 Note 2: this method will store the entire file in memory. If you want 1118 to download a large file, you might want to use `download_to_file` 1119 instead. 1120 1121 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1122 1123 :param file_path: The path to the file to get. 1124 :param ref: The branch or commit to get the file from. If not provided, 1125 the default branch is used. 1126 """ 1127 1128 url = self.REPO_GET_RAW_FILE.format( 1129 owner=self.owner.username, 1130 repo=self.name, 1131 path=file_path, 1132 ) 1133 params = Util.data_params_for_ref(ref) 1134 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1136 def download_to_file( 1137 self, 1138 file_path: str, 1139 io: IO, 1140 ref: Optional[Ref] = None, 1141 ) -> None: 1142 """ 1143 Download the binary data of a file to a file-like object. 1144 1145 Example: 1146 1147 with open("schematic.DSN", "wb") as f: 1148 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1149 1150 :param file_path: The path to the file in the repository from the root 1151 of the repository. 1152 :param io: The file-like object to write the data to. 1153 """ 1154 1155 url = self.allspice_client._AllSpice__get_url( 1156 self.REPO_GET_RAW_FILE.format( 1157 owner=self.owner.username, 1158 repo=self.name, 1159 path=file_path, 1160 ) 1161 ) 1162 params = Util.data_params_for_ref(ref) 1163 response = self.allspice_client.requests.get( 1164 url, 1165 params=params, 1166 headers=self.allspice_client.headers, 1167 stream=True, 1168 ) 1169 1170 for chunk in response.iter_content(chunk_size=4096): 1171 if chunk: 1172 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1174 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1175 """ 1176 Get the json blob for a cad file if it exists, otherwise enqueue 1177 a new job and return a 503 status. 1178 1179 WARNING: This is still experimental and not recommended for critical 1180 applications. The structure and content of the returned dictionary can 1181 change at any time. 1182 1183 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1184 """ 1185 1186 if isinstance(content, Content): 1187 content = content.path 1188 1189 url = self.REPO_GET_ALLSPICE_JSON.format( 1190 owner=self.owner.username, 1191 repo=self.name, 1192 content=content, 1193 ) 1194 data = Util.data_params_for_ref(ref) 1195 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1197 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1198 """ 1199 Get the svg blob for a cad file if it exists, otherwise enqueue 1200 a new job and return a 503 status. 1201 1202 WARNING: This is still experimental and not yet recommended for 1203 critical applications. The content of the returned svg can change 1204 at any time. 1205 1206 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1207 """ 1208 1209 if isinstance(content, Content): 1210 content = content.path 1211 1212 url = self.REPO_GET_ALLSPICE_SVG.format( 1213 owner=self.owner.username, 1214 repo=self.name, 1215 content=content, 1216 ) 1217 data = Util.data_params_for_ref(ref) 1218 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1220 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1221 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1222 if not data: 1223 data = {} 1224 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1225 data.update({"content": content}) 1226 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1228 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"sha": file_sha, "content": content}) 1234 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1236 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha}) 1242 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1244 def get_archive( 1245 self, 1246 ref: Ref = "main", 1247 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1248 ) -> bytes: 1249 """ 1250 Download all the files in a specific ref of a repository as a zip or tarball 1251 archive. 1252 1253 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1254 1255 :param ref: branch or commit to get content from, defaults to the "main" branch 1256 :param archive_format: zip or tar, defaults to zip 1257 """ 1258 1259 ref_string = Util.data_params_for_ref(ref)["ref"] 1260 url = self.REPO_GET_ARCHIVE.format( 1261 owner=self.owner.username, 1262 repo=self.name, 1263 ref=ref_string, 1264 format=archive_format.value, 1265 ) 1266 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1268 def get_topics(self) -> list[str]: 1269 """ 1270 Gets the list of topics on this repository. 1271 1272 See http://localhost:3000/api/swagger#/repository/repoListTopics 1273 """ 1274 1275 url = self.REPO_GET_TOPICS.format( 1276 owner=self.owner.username, 1277 repo=self.name, 1278 ) 1279 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1281 def add_topic(self, topic: str): 1282 """ 1283 Adds a topic to the repository. 1284 1285 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1286 1287 :param topic: The topic to add. Topic names must consist only of 1288 lowercase letters, numnbers and dashes (-), and cannot start with 1289 dashes. Topic names also must be under 35 characters long. 1290 """ 1291 1292 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1293 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1295 def create_release( 1296 self, 1297 tag_name: str, 1298 name: Optional[str] = None, 1299 body: Optional[str] = None, 1300 draft: bool = False, 1301 ): 1302 """ 1303 Create a release for this repository. The release will be created for 1304 the tag with the given name. If there is no tag with this name, create 1305 the tag first. 1306 1307 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1308 """ 1309 1310 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1311 data = { 1312 "tag_name": tag_name, 1313 "draft": draft, 1314 } 1315 if name is not None: 1316 data["name"] = name 1317 if body is not None: 1318 data["body"] = body 1319 response = self.allspice_client.requests_post(url, data) 1320 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1322 def get_releases( 1323 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1324 ) -> List[Release]: 1325 """ 1326 Get the list of releases for this repository. 1327 1328 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1329 """ 1330 1331 data = {} 1332 1333 if draft is not None: 1334 data["draft"] = draft 1335 if pre_release is not None: 1336 data["pre-release"] = pre_release 1337 1338 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1339 responses = self.allspice_client.requests_get_paginated(url, params=data) 1340 1341 return [ 1342 Release.parse_response(self.allspice_client, response, self) for response in responses 1343 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1345 def get_latest_release(self) -> Release: 1346 """ 1347 Get the latest release for this repository. 1348 1349 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1350 """ 1351 1352 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1353 response = self.allspice_client.requests_get(url) 1354 release = Release.parse_response(self.allspice_client, response, self) 1355 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1357 def get_release_by_tag(self, tag: str) -> Release: 1358 """ 1359 Get a release by its tag. 1360 1361 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1362 """ 1363 1364 url = self.REPO_GET_RELEASE_BY_TAG.format( 1365 owner=self.owner.username, repo=self.name, tag=tag 1366 ) 1367 response = self.allspice_client.requests_get(url) 1368 release = Release.parse_response(self.allspice_client, response, self) 1369 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1371 def get_commit_statuses( 1372 self, 1373 commit: Union[str, Commit], 1374 sort: Optional[CommitStatusSort] = None, 1375 state: Optional[CommitStatusState] = None, 1376 ) -> List[CommitStatus]: 1377 """ 1378 Get a list of statuses for a commit. 1379 1380 This is roughly equivalent to the Commit.get_statuses method, but this 1381 method allows you to sort and filter commits and is more convenient if 1382 you have a commit SHA and don't need to get the commit itself. 1383 1384 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1385 """ 1386 1387 if isinstance(commit, Commit): 1388 commit = commit.sha 1389 1390 params = {} 1391 if sort is not None: 1392 params["sort"] = sort.value 1393 if state is not None: 1394 params["state"] = state.value 1395 1396 url = self.REPO_GET_COMMIT_STATUS.format( 1397 owner=self.owner.username, repo=self.name, sha=commit 1398 ) 1399 response = self.allspice_client.requests_get_paginated(url, params=params) 1400 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1402 def create_commit_status( 1403 self, 1404 commit: Union[str, Commit], 1405 context: Optional[str] = None, 1406 description: Optional[str] = None, 1407 state: Optional[CommitStatusState] = None, 1408 target_url: Optional[str] = None, 1409 ) -> CommitStatus: 1410 """ 1411 Create a status on a commit. 1412 1413 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1414 """ 1415 1416 if isinstance(commit, Commit): 1417 commit = commit.sha 1418 1419 data = {} 1420 if context is not None: 1421 data["context"] = context 1422 if description is not None: 1423 data["description"] = description 1424 if state is not None: 1425 data["state"] = state.value 1426 if target_url is not None: 1427 data["target_url"] = target_url 1428 1429 url = self.REPO_GET_COMMIT_STATUS.format( 1430 owner=self.owner.username, repo=self.name, sha=commit 1431 ) 1432 response = self.allspice_client.requests_post(url, data=data) 1433 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
560 class ArchiveFormat(Enum): 561 """ 562 Archive formats for Repository.get_archive 563 """ 564 565 TAR = "tar.gz" 566 ZIP = "zip"
Archive formats for Repository.get_archive
Inherited Members
- enum.Enum
- name
- value
568 class CommitStatusSort(Enum): 569 """ 570 Sort order for Repository.get_commit_status 571 """ 572 573 OLDEST = "oldest" 574 RECENT_UPDATE = "recentupdate" 575 LEAST_UPDATE = "leastupdate" 576 LEAST_INDEX = "leastindex" 577 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
Inherited Members
- enum.Enum
- name
- value
1442class Milestone(ApiObject): 1443 allow_merge_commits: Any 1444 allow_rebase: Any 1445 allow_rebase_explicit: Any 1446 allow_squash_merge: Any 1447 archived: Any 1448 closed_at: Any 1449 closed_issues: int 1450 created_at: str 1451 default_branch: Any 1452 description: str 1453 due_on: Any 1454 has_issues: Any 1455 has_pull_requests: Any 1456 has_wiki: Any 1457 id: int 1458 ignore_whitespace_conflicts: Any 1459 name: Any 1460 open_issues: int 1461 private: Any 1462 state: str 1463 title: str 1464 updated_at: str 1465 website: Any 1466 1467 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1468 1469 def __init__(self, allspice_client): 1470 super().__init__(allspice_client) 1471 1472 def __eq__(self, other): 1473 if not isinstance(other, Milestone): 1474 return False 1475 return self.allspice_client == other.allspice_client and self.id == other.id 1476 1477 def __hash__(self): 1478 return hash(self.allspice_client) ^ hash(self.id) 1479 1480 _fields_to_parsers: ClassVar[dict] = { 1481 "closed_at": lambda _, t: Util.convert_time(t), 1482 "due_on": lambda _, t: Util.convert_time(t), 1483 } 1484 1485 _patchable_fields: ClassVar[set[str]] = { 1486 "allow_merge_commits", 1487 "allow_rebase", 1488 "allow_rebase_explicit", 1489 "allow_squash_merge", 1490 "archived", 1491 "default_branch", 1492 "description", 1493 "has_issues", 1494 "has_pull_requests", 1495 "has_wiki", 1496 "ignore_whitespace_conflicts", 1497 "name", 1498 "private", 1499 "website", 1500 } 1501 1502 @classmethod 1503 def request(cls, allspice_client, owner: str, repo: str, number: str): 1504 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1507class Attachment(ReadonlyApiObject): 1508 """ 1509 An asset attached to a comment. 1510 1511 You cannot edit or delete the attachment from this object - see the instance methods 1512 Comment.edit_attachment and delete_attachment for that. 1513 """ 1514 1515 browser_download_url: str 1516 created_at: str 1517 download_count: int 1518 id: int 1519 name: str 1520 size: int 1521 uuid: str 1522 1523 def __init__(self, allspice_client): 1524 super().__init__(allspice_client) 1525 1526 def __eq__(self, other): 1527 if not isinstance(other, Attachment): 1528 return False 1529 1530 return self.uuid == other.uuid 1531 1532 def __hash__(self): 1533 return hash(self.uuid) 1534 1535 def download_to_file(self, io: IO): 1536 """ 1537 Download the raw, binary data of this Attachment to a file-like object. 1538 1539 Example: 1540 1541 with open("my_file.zip", "wb") as f: 1542 attachment.download_to_file(f) 1543 1544 :param io: The file-like object to write the data to. 1545 """ 1546 1547 response = self.allspice_client.requests.get( 1548 self.browser_download_url, 1549 headers=self.allspice_client.headers, 1550 stream=True, 1551 ) 1552 # 4kb chunks 1553 for chunk in response.iter_content(chunk_size=4096): 1554 if chunk: 1555 io.write(chunk)
An asset attached to a comment.
You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.
1535 def download_to_file(self, io: IO): 1536 """ 1537 Download the raw, binary data of this Attachment to a file-like object. 1538 1539 Example: 1540 1541 with open("my_file.zip", "wb") as f: 1542 attachment.download_to_file(f) 1543 1544 :param io: The file-like object to write the data to. 1545 """ 1546 1547 response = self.allspice_client.requests.get( 1548 self.browser_download_url, 1549 headers=self.allspice_client.headers, 1550 stream=True, 1551 ) 1552 # 4kb chunks 1553 for chunk in response.iter_content(chunk_size=4096): 1554 if chunk: 1555 io.write(chunk)
Download the raw, binary data of this Attachment to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
attachment.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
Inherited Members
1558class Comment(ApiObject): 1559 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1560 body: str 1561 created_at: datetime 1562 html_url: str 1563 id: int 1564 issue_url: str 1565 original_author: str 1566 original_author_id: int 1567 pull_request_url: str 1568 updated_at: datetime 1569 user: User 1570 1571 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1572 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1573 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1574 1575 def __init__(self, allspice_client): 1576 super().__init__(allspice_client) 1577 1578 def __eq__(self, other): 1579 if not isinstance(other, Comment): 1580 return False 1581 return self.repository == other.repository and self.id == other.id 1582 1583 def __hash__(self): 1584 return hash(self.repository) ^ hash(self.id) 1585 1586 @classmethod 1587 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1588 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1589 1590 _fields_to_parsers: ClassVar[dict] = { 1591 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1592 "created_at": lambda _, t: Util.convert_time(t), 1593 "updated_at": lambda _, t: Util.convert_time(t), 1594 } 1595 1596 _patchable_fields: ClassVar[set[str]] = {"body"} 1597 1598 @property 1599 def parent_url(self) -> str: 1600 """URL of the parent of this comment (the issue or the pull request)""" 1601 1602 if self.issue_url is not None and self.issue_url != "": 1603 return self.issue_url 1604 else: 1605 return self.pull_request_url 1606 1607 @cached_property 1608 def repository(self) -> Repository: 1609 """The repository this comment was posted on.""" 1610 1611 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1612 return Repository.request(self.allspice_client, owner_name, repo_name) 1613 1614 def __fields_for_path(self): 1615 return { 1616 "owner": self.repository.owner.username, 1617 "repo": self.repository.name, 1618 "id": self.id, 1619 } 1620 1621 def commit(self): 1622 self._commit(self.__fields_for_path()) 1623 1624 def delete(self): 1625 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1626 self.deleted = True 1627 1628 def get_attachments(self) -> List[Attachment]: 1629 """ 1630 Get all attachments on this comment. This returns Attachment objects, which 1631 contain a link to download the attachment. 1632 1633 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1634 """ 1635 1636 results = self.allspice_client.requests_get( 1637 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1638 ) 1639 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1640 1641 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1642 """ 1643 Create an attachment on this comment. 1644 1645 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1646 1647 :param file: The file to attach. This should be a file-like object. 1648 :param name: The name of the file. If not provided, the name of the file will be 1649 used. 1650 :return: The created attachment. 1651 """ 1652 1653 args: dict[str, Any] = { 1654 "files": {"attachment": file}, 1655 } 1656 if name is not None: 1657 args["params"] = {"name": name} 1658 1659 result = self.allspice_client.requests_post( 1660 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1661 **args, 1662 ) 1663 return Attachment.parse_response(self.allspice_client, result) 1664 1665 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1666 """ 1667 Edit an attachment. 1668 1669 The list of params that can be edited is available at 1670 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1671 1672 :param attachment: The attachment to be edited 1673 :param data: The data parameter should be a dictionary of the fields to edit. 1674 :return: The edited attachment 1675 """ 1676 1677 args = { 1678 **self.__fields_for_path(), 1679 "attachment_id": attachment.id, 1680 } 1681 result = self.allspice_client.requests_patch( 1682 self.ATTACHMENT_PATH.format(**args), 1683 data=data, 1684 ) 1685 return Attachment.parse_response(self.allspice_client, result) 1686 1687 def delete_attachment(self, attachment: Attachment): 1688 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1689 1690 args = { 1691 **self.__fields_for_path(), 1692 "attachment_id": attachment.id, 1693 } 1694 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1695 attachment.deleted = True
1598 @property 1599 def parent_url(self) -> str: 1600 """URL of the parent of this comment (the issue or the pull request)""" 1601 1602 if self.issue_url is not None and self.issue_url != "": 1603 return self.issue_url 1604 else: 1605 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1607 @cached_property 1608 def repository(self) -> Repository: 1609 """The repository this comment was posted on.""" 1610 1611 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1612 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1628 def get_attachments(self) -> List[Attachment]: 1629 """ 1630 Get all attachments on this comment. This returns Attachment objects, which 1631 contain a link to download the attachment. 1632 1633 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1634 """ 1635 1636 results = self.allspice_client.requests_get( 1637 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1638 ) 1639 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1642 """ 1643 Create an attachment on this comment. 1644 1645 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1646 1647 :param file: The file to attach. This should be a file-like object. 1648 :param name: The name of the file. If not provided, the name of the file will be 1649 used. 1650 :return: The created attachment. 1651 """ 1652 1653 args: dict[str, Any] = { 1654 "files": {"attachment": file}, 1655 } 1656 if name is not None: 1657 args["params"] = {"name": name} 1658 1659 result = self.allspice_client.requests_post( 1660 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1661 **args, 1662 ) 1663 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1665 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1666 """ 1667 Edit an attachment. 1668 1669 The list of params that can be edited is available at 1670 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1671 1672 :param attachment: The attachment to be edited 1673 :param data: The data parameter should be a dictionary of the fields to edit. 1674 :return: The edited attachment 1675 """ 1676 1677 args = { 1678 **self.__fields_for_path(), 1679 "attachment_id": attachment.id, 1680 } 1681 result = self.allspice_client.requests_patch( 1682 self.ATTACHMENT_PATH.format(**args), 1683 data=data, 1684 ) 1685 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1687 def delete_attachment(self, attachment: Attachment): 1688 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1689 1690 args = { 1691 **self.__fields_for_path(), 1692 "attachment_id": attachment.id, 1693 } 1694 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1695 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1698class Commit(ReadonlyApiObject): 1699 author: User 1700 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1701 committer: Dict[str, Union[int, str, bool]] 1702 created: str 1703 files: List[Dict[str, str]] 1704 html_url: str 1705 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1706 parents: List[Union[Dict[str, str], Any]] 1707 sha: str 1708 stats: Dict[str, int] 1709 url: str 1710 1711 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1712 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1713 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1714 1715 # Regex to extract owner and repo names from the url property 1716 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1717 1718 def __init__(self, allspice_client): 1719 super().__init__(allspice_client) 1720 1721 _fields_to_parsers: ClassVar[dict] = { 1722 # NOTE: api may return None for commiters that are no allspice users 1723 "author": lambda allspice_client, u: ( 1724 User.parse_response(allspice_client, u) if u else None 1725 ) 1726 } 1727 1728 def __eq__(self, other): 1729 if not isinstance(other, Commit): 1730 return False 1731 return self.sha == other.sha 1732 1733 def __hash__(self): 1734 return hash(self.sha) 1735 1736 @classmethod 1737 def parse_response(cls, allspice_client, result) -> "Commit": 1738 commit_cache = result["commit"] 1739 api_object = cls(allspice_client) 1740 cls._initialize(allspice_client, api_object, result) 1741 # inner_commit for legacy reasons 1742 Commit._add_read_property("inner_commit", commit_cache, api_object) 1743 return api_object 1744 1745 def get_status(self) -> CommitCombinedStatus: 1746 """ 1747 Get a combined status consisting of all statues on this commit. 1748 1749 Note that the returned object is a CommitCombinedStatus object, which 1750 also contains a list of all statuses on the commit. 1751 1752 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1753 """ 1754 1755 result = self.allspice_client.requests_get( 1756 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1757 ) 1758 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1759 1760 def get_statuses(self) -> List[CommitStatus]: 1761 """ 1762 Get a list of all statuses on this commit. 1763 1764 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1765 """ 1766 1767 results = self.allspice_client.requests_get( 1768 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1769 ) 1770 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1771 1772 @cached_property 1773 def _fields_for_path(self) -> dict[str, str]: 1774 matches = self.URL_REGEXP.search(self.url) 1775 if not matches: 1776 raise ValueError(f"Invalid commit URL: {self.url}") 1777 1778 return { 1779 "owner": matches.group(1), 1780 "repo": matches.group(2), 1781 "sha": self.sha, 1782 }
1736 @classmethod 1737 def parse_response(cls, allspice_client, result) -> "Commit": 1738 commit_cache = result["commit"] 1739 api_object = cls(allspice_client) 1740 cls._initialize(allspice_client, api_object, result) 1741 # inner_commit for legacy reasons 1742 Commit._add_read_property("inner_commit", commit_cache, api_object) 1743 return api_object
1745 def get_status(self) -> CommitCombinedStatus: 1746 """ 1747 Get a combined status consisting of all statues on this commit. 1748 1749 Note that the returned object is a CommitCombinedStatus object, which 1750 also contains a list of all statuses on the commit. 1751 1752 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1753 """ 1754 1755 result = self.allspice_client.requests_get( 1756 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1757 ) 1758 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760 def get_statuses(self) -> List[CommitStatus]: 1761 """ 1762 Get a list of all statuses on this commit. 1763 1764 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1765 """ 1766 1767 results = self.allspice_client.requests_get( 1768 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1769 ) 1770 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
1785class CommitStatusState(Enum): 1786 PENDING = "pending" 1787 SUCCESS = "success" 1788 ERROR = "error" 1789 FAILURE = "failure" 1790 WARNING = "warning" 1791 1792 @classmethod 1793 def try_init(cls, value: str) -> CommitStatusState | str: 1794 """ 1795 Try converting a string to the enum, and if that fails, return the 1796 string itself. 1797 """ 1798 1799 try: 1800 return cls(value) 1801 except ValueError: 1802 return value
1792 @classmethod 1793 def try_init(cls, value: str) -> CommitStatusState | str: 1794 """ 1795 Try converting a string to the enum, and if that fails, return the 1796 string itself. 1797 """ 1798 1799 try: 1800 return cls(value) 1801 except ValueError: 1802 return value
Try converting a string to the enum, and if that fails, return the string itself.
Inherited Members
- enum.Enum
- name
- value
1805class CommitStatus(ReadonlyApiObject): 1806 context: str 1807 created_at: str 1808 creator: User 1809 description: str 1810 id: int 1811 status: CommitStatusState 1812 target_url: str 1813 updated_at: str 1814 url: str 1815 1816 def __init__(self, allspice_client): 1817 super().__init__(allspice_client) 1818 1819 _fields_to_parsers: ClassVar[dict] = { 1820 # Gitea/ASH doesn't actually validate that the status is a "valid" 1821 # status, so we can expect empty or unknown strings in the status field. 1822 "status": lambda _, s: CommitStatusState.try_init(s), 1823 "creator": lambda allspice_client, u: ( 1824 User.parse_response(allspice_client, u) if u else None 1825 ), 1826 } 1827 1828 def __eq__(self, other): 1829 if not isinstance(other, CommitStatus): 1830 return False 1831 return self.id == other.id 1832 1833 def __hash__(self): 1834 return hash(self.id)
Inherited Members
1837class CommitCombinedStatus(ReadonlyApiObject): 1838 commit_url: str 1839 repository: Repository 1840 sha: str 1841 state: CommitStatusState 1842 statuses: List["CommitStatus"] 1843 total_count: int 1844 url: str 1845 1846 def __init__(self, allspice_client): 1847 super().__init__(allspice_client) 1848 1849 _fields_to_parsers: ClassVar[dict] = { 1850 # See CommitStatus 1851 "state": lambda _, s: CommitStatusState.try_init(s), 1852 "statuses": lambda allspice_client, statuses: [ 1853 CommitStatus.parse_response(allspice_client, status) for status in statuses 1854 ], 1855 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1856 } 1857 1858 def __eq__(self, other): 1859 if not isinstance(other, CommitCombinedStatus): 1860 return False 1861 return self.sha == other.sha 1862 1863 def __hash__(self): 1864 return hash(self.sha) 1865 1866 @classmethod 1867 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1868 api_object = cls(allspice_client) 1869 cls._initialize(allspice_client, api_object, result) 1870 return api_object
Inherited Members
1873class Issue(ApiObject): 1874 assets: List[Any] 1875 assignee: Any 1876 assignees: Any 1877 body: str 1878 closed_at: Any 1879 comments: int 1880 created_at: str 1881 due_date: Any 1882 html_url: str 1883 id: int 1884 is_locked: bool 1885 labels: List[Any] 1886 milestone: Optional["Milestone"] 1887 number: int 1888 original_author: str 1889 original_author_id: int 1890 pin_order: int 1891 pull_request: Any 1892 ref: str 1893 repository: Dict[str, Union[int, str]] 1894 state: str 1895 title: str 1896 updated_at: str 1897 url: str 1898 user: User 1899 1900 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1901 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1902 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1903 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1904 1905 OPENED = "open" 1906 CLOSED = "closed" 1907 1908 def __init__(self, allspice_client): 1909 super().__init__(allspice_client) 1910 1911 def __eq__(self, other): 1912 if not isinstance(other, Issue): 1913 return False 1914 return self.repository == other.repository and self.id == other.id 1915 1916 def __hash__(self): 1917 return hash(self.repository) ^ hash(self.id) 1918 1919 _fields_to_parsers: ClassVar[dict] = { 1920 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1921 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1922 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1923 "assignees": lambda allspice_client, us: [ 1924 User.parse_response(allspice_client, u) for u in us 1925 ], 1926 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1927 } 1928 1929 _parsers_to_fields: ClassVar[dict] = { 1930 "milestone": lambda m: m.id, 1931 } 1932 1933 _patchable_fields: ClassVar[set[str]] = { 1934 "assignee", 1935 "assignees", 1936 "body", 1937 "due_date", 1938 "milestone", 1939 "state", 1940 "title", 1941 } 1942 1943 def commit(self): 1944 args = { 1945 "owner": self.repository.owner.username, 1946 "repo": self.repository.name, 1947 "index": self.number, 1948 } 1949 self._commit(args) 1950 1951 @classmethod 1952 def request(cls, allspice_client, owner: str, repo: str, number: str): 1953 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1954 # The repository in the response is a RepositoryMeta object, so request 1955 # the full repository object and add it to the issue object. 1956 repository = Repository.request(allspice_client, owner, repo) 1957 setattr(api_object, "_repository", repository) 1958 # For legacy reasons 1959 cls._add_read_property("repo", repository, api_object) 1960 return api_object 1961 1962 @classmethod 1963 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1964 args = {"owner": repo.owner.username, "repo": repo.name} 1965 data = {"title": title, "body": body} 1966 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1967 issue = Issue.parse_response(allspice_client, result) 1968 setattr(issue, "_repository", repo) 1969 cls._add_read_property("repo", repo, issue) 1970 return issue 1971 1972 @property 1973 def owner(self) -> Organization | User: 1974 return self.repository.owner 1975 1976 def get_time_sum(self, user: User) -> int: 1977 results = self.allspice_client.requests_get( 1978 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1979 ) 1980 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 1981 1982 def get_times(self) -> Optional[Dict]: 1983 return self.allspice_client.requests_get( 1984 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1985 ) 1986 1987 def delete_time(self, time_id: str): 1988 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 1989 self.allspice_client.requests_delete(path) 1990 1991 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1992 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 1993 self.allspice_client.requests_post( 1994 path, data={"created": created, "time": int(time), "user_name": user_name} 1995 ) 1996 1997 def get_comments(self) -> List[Comment]: 1998 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 1999 2000 results = self.allspice_client.requests_get( 2001 self.GET_COMMENTS.format( 2002 owner=self.owner.username, repo=self.repository.name, index=self.number 2003 ) 2004 ) 2005 2006 return [Comment.parse_response(self.allspice_client, result) for result in results] 2007 2008 def create_comment(self, body: str) -> Comment: 2009 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2010 2011 path = self.GET_COMMENTS.format( 2012 owner=self.owner.username, repo=self.repository.name, index=self.number 2013 ) 2014 2015 response = self.allspice_client.requests_post(path, data={"body": body}) 2016 return Comment.parse_response(self.allspice_client, response)
1951 @classmethod 1952 def request(cls, allspice_client, owner: str, repo: str, number: str): 1953 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1954 # The repository in the response is a RepositoryMeta object, so request 1955 # the full repository object and add it to the issue object. 1956 repository = Repository.request(allspice_client, owner, repo) 1957 setattr(api_object, "_repository", repository) 1958 # For legacy reasons 1959 cls._add_read_property("repo", repository, api_object) 1960 return api_object
1962 @classmethod 1963 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1964 args = {"owner": repo.owner.username, "repo": repo.name} 1965 data = {"title": title, "body": body} 1966 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1967 issue = Issue.parse_response(allspice_client, result) 1968 setattr(issue, "_repository", repo) 1969 cls._add_read_property("repo", repo, issue) 1970 return issue
1991 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1992 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 1993 self.allspice_client.requests_post( 1994 path, data={"created": created, "time": int(time), "user_name": user_name} 1995 )
1997 def get_comments(self) -> List[Comment]: 1998 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 1999 2000 results = self.allspice_client.requests_get( 2001 self.GET_COMMENTS.format( 2002 owner=self.owner.username, repo=self.repository.name, index=self.number 2003 ) 2004 ) 2005 2006 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2008 def create_comment(self, body: str) -> Comment: 2009 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2010 2011 path = self.GET_COMMENTS.format( 2012 owner=self.owner.username, repo=self.repository.name, index=self.number 2013 ) 2014 2015 response = self.allspice_client.requests_post(path, data={"body": body}) 2016 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2019class DesignReview(ApiObject): 2020 """ 2021 A Design Review. See 2022 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2023 2024 Note: The base and head fields are not `Branch` objects - they are plain strings 2025 referring to the branch names. This is because DRs can exist for branches that have 2026 been deleted, which don't have an associated `Branch` object from the API. You can use 2027 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2028 it exists. 2029 """ 2030 2031 allow_maintainer_edit: bool 2032 allow_maintainer_edits: Any 2033 assignee: User 2034 assignees: List["User"] 2035 base: str 2036 body: str 2037 closed_at: Any 2038 comments: int 2039 created_at: str 2040 diff_url: str 2041 due_date: Optional[str] 2042 head: str 2043 html_url: str 2044 id: int 2045 is_locked: bool 2046 labels: List[Any] 2047 merge_base: str 2048 merge_commit_sha: Any 2049 mergeable: bool 2050 merged: bool 2051 merged_at: Any 2052 merged_by: Any 2053 milestone: Any 2054 number: int 2055 patch_url: str 2056 pin_order: int 2057 repository: Optional["Repository"] 2058 requested_reviewers: Any 2059 state: str 2060 title: str 2061 updated_at: str 2062 url: str 2063 user: User 2064 2065 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2066 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2067 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2068 2069 OPEN = "open" 2070 CLOSED = "closed" 2071 2072 class MergeType(Enum): 2073 MERGE = "merge" 2074 REBASE = "rebase" 2075 REBASE_MERGE = "rebase-merge" 2076 SQUASH = "squash" 2077 MANUALLY_MERGED = "manually-merged" 2078 2079 def __init__(self, allspice_client): 2080 super().__init__(allspice_client) 2081 2082 def __eq__(self, other): 2083 if not isinstance(other, DesignReview): 2084 return False 2085 return self.repository == other.repository and self.id == other.id 2086 2087 def __hash__(self): 2088 return hash(self.repository) ^ hash(self.id) 2089 2090 @classmethod 2091 def parse_response(cls, allspice_client, result) -> "DesignReview": 2092 api_object = super().parse_response(allspice_client, result) 2093 cls._add_read_property( 2094 "repository", 2095 Repository.parse_response(allspice_client, result["base"]["repo"]), 2096 api_object, 2097 ) 2098 2099 return api_object 2100 2101 @classmethod 2102 def request(cls, allspice_client, owner: str, repo: str, number: str): 2103 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2104 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2105 2106 _fields_to_parsers: ClassVar[dict] = { 2107 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2108 "assignees": lambda allspice_client, us: [ 2109 User.parse_response(allspice_client, u) for u in us 2110 ], 2111 "base": lambda _, b: b["ref"], 2112 "head": lambda _, h: h["ref"], 2113 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2114 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2115 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2116 } 2117 2118 _patchable_fields: ClassVar[set[str]] = { 2119 "allow_maintainer_edits", 2120 "assignee", 2121 "assignees", 2122 "base", 2123 "body", 2124 "due_date", 2125 "milestone", 2126 "state", 2127 "title", 2128 } 2129 2130 _parsers_to_fields: ClassVar[dict] = { 2131 "assignee": lambda u: u.username, 2132 "assignees": lambda us: [u.username for u in us], 2133 "base": lambda b: b.name if isinstance(b, Branch) else b, 2134 "milestone": lambda m: m.id, 2135 } 2136 2137 def commit(self): 2138 data = self.get_dirty_fields() 2139 if "due_date" in data and data["due_date"] is None: 2140 data["unset_due_date"] = True 2141 2142 args = { 2143 "owner": self.repository.owner.username, 2144 "repo": self.repository.name, 2145 "index": self.number, 2146 } 2147 self._commit(args, data) 2148 2149 def merge(self, merge_type: MergeType): 2150 """ 2151 Merge the pull request. See 2152 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2153 2154 :param merge_type: The type of merge to perform. See the MergeType enum. 2155 """ 2156 2157 self.allspice_client.requests_put( 2158 self.MERGE_DESIGN_REVIEW.format( 2159 owner=self.repository.owner.username, 2160 repo=self.repository.name, 2161 index=self.number, 2162 ), 2163 data={"Do": merge_type.value}, 2164 ) 2165 2166 def get_comments(self) -> List[Comment]: 2167 """ 2168 Get the comments on this pull request, but not specifically on a review. 2169 2170 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2171 2172 :return: A list of comments on this pull request. 2173 """ 2174 2175 results = self.allspice_client.requests_get( 2176 self.GET_COMMENTS.format( 2177 owner=self.repository.owner.username, 2178 repo=self.repository.name, 2179 index=self.number, 2180 ) 2181 ) 2182 return [Comment.parse_response(self.allspice_client, result) for result in results] 2183 2184 def create_comment(self, body: str): 2185 """ 2186 Create a comment on this pull request. This uses the same endpoint as the 2187 comments on issues, and will not be associated with any reviews. 2188 2189 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2190 2191 :param body: The body of the comment. 2192 :return: The comment that was created. 2193 """ 2194 2195 result = self.allspice_client.requests_post( 2196 self.GET_COMMENTS.format( 2197 owner=self.repository.owner.username, 2198 repo=self.repository.name, 2199 index=self.number, 2200 ), 2201 data={"body": body}, 2202 ) 2203 return Comment.parse_response(self.allspice_client, result)
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2090 @classmethod 2091 def parse_response(cls, allspice_client, result) -> "DesignReview": 2092 api_object = super().parse_response(allspice_client, result) 2093 cls._add_read_property( 2094 "repository", 2095 Repository.parse_response(allspice_client, result["base"]["repo"]), 2096 api_object, 2097 ) 2098 2099 return api_object
2101 @classmethod 2102 def request(cls, allspice_client, owner: str, repo: str, number: str): 2103 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2104 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2137 def commit(self): 2138 data = self.get_dirty_fields() 2139 if "due_date" in data and data["due_date"] is None: 2140 data["unset_due_date"] = True 2141 2142 args = { 2143 "owner": self.repository.owner.username, 2144 "repo": self.repository.name, 2145 "index": self.number, 2146 } 2147 self._commit(args, data)
2149 def merge(self, merge_type: MergeType): 2150 """ 2151 Merge the pull request. See 2152 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2153 2154 :param merge_type: The type of merge to perform. See the MergeType enum. 2155 """ 2156 2157 self.allspice_client.requests_put( 2158 self.MERGE_DESIGN_REVIEW.format( 2159 owner=self.repository.owner.username, 2160 repo=self.repository.name, 2161 index=self.number, 2162 ), 2163 data={"Do": merge_type.value}, 2164 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2166 def get_comments(self) -> List[Comment]: 2167 """ 2168 Get the comments on this pull request, but not specifically on a review. 2169 2170 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2171 2172 :return: A list of comments on this pull request. 2173 """ 2174 2175 results = self.allspice_client.requests_get( 2176 self.GET_COMMENTS.format( 2177 owner=self.repository.owner.username, 2178 repo=self.repository.name, 2179 index=self.number, 2180 ) 2181 ) 2182 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2184 def create_comment(self, body: str): 2185 """ 2186 Create a comment on this pull request. This uses the same endpoint as the 2187 comments on issues, and will not be associated with any reviews. 2188 2189 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2190 2191 :param body: The body of the comment. 2192 :return: The comment that was created. 2193 """ 2194 2195 result = self.allspice_client.requests_post( 2196 self.GET_COMMENTS.format( 2197 owner=self.repository.owner.username, 2198 repo=self.repository.name, 2199 index=self.number, 2200 ), 2201 data={"body": body}, 2202 ) 2203 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2072 class MergeType(Enum): 2073 MERGE = "merge" 2074 REBASE = "rebase" 2075 REBASE_MERGE = "rebase-merge" 2076 SQUASH = "squash" 2077 MANUALLY_MERGED = "manually-merged"
Inherited Members
- enum.Enum
- name
- value
2206class Team(ApiObject): 2207 can_create_org_repo: bool 2208 description: str 2209 id: int 2210 includes_all_repositories: bool 2211 name: str 2212 organization: Optional["Organization"] 2213 permission: str 2214 units: List[str] 2215 units_map: Dict[str, str] 2216 2217 API_OBJECT = """/teams/{id}""" # <id> 2218 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2219 TEAM_DELETE = """/teams/%s""" # <id> 2220 GET_MEMBERS = """/teams/%s/members""" # <id> 2221 GET_REPOS = """/teams/%s/repos""" # <id> 2222 2223 def __init__(self, allspice_client): 2224 super().__init__(allspice_client) 2225 2226 def __eq__(self, other): 2227 if not isinstance(other, Team): 2228 return False 2229 return self.organization == other.organization and self.id == other.id 2230 2231 def __hash__(self): 2232 return hash(self.organization) ^ hash(self.id) 2233 2234 _fields_to_parsers: ClassVar[dict] = { 2235 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2236 } 2237 2238 _patchable_fields: ClassVar[set[str]] = { 2239 "can_create_org_repo", 2240 "description", 2241 "includes_all_repositories", 2242 "name", 2243 "permission", 2244 "units", 2245 "units_map", 2246 } 2247 2248 @classmethod 2249 def request(cls, allspice_client, id: int): 2250 return cls._request(allspice_client, {"id": id}) 2251 2252 def commit(self): 2253 args = {"id": self.id} 2254 self._commit(args) 2255 2256 def add_user(self, user: User): 2257 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2258 url = f"/teams/{self.id}/members/{user.login}" 2259 self.allspice_client.requests_put(url) 2260 2261 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2262 if isinstance(repo, Repository): 2263 repo_name = repo.name 2264 else: 2265 repo_name = repo 2266 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2267 2268 def get_members(self): 2269 """Get all users assigned to the team.""" 2270 results = self.allspice_client.requests_get_paginated( 2271 Team.GET_MEMBERS % self.id, 2272 ) 2273 return [User.parse_response(self.allspice_client, result) for result in results] 2274 2275 def get_repos(self): 2276 """Get all repos of this Team.""" 2277 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2278 return [Repository.parse_response(self.allspice_client, result) for result in results] 2279 2280 def delete(self): 2281 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2282 self.deleted = True 2283 2284 def remove_team_member(self, user_name: str): 2285 url = f"/teams/{self.id}/members/{user_name}" 2286 self.allspice_client.requests_delete(url)
2256 def add_user(self, user: User): 2257 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2258 url = f"/teams/{self.id}/members/{user.login}" 2259 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2268 def get_members(self): 2269 """Get all users assigned to the team.""" 2270 results = self.allspice_client.requests_get_paginated( 2271 Team.GET_MEMBERS % self.id, 2272 ) 2273 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2275 def get_repos(self): 2276 """Get all repos of this Team.""" 2277 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2278 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
2289class Release(ApiObject): 2290 """ 2291 A release on a repo. 2292 """ 2293 2294 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2295 author: User 2296 body: str 2297 created_at: str 2298 draft: bool 2299 html_url: str 2300 id: int 2301 name: str 2302 prerelease: bool 2303 published_at: str 2304 repo: Optional["Repository"] 2305 repository: Optional["Repository"] 2306 tag_name: str 2307 tarball_url: str 2308 target_commitish: str 2309 upload_url: str 2310 url: str 2311 zipball_url: str 2312 2313 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2314 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2315 # Note that we don't strictly need the get_assets route, as the release 2316 # object already contains the assets. 2317 2318 def __init__(self, allspice_client): 2319 super().__init__(allspice_client) 2320 2321 def __eq__(self, other): 2322 if not isinstance(other, Release): 2323 return False 2324 return self.repo == other.repo and self.id == other.id 2325 2326 def __hash__(self): 2327 return hash(self.repo) ^ hash(self.id) 2328 2329 _fields_to_parsers: ClassVar[dict] = { 2330 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2331 } 2332 _patchable_fields: ClassVar[set[str]] = { 2333 "body", 2334 "draft", 2335 "name", 2336 "prerelease", 2337 "tag_name", 2338 "target_commitish", 2339 } 2340 2341 @classmethod 2342 def parse_response(cls, allspice_client, result, repo) -> Release: 2343 release = super().parse_response(allspice_client, result) 2344 Release._add_read_property("repository", repo, release) 2345 # For legacy reasons 2346 Release._add_read_property("repo", repo, release) 2347 setattr( 2348 release, 2349 "_assets", 2350 [ 2351 ReleaseAsset.parse_response(allspice_client, asset, release) 2352 for asset in result["assets"] 2353 ], 2354 ) 2355 return release 2356 2357 @classmethod 2358 def request( 2359 cls, 2360 allspice_client, 2361 owner: str, 2362 repo: str, 2363 id: Optional[int] = None, 2364 ) -> Release: 2365 args = {"owner": owner, "repo": repo, "id": id} 2366 release_response = cls._get_gitea_api_object(allspice_client, args) 2367 repository = Repository.request(allspice_client, owner, repo) 2368 release = cls.parse_response(allspice_client, release_response, repository) 2369 return release 2370 2371 def commit(self): 2372 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2373 self._commit(args) 2374 2375 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2376 """ 2377 Create an asset for this release. 2378 2379 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2380 2381 :param file: The file to upload. This should be a file-like object. 2382 :param name: The name of the file. 2383 :return: The created asset. 2384 """ 2385 2386 args: dict[str, Any] = {"files": {"attachment": file}} 2387 if name is not None: 2388 args["params"] = {"name": name} 2389 2390 result = self.allspice_client.requests_post( 2391 self.RELEASE_CREATE_ASSET.format( 2392 owner=self.repo.owner.username, 2393 repo=self.repo.name, 2394 id=self.id, 2395 ), 2396 **args, 2397 ) 2398 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2399 2400 def delete(self): 2401 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2402 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2403 self.deleted = True
A release on a repo.
2341 @classmethod 2342 def parse_response(cls, allspice_client, result, repo) -> Release: 2343 release = super().parse_response(allspice_client, result) 2344 Release._add_read_property("repository", repo, release) 2345 # For legacy reasons 2346 Release._add_read_property("repo", repo, release) 2347 setattr( 2348 release, 2349 "_assets", 2350 [ 2351 ReleaseAsset.parse_response(allspice_client, asset, release) 2352 for asset in result["assets"] 2353 ], 2354 ) 2355 return release
2357 @classmethod 2358 def request( 2359 cls, 2360 allspice_client, 2361 owner: str, 2362 repo: str, 2363 id: Optional[int] = None, 2364 ) -> Release: 2365 args = {"owner": owner, "repo": repo, "id": id} 2366 release_response = cls._get_gitea_api_object(allspice_client, args) 2367 repository = Repository.request(allspice_client, owner, repo) 2368 release = cls.parse_response(allspice_client, release_response, repository) 2369 return release
2375 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2376 """ 2377 Create an asset for this release. 2378 2379 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2380 2381 :param file: The file to upload. This should be a file-like object. 2382 :param name: The name of the file. 2383 :return: The created asset. 2384 """ 2385 2386 args: dict[str, Any] = {"files": {"attachment": file}} 2387 if name is not None: 2388 args["params"] = {"name": name} 2389 2390 result = self.allspice_client.requests_post( 2391 self.RELEASE_CREATE_ASSET.format( 2392 owner=self.repo.owner.username, 2393 repo=self.repo.name, 2394 id=self.id, 2395 ), 2396 **args, 2397 ) 2398 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2406class ReleaseAsset(ApiObject): 2407 browser_download_url: str 2408 created_at: str 2409 download_count: int 2410 id: int 2411 name: str 2412 release: Optional["Release"] 2413 size: int 2414 uuid: str 2415 2416 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2417 2418 def __init__(self, allspice_client): 2419 super().__init__(allspice_client) 2420 2421 def __eq__(self, other): 2422 if not isinstance(other, ReleaseAsset): 2423 return False 2424 return self.release == other.release and self.id == other.id 2425 2426 def __hash__(self): 2427 return hash(self.release) ^ hash(self.id) 2428 2429 _fields_to_parsers: ClassVar[dict] = {} 2430 _patchable_fields: ClassVar[set[str]] = { 2431 "name", 2432 } 2433 2434 @classmethod 2435 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2436 asset = super().parse_response(allspice_client, result) 2437 ReleaseAsset._add_read_property("release", release, asset) 2438 return asset 2439 2440 @classmethod 2441 def request( 2442 cls, 2443 allspice_client, 2444 owner: str, 2445 repo: str, 2446 release_id: int, 2447 id: int, 2448 ) -> ReleaseAsset: 2449 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2450 asset_response = cls._get_gitea_api_object(allspice_client, args) 2451 release = Release.request(allspice_client, owner, repo, release_id) 2452 asset = cls.parse_response(allspice_client, asset_response, release) 2453 return asset 2454 2455 def commit(self): 2456 args = { 2457 "owner": self.release.repo.owner, 2458 "repo": self.release.repo.name, 2459 "release_id": self.release.id, 2460 "id": self.id, 2461 } 2462 self._commit(args) 2463 2464 def download(self) -> bytes: 2465 """ 2466 Download the raw, binary data of this asset. 2467 2468 Note 1: if the file you are requesting is a text file, you might want to 2469 use .decode() on the result to get a string. For example: 2470 2471 asset.download().decode("utf-8") 2472 2473 Note 2: this method will store the entire file in memory. If you are 2474 downloading a large file, you might want to use download_to_file instead. 2475 """ 2476 2477 return self.allspice_client.requests.get( 2478 self.browser_download_url, 2479 headers=self.allspice_client.headers, 2480 ).content 2481 2482 def download_to_file(self, io: IO): 2483 """ 2484 Download the raw, binary data of this asset to a file-like object. 2485 2486 Example: 2487 2488 with open("my_file.zip", "wb") as f: 2489 asset.download_to_file(f) 2490 2491 :param io: The file-like object to write the data to. 2492 """ 2493 2494 response = self.allspice_client.requests.get( 2495 self.browser_download_url, 2496 headers=self.allspice_client.headers, 2497 stream=True, 2498 ) 2499 # 4kb chunks 2500 for chunk in response.iter_content(chunk_size=4096): 2501 if chunk: 2502 io.write(chunk) 2503 2504 def delete(self): 2505 args = { 2506 "owner": self.release.repo.owner.name, 2507 "repo": self.release.repo.name, 2508 "release_id": self.release.id, 2509 "id": self.id, 2510 } 2511 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2512 self.deleted = True
2440 @classmethod 2441 def request( 2442 cls, 2443 allspice_client, 2444 owner: str, 2445 repo: str, 2446 release_id: int, 2447 id: int, 2448 ) -> ReleaseAsset: 2449 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2450 asset_response = cls._get_gitea_api_object(allspice_client, args) 2451 release = Release.request(allspice_client, owner, repo, release_id) 2452 asset = cls.parse_response(allspice_client, asset_response, release) 2453 return asset
2464 def download(self) -> bytes: 2465 """ 2466 Download the raw, binary data of this asset. 2467 2468 Note 1: if the file you are requesting is a text file, you might want to 2469 use .decode() on the result to get a string. For example: 2470 2471 asset.download().decode("utf-8") 2472 2473 Note 2: this method will store the entire file in memory. If you are 2474 downloading a large file, you might want to use download_to_file instead. 2475 """ 2476 2477 return self.allspice_client.requests.get( 2478 self.browser_download_url, 2479 headers=self.allspice_client.headers, 2480 ).content
Download the raw, binary data of this asset.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
asset.download().decode("utf-8")
Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.
2482 def download_to_file(self, io: IO): 2483 """ 2484 Download the raw, binary data of this asset to a file-like object. 2485 2486 Example: 2487 2488 with open("my_file.zip", "wb") as f: 2489 asset.download_to_file(f) 2490 2491 :param io: The file-like object to write the data to. 2492 """ 2493 2494 response = self.allspice_client.requests.get( 2495 self.browser_download_url, 2496 headers=self.allspice_client.headers, 2497 stream=True, 2498 ) 2499 # 4kb chunks 2500 for chunk in response.iter_content(chunk_size=4096): 2501 if chunk: 2502 io.write(chunk)
Download the raw, binary data of this asset to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
asset.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
2515class Content(ReadonlyApiObject): 2516 content: Any 2517 download_url: str 2518 encoding: Any 2519 git_url: str 2520 html_url: str 2521 last_commit_sha: str 2522 name: str 2523 path: str 2524 sha: str 2525 size: int 2526 submodule_git_url: Any 2527 target: Any 2528 type: str 2529 url: str 2530 2531 FILE = "file" 2532 2533 def __init__(self, allspice_client): 2534 super().__init__(allspice_client) 2535 2536 def __eq__(self, other): 2537 if not isinstance(other, Content): 2538 return False 2539 2540 return self.sha == other.sha and self.name == other.name 2541 2542 def __hash__(self): 2543 return hash(self.sha) ^ hash(self.name)
Inherited Members
2549class Util: 2550 @staticmethod 2551 def convert_time(time: str) -> datetime: 2552 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2553 try: 2554 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2555 except ValueError: 2556 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2557 2558 @staticmethod 2559 def format_time(time: datetime) -> str: 2560 """ 2561 Format a datetime object to Gitea's time format. 2562 2563 :param time: The time to format 2564 :return: Formatted time 2565 """ 2566 2567 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2568 2569 @staticmethod 2570 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2571 """ 2572 Given a "ref", returns a dict with the ref parameter for the API call. 2573 2574 If the ref is None, returns an empty dict. You can pass this to the API 2575 directly. 2576 """ 2577 2578 if isinstance(ref, Branch): 2579 return {"ref": ref.name} 2580 elif isinstance(ref, Commit): 2581 return {"ref": ref.sha} 2582 elif ref: 2583 return {"ref": ref} 2584 else: 2585 return {}
2550 @staticmethod 2551 def convert_time(time: str) -> datetime: 2552 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2553 try: 2554 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2555 except ValueError: 2556 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)
2558 @staticmethod 2559 def format_time(time: datetime) -> str: 2560 """ 2561 Format a datetime object to Gitea's time format. 2562 2563 :param time: The time to format 2564 :return: Formatted time 2565 """ 2566 2567 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
Format a datetime object to Gitea's time format.
Parameters
- time: The time to format
Returns
Formatted time
2569 @staticmethod 2570 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2571 """ 2572 Given a "ref", returns a dict with the ref parameter for the API call. 2573 2574 If the ref is None, returns an empty dict. You can pass this to the API 2575 directly. 2576 """ 2577 2578 if isinstance(ref, Branch): 2579 return {"ref": ref.name} 2580 elif isinstance(ref, Commit): 2581 return {"ref": ref.sha} 2582 elif ref: 2583 return {"ref": ref} 2584 else: 2585 return {}
Given a "ref", returns a dict with the ref parameter for the API call.
If the ref is None, returns an empty dict. You can pass this to the API directly.