allspice.apiobject
1# cSpell:words ECAD 2 3from __future__ import annotations 4 5import logging 6import re 7from dataclasses import asdict, dataclass 8from datetime import datetime, timezone 9from enum import Enum 10from functools import cached_property 11from typing import ( 12 IO, 13 Any, 14 ClassVar, 15 Dict, 16 FrozenSet, 17 List, 18 Literal, 19 Optional, 20 Sequence, 21 Set, 22 Tuple, 23 Union, 24) 25 26try: 27 from typing_extensions import Self 28except ImportError: 29 from typing import Self 30 31from .baseapiobject import ApiObject, ReadonlyApiObject 32from .exceptions import ConflictException, NotFoundException 33 34 35class Organization(ApiObject): 36 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 37 38 active: Optional[bool] 39 avatar_url: str 40 created: Optional[str] 41 description: str 42 email: str 43 followers_count: Optional[int] 44 following_count: Optional[int] 45 full_name: str 46 html_url: Optional[str] 47 id: int 48 is_admin: Optional[bool] 49 language: Optional[str] 50 last_login: Optional[str] 51 location: str 52 login: Optional[str] 53 login_name: Optional[str] 54 name: Optional[str] 55 prohibit_login: Optional[bool] 56 repo_admin_change_team_access: Optional[bool] 57 restricted: Optional[bool] 58 source_id: Optional[int] 59 starred_repos_count: Optional[int] 60 username: str 61 visibility: str 62 website: str 63 64 API_OBJECT = """/orgs/{name}""" # <org> 65 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 66 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 67 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 68 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 69 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 70 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 71 72 def __init__(self, allspice_client): 73 super().__init__(allspice_client) 74 75 def __eq__(self, other): 76 if not isinstance(other, Organization): 77 return False 78 return self.allspice_client == other.allspice_client and self.name == other.name 79 80 def __hash__(self): 81 return hash(self.allspice_client) ^ hash(self.name) 82 83 @classmethod 84 def request(cls, allspice_client, name: str) -> Self: 85 return cls._request(allspice_client, {"name": name}) 86 87 @classmethod 88 def parse_response(cls, allspice_client, result) -> "Organization": 89 api_object = super().parse_response(allspice_client, result) 90 # add "name" field to make this behave similar to users for gitea < 1.18 91 # also necessary for repository-owner when org is repo owner 92 if not hasattr(api_object, "name"): 93 Organization._add_read_property("name", result["username"], api_object) 94 return api_object 95 96 _patchable_fields: ClassVar[set[str]] = { 97 "description", 98 "full_name", 99 "location", 100 "visibility", 101 "website", 102 } 103 104 def commit(self): 105 args = {"name": self.name} 106 self._commit(args) 107 108 def create_repo( 109 self, 110 repoName: str, 111 description: str = "", 112 private: bool = False, 113 autoInit=True, 114 gitignores: Optional[str] = None, 115 license: Optional[str] = None, 116 readme: str = "Default", 117 issue_labels: Optional[str] = None, 118 default_branch="master", 119 ): 120 """Create an organization Repository 121 122 Throws: 123 AlreadyExistsException: If the Repository exists already. 124 Exception: If something else went wrong. 125 """ 126 result = self.allspice_client.requests_post( 127 f"/orgs/{self.name}/repos", 128 data={ 129 "name": repoName, 130 "description": description, 131 "private": private, 132 "auto_init": autoInit, 133 "gitignores": gitignores, 134 "license": license, 135 "issue_labels": issue_labels, 136 "readme": readme, 137 "default_branch": default_branch, 138 }, 139 ) 140 if "id" in result: 141 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 142 else: 143 self.allspice_client.logger.error(result["message"]) 144 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 145 return Repository.parse_response(self.allspice_client, result) 146 147 def get_repositories(self) -> List["Repository"]: 148 results = self.allspice_client.requests_get_paginated( 149 Organization.ORG_REPOS_REQUEST % self.username 150 ) 151 return [Repository.parse_response(self.allspice_client, result) for result in results] 152 153 def get_repository(self, name) -> "Repository": 154 repos = self.get_repositories() 155 for repo in repos: 156 if repo.name == name: 157 return repo 158 raise NotFoundException("Repository %s not existent in organization." % name) 159 160 def get_teams(self) -> List["Team"]: 161 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 162 teams = [Team.parse_response(self.allspice_client, result) for result in results] 163 # organisation seems to be missing using this request, so we add org manually 164 for t in teams: 165 setattr(t, "_organization", self) 166 return teams 167 168 def get_team(self, name) -> "Team": 169 teams = self.get_teams() 170 for team in teams: 171 if team.name == name: 172 return team 173 raise NotFoundException("Team not existent in organization.") 174 175 def create_team( 176 self, 177 name: str, 178 description: str = "", 179 permission: str = "read", 180 can_create_org_repo: bool = False, 181 includes_all_repositories: bool = False, 182 units=( 183 "repo.code", 184 "repo.issues", 185 "repo.ext_issues", 186 "repo.wiki", 187 "repo.pulls", 188 "repo.releases", 189 "repo.ext_wiki", 190 ), 191 units_map={}, 192 ) -> "Team": 193 """Alias for AllSpice#create_team""" 194 # TODO: Move AllSpice#create_team to Organization#create_team and 195 # deprecate AllSpice#create_team. 196 return self.allspice_client.create_team( 197 org=self, 198 name=name, 199 description=description, 200 permission=permission, 201 can_create_org_repo=can_create_org_repo, 202 includes_all_repositories=includes_all_repositories, 203 units=units, 204 units_map=units_map, 205 ) 206 207 def get_members(self) -> List["User"]: 208 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 209 return [User.parse_response(self.allspice_client, result) for result in results] 210 211 def is_member(self, username) -> bool: 212 if isinstance(username, User): 213 username = username.username 214 try: 215 # returns 204 if its ok, 404 if its not 216 self.allspice_client.requests_get( 217 Organization.ORG_IS_MEMBER % (self.username, username) 218 ) 219 return True 220 except Exception: 221 return False 222 223 def remove_member(self, user: "User"): 224 path = f"/orgs/{self.username}/members/{user.username}" 225 self.allspice_client.requests_delete(path) 226 227 def delete(self): 228 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 229 for repo in self.get_repositories(): 230 repo.delete() 231 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 232 self.deleted = True 233 234 def get_heatmap(self) -> List[Tuple[datetime, int]]: 235 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 236 results = [ 237 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 238 for result in results 239 ] 240 return results 241 242 243class User(ApiObject): 244 active: bool 245 admin: Any 246 allow_create_organization: Any 247 allow_git_hook: Any 248 allow_import_local: Any 249 avatar_url: str 250 created: str 251 description: str 252 email: str 253 emails: List[Any] 254 followers_count: int 255 following_count: int 256 full_name: str 257 html_url: str 258 id: int 259 is_admin: bool 260 language: str 261 last_login: str 262 location: str 263 login: str 264 login_name: str 265 max_repo_creation: Any 266 must_change_password: Any 267 password: Any 268 prohibit_login: bool 269 restricted: bool 270 source_id: int 271 starred_repos_count: int 272 username: str 273 visibility: str 274 website: str 275 276 API_OBJECT = """/users/{name}""" # <org> 277 USER_MAIL = """/user/emails?sudo=%s""" # <name> 278 USER_PATCH = """/admin/users/%s""" # <username> 279 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 280 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 281 USER_HEATMAP = """/users/%s/heatmap""" # <username> 282 283 def __init__(self, allspice_client): 284 super().__init__(allspice_client) 285 self._emails = [] 286 287 def __eq__(self, other): 288 if not isinstance(other, User): 289 return False 290 return self.allspice_client == other.allspice_client and self.id == other.id 291 292 def __hash__(self): 293 return hash(self.allspice_client) ^ hash(self.id) 294 295 @property 296 def emails(self): 297 self.__request_emails() 298 return self._emails 299 300 @classmethod 301 def request(cls, allspice_client, name: str) -> "User": 302 api_object = cls._request(allspice_client, {"name": name}) 303 return api_object 304 305 _patchable_fields: ClassVar[set[str]] = { 306 "active", 307 "admin", 308 "allow_create_organization", 309 "allow_git_hook", 310 "allow_import_local", 311 "email", 312 "full_name", 313 "location", 314 "login_name", 315 "max_repo_creation", 316 "must_change_password", 317 "password", 318 "prohibit_login", 319 "website", 320 } 321 322 def commit(self, login_name: str, source_id: int = 0): 323 """ 324 Unfortunately it is necessary to require the login name 325 as well as the login source (that is not supplied when getting a user) for 326 changing a user. 327 Usually source_id is 0 and the login_name is equal to the username. 328 """ 329 values = self.get_dirty_fields() 330 values.update( 331 # api-doc says that the "source_id" is necessary; works without though 332 {"login_name": login_name, "source_id": source_id} 333 ) 334 args = {"username": self.username} 335 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 336 self._dirty_fields = {} 337 338 def create_repo( 339 self, 340 repoName: str, 341 description: str = "", 342 private: bool = False, 343 autoInit=True, 344 gitignores: Optional[str] = None, 345 license: Optional[str] = None, 346 readme: str = "Default", 347 issue_labels: Optional[str] = None, 348 default_branch="master", 349 ): 350 """Create a user Repository 351 352 Throws: 353 AlreadyExistsException: If the Repository exists already. 354 Exception: If something else went wrong. 355 """ 356 result = self.allspice_client.requests_post( 357 "/user/repos", 358 data={ 359 "name": repoName, 360 "description": description, 361 "private": private, 362 "auto_init": autoInit, 363 "gitignores": gitignores, 364 "license": license, 365 "issue_labels": issue_labels, 366 "readme": readme, 367 "default_branch": default_branch, 368 }, 369 ) 370 if "id" in result: 371 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 372 else: 373 self.allspice_client.logger.error(result["message"]) 374 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 375 return Repository.parse_response(self.allspice_client, result) 376 377 def get_repositories(self) -> List["Repository"]: 378 """Get all Repositories owned by this User.""" 379 url = f"/users/{self.username}/repos" 380 results = self.allspice_client.requests_get_paginated(url) 381 return [Repository.parse_response(self.allspice_client, result) for result in results] 382 383 def get_orgs(self) -> List[Organization]: 384 """Get all Organizations this user is a member of.""" 385 url = f"/users/{self.username}/orgs" 386 results = self.allspice_client.requests_get_paginated(url) 387 return [Organization.parse_response(self.allspice_client, result) for result in results] 388 389 def get_teams(self) -> List["Team"]: 390 url = "/user/teams" 391 results = self.allspice_client.requests_get_paginated(url, sudo=self) 392 return [Team.parse_response(self.allspice_client, result) for result in results] 393 394 def get_accessible_repos(self) -> List["Repository"]: 395 """Get all Repositories accessible by the logged in User.""" 396 results = self.allspice_client.requests_get("/user/repos", sudo=self) 397 return [Repository.parse_response(self.allspice_client, result) for result in results] 398 399 def __request_emails(self): 400 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 401 # report if the adress changed by this 402 for mail in result: 403 self._emails.append(mail["email"]) 404 if mail["primary"]: 405 self._email = mail["email"] 406 407 def delete(self): 408 """Deletes this User. Also deletes all Repositories he owns.""" 409 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 410 self.deleted = True 411 412 def get_heatmap(self) -> List[Tuple[datetime, int]]: 413 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 414 results = [ 415 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 416 for result in results 417 ] 418 return results 419 420 421class Branch(ReadonlyApiObject): 422 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 423 effective_branch_protection_name: str 424 enable_status_check: bool 425 name: str 426 protected: bool 427 required_approvals: int 428 status_check_contexts: List[Any] 429 user_can_merge: bool 430 user_can_push: bool 431 432 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 433 434 def __init__(self, allspice_client): 435 super().__init__(allspice_client) 436 437 def __eq__(self, other): 438 if not isinstance(other, Branch): 439 return False 440 return self.commit == other.commit and self.name == other.name 441 442 def __hash__(self): 443 return hash(self.commit["id"]) ^ hash(self.name) 444 445 _fields_to_parsers: ClassVar[dict] = { 446 # This is not a commit object 447 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 448 } 449 450 @classmethod 451 def request(cls, allspice_client, owner: str, repo: str, branch: str): 452 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch}) 453 454 455class GitEntry(ReadonlyApiObject): 456 """ 457 An object representing a file or directory in the Git tree. 458 """ 459 460 mode: str 461 path: str 462 sha: str 463 size: int 464 type: str 465 url: str 466 467 def __init__(self, allspice_client): 468 super().__init__(allspice_client) 469 470 def __eq__(self, other) -> bool: 471 if not isinstance(other, GitEntry): 472 return False 473 return self.sha == other.sha 474 475 def __hash__(self) -> int: 476 return hash(self.sha) 477 478 479class Repository(ApiObject): 480 allow_fast_forward_only_merge: bool 481 allow_manual_merge: Any 482 allow_merge_commits: bool 483 allow_rebase: bool 484 allow_rebase_explicit: bool 485 allow_rebase_update: bool 486 allow_squash_merge: bool 487 archived: bool 488 archived_at: str 489 autodetect_manual_merge: Any 490 avatar_url: str 491 clone_url: str 492 created_at: str 493 default_allow_maintainer_edit: bool 494 default_branch: str 495 default_delete_branch_after_merge: bool 496 default_merge_style: str 497 description: str 498 empty: bool 499 enable_prune: Any 500 external_tracker: Any 501 external_wiki: Any 502 fork: bool 503 forks_count: int 504 full_name: str 505 has_actions: bool 506 has_issues: bool 507 has_packages: bool 508 has_projects: bool 509 has_pull_requests: bool 510 has_releases: bool 511 has_wiki: bool 512 html_url: str 513 id: int 514 ignore_whitespace_conflicts: bool 515 internal: bool 516 internal_tracker: Dict[str, bool] 517 language: str 518 languages_url: str 519 licenses: Any 520 link: str 521 mirror: bool 522 mirror_interval: str 523 mirror_updated: str 524 name: str 525 object_format_name: str 526 open_issues_count: int 527 open_pr_counter: int 528 original_url: str 529 owner: Union["User", "Organization"] 530 parent: Any 531 permissions: Dict[str, bool] 532 private: bool 533 projects_mode: str 534 release_counter: int 535 repo_transfer: Any 536 size: int 537 ssh_url: str 538 stars_count: int 539 template: bool 540 topics: List[Union[Any, str]] 541 updated_at: datetime 542 url: str 543 watchers_count: int 544 website: str 545 546 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 547 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 548 REPO_SEARCH = """/repos/search/""" 549 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 550 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 551 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 552 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 553 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 554 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 555 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 556 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 557 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 558 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 559 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 560 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 561 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 562 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 563 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 564 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 565 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 566 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 567 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 568 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 569 REPO_GET_MEDIA = "/repos/{owner}/{repo}/media/{path}" 570 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 571 572 class ArchiveFormat(Enum): 573 """ 574 Archive formats for Repository.get_archive 575 """ 576 577 TAR = "tar.gz" 578 ZIP = "zip" 579 580 class CommitStatusSort(Enum): 581 """ 582 Sort order for Repository.get_commit_status 583 """ 584 585 OLDEST = "oldest" 586 RECENT_UPDATE = "recentupdate" 587 LEAST_UPDATE = "leastupdate" 588 LEAST_INDEX = "leastindex" 589 HIGHEST_INDEX = "highestindex" 590 591 def __init__(self, allspice_client): 592 super().__init__(allspice_client) 593 594 def __eq__(self, other): 595 if not isinstance(other, Repository): 596 return False 597 return self.owner == other.owner and self.name == other.name 598 599 def __hash__(self): 600 return hash(self.owner) ^ hash(self.name) 601 602 _fields_to_parsers: ClassVar[dict] = { 603 # dont know how to tell apart user and org as owner except form email being empty. 604 "owner": lambda allspice_client, r: ( 605 Organization.parse_response(allspice_client, r) 606 if r["email"] == "" 607 else User.parse_response(allspice_client, r) 608 ), 609 "updated_at": lambda _, t: Util.convert_time(t), 610 } 611 612 @classmethod 613 def request( 614 cls, 615 allspice_client, 616 owner: str, 617 name: str, 618 ) -> Repository: 619 return cls._request(allspice_client, {"owner": owner, "name": name}) 620 621 @classmethod 622 def search( 623 cls, 624 allspice_client, 625 query: Optional[str] = None, 626 topic: bool = False, 627 include_description: bool = False, 628 user: Optional[User] = None, 629 owner_to_prioritize: Union[User, Organization, None] = None, 630 ) -> list[Repository]: 631 """ 632 Search for repositories. 633 634 See https://hub.allspice.io/api/swagger#/repository/repoSearch 635 636 :param query: The query string to search for 637 :param topic: If true, the query string will only be matched against the 638 repository's topic. 639 :param include_description: If true, the query string will be matched 640 against the repository's description as well. 641 :param user: If specified, only repositories that this user owns or 642 contributes to will be searched. 643 :param owner_to_prioritize: If specified, repositories owned by the 644 given entity will be prioritized in the search. 645 :returns: All repositories matching the query. If there are many 646 repositories matching this query, this may take some time. 647 """ 648 649 params = {} 650 651 if query is not None: 652 params["q"] = query 653 if topic: 654 params["topic"] = topic 655 if include_description: 656 params["include_description"] = include_description 657 if user is not None: 658 params["user"] = user.id 659 if owner_to_prioritize is not None: 660 params["owner_to_prioritize"] = owner_to_prioritize.id 661 662 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 663 664 return [Repository.parse_response(allspice_client, response) for response in responses] 665 666 _patchable_fields: ClassVar[set[str]] = { 667 "allow_manual_merge", 668 "allow_merge_commits", 669 "allow_rebase", 670 "allow_rebase_explicit", 671 "allow_rebase_update", 672 "allow_squash_merge", 673 "archived", 674 "autodetect_manual_merge", 675 "default_branch", 676 "default_delete_branch_after_merge", 677 "default_merge_style", 678 "description", 679 "enable_prune", 680 "external_tracker", 681 "external_wiki", 682 "has_actions", 683 "has_issues", 684 "has_projects", 685 "has_pull_requests", 686 "has_wiki", 687 "ignore_whitespace_conflicts", 688 "internal_tracker", 689 "mirror_interval", 690 "name", 691 "private", 692 "template", 693 "website", 694 } 695 696 def commit(self): 697 args = {"owner": self.owner.username, "name": self.name} 698 self._commit(args) 699 700 def get_branches(self) -> List["Branch"]: 701 """Get all the Branches of this Repository.""" 702 703 results = self.allspice_client.requests_get_paginated( 704 Repository.REPO_BRANCHES % (self.owner.username, self.name) 705 ) 706 return [Branch.parse_response(self.allspice_client, result) for result in results] 707 708 def get_branch(self, name: str) -> "Branch": 709 """Get a specific Branch of this Repository.""" 710 result = self.allspice_client.requests_get( 711 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 712 ) 713 return Branch.parse_response(self.allspice_client, result) 714 715 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 716 """Add a branch to the repository""" 717 # Note: will only work with gitea 1.13 or higher! 718 719 ref_name = Util.data_params_for_ref(create_from) 720 if "ref" not in ref_name: 721 raise ValueError("create_from must be a Branch, Commit or string") 722 ref_name = ref_name["ref"] 723 724 data = {"new_branch_name": newname, "old_ref_name": ref_name} 725 result = self.allspice_client.requests_post( 726 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 727 ) 728 return Branch.parse_response(self.allspice_client, result) 729 730 def get_issues( 731 self, 732 state: Literal["open", "closed", "all"] = "all", 733 search_query: Optional[str] = None, 734 labels: Optional[List[str]] = None, 735 milestones: Optional[List[Union[Milestone, str]]] = None, 736 assignee: Optional[Union[User, str]] = None, 737 since: Optional[datetime] = None, 738 before: Optional[datetime] = None, 739 ) -> List["Issue"]: 740 """ 741 Get all Issues of this Repository (open and closed) 742 743 https://hub.allspice.io/api/swagger#/repository/repoListIssues 744 745 All params of this method are optional filters. If you don't specify a filter, it 746 will not be applied. 747 748 :param state: The state of the Issues to get. If None, all Issues are returned. 749 :param search_query: Filter issues by text. This is equivalent to searching for 750 `search_query` in the Issues on the web interface. 751 :param labels: Filter issues by labels. 752 :param milestones: Filter issues by milestones. 753 :param assignee: Filter issues by the assigned user. 754 :param since: Filter issues by the date they were created. 755 :param before: Filter issues by the date they were created. 756 :return: A list of Issues. 757 """ 758 759 data = { 760 "state": state, 761 } 762 if search_query: 763 data["q"] = search_query 764 if labels: 765 data["labels"] = ",".join(labels) 766 if milestones: 767 data["milestone"] = ",".join( 768 [ 769 milestone.name if isinstance(milestone, Milestone) else milestone 770 for milestone in milestones 771 ] 772 ) 773 if assignee: 774 if isinstance(assignee, User): 775 data["assignee"] = assignee.username 776 else: 777 data["assignee"] = assignee 778 if since: 779 data["since"] = Util.format_time(since) 780 if before: 781 data["before"] = Util.format_time(before) 782 783 results = self.allspice_client.requests_get_paginated( 784 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 785 params=data, 786 ) 787 788 issues = [] 789 for result in results: 790 issue = Issue.parse_response(self.allspice_client, result) 791 # See Issue.request 792 setattr(issue, "_repository", self) 793 # This is mostly for compatibility with an older implementation 794 Issue._add_read_property("repo", self, issue) 795 issues.append(issue) 796 797 return issues 798 799 def get_design_reviews( 800 self, 801 state: Literal["open", "closed", "all"] = "all", 802 milestone: Optional[Union[Milestone, str]] = None, 803 labels: Optional[List[str]] = None, 804 ) -> List["DesignReview"]: 805 """ 806 Get all Design Reviews of this Repository. 807 808 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 809 810 :param state: The state of the Design Reviews to get. If None, all Design Reviews 811 are returned. 812 :param milestone: The milestone of the Design Reviews to get. 813 :param labels: A list of label IDs to filter DRs by. 814 :return: A list of Design Reviews. 815 """ 816 817 params = { 818 "state": state, 819 } 820 if milestone: 821 if isinstance(milestone, Milestone): 822 params["milestone"] = milestone.name 823 else: 824 params["milestone"] = milestone 825 if labels: 826 params["labels"] = ",".join(labels) 827 828 results = self.allspice_client.requests_get_paginated( 829 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 830 params=params, 831 ) 832 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 833 834 def get_commits( 835 self, 836 sha: Optional[str] = None, 837 path: Optional[str] = None, 838 stat: bool = True, 839 ) -> List["Commit"]: 840 """ 841 Get all the Commits of this Repository. 842 843 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 844 845 :param sha: The SHA of the commit to start listing commits from. 846 :param path: filepath of a file/dir. 847 :param stat: Include the number of additions and deletions in the response. 848 Disable for speedup. 849 :return: A list of Commits. 850 """ 851 852 data = {} 853 if sha: 854 data["sha"] = sha 855 if path: 856 data["path"] = path 857 if not stat: 858 data["stat"] = False 859 860 try: 861 results = self.allspice_client.requests_get_paginated( 862 Repository.REPO_COMMITS % (self.owner.username, self.name), 863 params=data, 864 ) 865 except ConflictException as err: 866 logging.warning(err) 867 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 868 results = [] 869 return [Commit.parse_response(self.allspice_client, result) for result in results] 870 871 def get_issues_state(self, state) -> List["Issue"]: 872 """ 873 DEPRECATED: Use get_issues() instead. 874 875 Get issues of state Issue.open or Issue.closed of a repository. 876 """ 877 878 assert state in [Issue.OPENED, Issue.CLOSED] 879 issues = [] 880 data = {"state": state} 881 results = self.allspice_client.requests_get_paginated( 882 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 883 params=data, 884 ) 885 for result in results: 886 issue = Issue.parse_response(self.allspice_client, result) 887 # adding data not contained in the issue response 888 # See Issue.request() 889 setattr(issue, "_repository", self) 890 Issue._add_read_property("repo", self, issue) 891 Issue._add_read_property("owner", self.owner, issue) 892 issues.append(issue) 893 return issues 894 895 def get_times(self): 896 results = self.allspice_client.requests_get( 897 Repository.REPO_TIMES % (self.owner.username, self.name) 898 ) 899 return results 900 901 def get_user_time(self, username) -> float: 902 if isinstance(username, User): 903 username = username.username 904 results = self.allspice_client.requests_get( 905 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 906 ) 907 time = sum(r["time"] for r in results) 908 return time 909 910 def get_full_name(self) -> str: 911 return self.owner.username + "/" + self.name 912 913 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 914 data = { 915 "assignees": assignees, 916 "body": description, 917 "closed": False, 918 "title": title, 919 } 920 result = self.allspice_client.requests_post( 921 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 922 data=data, 923 ) 924 925 issue = Issue.parse_response(self.allspice_client, result) 926 setattr(issue, "_repository", self) 927 Issue._add_read_property("repo", self, issue) 928 return issue 929 930 def create_design_review( 931 self, 932 title: str, 933 head: Union[Branch, str], 934 base: Union[Branch, str], 935 assignees: Optional[Set[Union[User, str]]] = None, 936 body: Optional[str] = None, 937 due_date: Optional[datetime] = None, 938 milestone: Optional["Milestone"] = None, 939 ) -> "DesignReview": 940 """ 941 Create a new Design Review. 942 943 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 944 945 :param title: Title of the Design Review 946 :param head: Branch or name of the branch to merge into the base branch 947 :param base: Branch or name of the branch to merge into 948 :param assignees: Optional. A list of users to assign this review. List can be of 949 User objects or of usernames. 950 :param body: An Optional Description for the Design Review. 951 :param due_date: An Optional Due date for the Design Review. 952 :param milestone: An Optional Milestone for the Design Review 953 :return: The created Design Review 954 """ 955 956 data: dict[str, Any] = { 957 "title": title, 958 } 959 960 if isinstance(head, Branch): 961 data["head"] = head.name 962 else: 963 data["head"] = head 964 if isinstance(base, Branch): 965 data["base"] = base.name 966 else: 967 data["base"] = base 968 if assignees: 969 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 970 if body: 971 data["body"] = body 972 if due_date: 973 data["due_date"] = Util.format_time(due_date) 974 if milestone: 975 data["milestone"] = milestone.id 976 977 result = self.allspice_client.requests_post( 978 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 979 data=data, 980 ) 981 982 return DesignReview.parse_response(self.allspice_client, result) 983 984 def create_milestone( 985 self, 986 title: str, 987 description: str, 988 due_date: Optional[str] = None, 989 state: str = "open", 990 ) -> "Milestone": 991 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 992 data = {"title": title, "description": description, "state": state} 993 if due_date: 994 data["due_date"] = due_date 995 result = self.allspice_client.requests_post(url, data=data) 996 return Milestone.parse_response(self.allspice_client, result) 997 998 def create_gitea_hook(self, hook_url: str, events: List[str]): 999 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1000 data = { 1001 "type": "gitea", 1002 "config": {"content_type": "json", "url": hook_url}, 1003 "events": events, 1004 "active": True, 1005 } 1006 return self.allspice_client.requests_post(url, data=data) 1007 1008 def list_hooks(self): 1009 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1010 return self.allspice_client.requests_get(url) 1011 1012 def delete_hook(self, id: str): 1013 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1014 self.allspice_client.requests_delete(url) 1015 1016 def is_collaborator(self, username) -> bool: 1017 if isinstance(username, User): 1018 username = username.username 1019 try: 1020 # returns 204 if its ok, 404 if its not 1021 self.allspice_client.requests_get( 1022 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1023 ) 1024 return True 1025 except Exception: 1026 return False 1027 1028 def get_users_with_access(self) -> Sequence[User]: 1029 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1030 response = self.allspice_client.requests_get(url) 1031 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1032 if isinstance(self.owner, User): 1033 return [*collabs, self.owner] 1034 else: 1035 # owner must be org 1036 teams = self.owner.get_teams() 1037 for team in teams: 1038 team_repos = team.get_repos() 1039 if self.name in [n.name for n in team_repos]: 1040 collabs += team.get_members() 1041 return collabs 1042 1043 def remove_collaborator(self, user_name: str): 1044 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1045 self.allspice_client.requests_delete(url) 1046 1047 def transfer_ownership( 1048 self, 1049 new_owner: Union[User, Organization], 1050 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1051 ): 1052 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1053 data: dict[str, Any] = {"new_owner": new_owner.username} 1054 if isinstance(new_owner, Organization): 1055 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1056 data["team_ids"] = new_team_ids 1057 self.allspice_client.requests_post(url, data=data) 1058 # TODO: make sure this instance is either updated or discarded 1059 1060 def get_git_content( 1061 self, 1062 ref: Optional["Ref"] = None, 1063 commit: "Optional[Commit]" = None, 1064 ) -> List[Content]: 1065 """ 1066 Get the metadata for all files in the root directory. 1067 1068 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1069 1070 :param ref: branch or commit to get content from 1071 :param commit: commit to get content from (deprecated) 1072 """ 1073 url = f"/repos/{self.owner.username}/{self.name}/contents" 1074 data = Util.data_params_for_ref(ref or commit) 1075 1076 result = [ 1077 Content.parse_response(self.allspice_client, f) 1078 for f in self.allspice_client.requests_get(url, data) 1079 ] 1080 return result 1081 1082 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1083 """ 1084 Get the repository's tree on a given ref. 1085 1086 By default, this will only return the top-level entries in the tree. If you want 1087 to get the entire tree, set `recursive` to True. 1088 1089 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1090 :param recursive: Whether to get the entire tree or just the top-level entries. 1091 """ 1092 1093 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1094 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1095 params = {"recursive": recursive} 1096 results = self.allspice_client.requests_get_paginated(url, params=params) 1097 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1098 1099 def get_file_content( 1100 self, 1101 content: Content, 1102 ref: Optional[Ref] = None, 1103 commit: Optional[Commit] = None, 1104 ) -> Union[str, List["Content"]]: 1105 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1106 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1107 data = Util.data_params_for_ref(ref or commit) 1108 1109 if content.type == Content.FILE: 1110 return self.allspice_client.requests_get(url, data)["content"] 1111 else: 1112 return [ 1113 Content.parse_response(self.allspice_client, f) 1114 for f in self.allspice_client.requests_get(url, data) 1115 ] 1116 1117 def get_raw_file( 1118 self, 1119 file_path: str, 1120 ref: Optional[Ref] = None, 1121 ) -> bytes: 1122 """ 1123 Get the raw, binary data of a single file. 1124 1125 Note 1: if the file you are requesting is a text file, you might want to 1126 use .decode() on the result to get a string. For example: 1127 1128 content = repo.get_raw_file("file.txt").decode("utf-8") 1129 1130 Note 2: this method will store the entire file in memory. If you want 1131 to download a large file, you might want to use `download_to_file` 1132 instead. 1133 1134 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS 1135 1136 :param file_path: The path to the file to get. 1137 :param ref: The branch or commit to get the file from. If not provided, 1138 the default branch is used. 1139 """ 1140 1141 url = self.REPO_GET_MEDIA.format( 1142 owner=self.owner.username, 1143 repo=self.name, 1144 path=file_path, 1145 ) 1146 params = Util.data_params_for_ref(ref) 1147 return self.allspice_client.requests_get_raw(url, params=params) 1148 1149 def download_to_file( 1150 self, 1151 file_path: str, 1152 io: IO, 1153 ref: Optional[Ref] = None, 1154 ) -> None: 1155 """ 1156 Download the binary data of a file to a file-like object. 1157 1158 Example: 1159 1160 with open("schematic.DSN", "wb") as f: 1161 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1162 1163 :param file_path: The path to the file in the repository from the root 1164 of the repository. 1165 :param io: The file-like object to write the data to. 1166 """ 1167 1168 url = self.allspice_client._AllSpice__get_url( 1169 self.REPO_GET_MEDIA.format( 1170 owner=self.owner.username, 1171 repo=self.name, 1172 path=file_path, 1173 ) 1174 ) 1175 params = Util.data_params_for_ref(ref) 1176 response = self.allspice_client.requests.get( 1177 url, 1178 params=params, 1179 headers=self.allspice_client.headers, 1180 stream=True, 1181 ) 1182 1183 for chunk in response.iter_content(chunk_size=4096): 1184 if chunk: 1185 io.write(chunk) 1186 1187 def get_generated_json( 1188 self, 1189 content: Union[Content, str], 1190 ref: Optional[Ref] = None, 1191 params: Optional[dict] = None, 1192 ) -> dict: 1193 """ 1194 Get the json blob for a cad file if it exists, otherwise enqueue 1195 a new job and return a 503 status. 1196 1197 WARNING: This is still experimental and not recommended for critical 1198 applications. The structure and content of the returned dictionary can 1199 change at any time. 1200 1201 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1202 """ 1203 1204 if isinstance(content, Content): 1205 content = content.path 1206 1207 url = self.REPO_GET_ALLSPICE_JSON.format( 1208 owner=self.owner.username, 1209 repo=self.name, 1210 content=content, 1211 ) 1212 data = Util.data_params_for_ref(ref) 1213 if params: 1214 data.update(params) 1215 if self.allspice_client.use_new_schdoc_renderer is not None: 1216 data["use_new_schdoc_renderer"] = ( 1217 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1218 ) 1219 return self.allspice_client.requests_get(url, data) 1220 1221 def get_generated_svg( 1222 self, 1223 content: Union[Content, str], 1224 ref: Optional[Ref] = None, 1225 params: Optional[dict] = None, 1226 ) -> bytes: 1227 """ 1228 Get the svg blob for a cad file if it exists, otherwise enqueue 1229 a new job and return a 503 status. 1230 1231 WARNING: This is still experimental and not yet recommended for 1232 critical applications. The content of the returned svg can change 1233 at any time. 1234 1235 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1236 """ 1237 1238 if isinstance(content, Content): 1239 content = content.path 1240 1241 url = self.REPO_GET_ALLSPICE_SVG.format( 1242 owner=self.owner.username, 1243 repo=self.name, 1244 content=content, 1245 ) 1246 data = Util.data_params_for_ref(ref) 1247 if params: 1248 data.update(params) 1249 if self.allspice_client.use_new_schdoc_renderer is not None: 1250 data["use_new_schdoc_renderer"] = ( 1251 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1252 ) 1253 return self.allspice_client.requests_get_raw(url, data) 1254 1255 def get_generated_projectdata( 1256 self, 1257 content: Union[Content, str], 1258 ref: Optional[Ref] = None, 1259 params: Optional[dict] = None, 1260 ) -> dict: 1261 """ 1262 Get the json project data based on the cad file provided 1263 1264 WARNING: This is still experimental and not yet recommended for 1265 critical applications. The content of the returned dictionary can change 1266 at any time. 1267 1268 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1269 """ 1270 if isinstance(content, Content): 1271 content = content.path 1272 1273 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1274 owner=self.owner.username, 1275 repo=self.name, 1276 content=content, 1277 ) 1278 data = Util.data_params_for_ref(ref) 1279 if params: 1280 data.update(params) 1281 return self.allspice_client.requests_get(url, data) 1282 1283 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1284 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1285 if not data: 1286 data = {} 1287 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1288 data.update({"content": content}) 1289 return self.allspice_client.requests_post(url, data) 1290 1291 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1292 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1293 if not data: 1294 data = {} 1295 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1296 data.update({"sha": file_sha, "content": content}) 1297 return self.allspice_client.requests_put(url, data) 1298 1299 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1300 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1301 if not data: 1302 data = {} 1303 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1304 data.update({"sha": file_sha}) 1305 return self.allspice_client.requests_delete(url, data) 1306 1307 def get_archive( 1308 self, 1309 ref: Ref = "main", 1310 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1311 ) -> bytes: 1312 """ 1313 Download all the files in a specific ref of a repository as a zip or tarball 1314 archive. 1315 1316 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1317 1318 :param ref: branch or commit to get content from, defaults to the "main" branch 1319 :param archive_format: zip or tar, defaults to zip 1320 """ 1321 1322 ref_string = Util.data_params_for_ref(ref)["ref"] 1323 url = self.REPO_GET_ARCHIVE.format( 1324 owner=self.owner.username, 1325 repo=self.name, 1326 ref=ref_string, 1327 format=archive_format.value, 1328 ) 1329 return self.allspice_client.requests_get_raw(url) 1330 1331 def get_topics(self) -> list[str]: 1332 """ 1333 Gets the list of topics on this repository. 1334 1335 See http://localhost:3000/api/swagger#/repository/repoListTopics 1336 """ 1337 1338 url = self.REPO_GET_TOPICS.format( 1339 owner=self.owner.username, 1340 repo=self.name, 1341 ) 1342 return self.allspice_client.requests_get(url)["topics"] 1343 1344 def add_topic(self, topic: str): 1345 """ 1346 Adds a topic to the repository. 1347 1348 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1349 1350 :param topic: The topic to add. Topic names must consist only of 1351 lowercase letters, numnbers and dashes (-), and cannot start with 1352 dashes. Topic names also must be under 35 characters long. 1353 """ 1354 1355 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1356 self.allspice_client.requests_put(url) 1357 1358 def create_release( 1359 self, 1360 tag_name: str, 1361 name: Optional[str] = None, 1362 body: Optional[str] = None, 1363 draft: bool = False, 1364 ): 1365 """ 1366 Create a release for this repository. The release will be created for 1367 the tag with the given name. If there is no tag with this name, create 1368 the tag first. 1369 1370 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1371 """ 1372 1373 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1374 data = { 1375 "tag_name": tag_name, 1376 "draft": draft, 1377 } 1378 if name is not None: 1379 data["name"] = name 1380 if body is not None: 1381 data["body"] = body 1382 response = self.allspice_client.requests_post(url, data) 1383 return Release.parse_response(self.allspice_client, response, self) 1384 1385 def get_releases( 1386 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1387 ) -> List[Release]: 1388 """ 1389 Get the list of releases for this repository. 1390 1391 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1392 """ 1393 1394 data = {} 1395 1396 if draft is not None: 1397 data["draft"] = draft 1398 if pre_release is not None: 1399 data["pre-release"] = pre_release 1400 1401 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1402 responses = self.allspice_client.requests_get_paginated(url, params=data) 1403 1404 return [ 1405 Release.parse_response(self.allspice_client, response, self) for response in responses 1406 ] 1407 1408 def get_latest_release(self) -> Release: 1409 """ 1410 Get the latest release for this repository. 1411 1412 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1413 """ 1414 1415 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1416 response = self.allspice_client.requests_get(url) 1417 release = Release.parse_response(self.allspice_client, response, self) 1418 return release 1419 1420 def get_release_by_tag(self, tag: str) -> Release: 1421 """ 1422 Get a release by its tag. 1423 1424 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1425 """ 1426 1427 url = self.REPO_GET_RELEASE_BY_TAG.format( 1428 owner=self.owner.username, repo=self.name, tag=tag 1429 ) 1430 response = self.allspice_client.requests_get(url) 1431 release = Release.parse_response(self.allspice_client, response, self) 1432 return release 1433 1434 def get_commit_statuses( 1435 self, 1436 commit: Union[str, Commit], 1437 sort: Optional[CommitStatusSort] = None, 1438 state: Optional[CommitStatusState] = None, 1439 ) -> List[CommitStatus]: 1440 """ 1441 Get a list of statuses for a commit. 1442 1443 This is roughly equivalent to the Commit.get_statuses method, but this 1444 method allows you to sort and filter commits and is more convenient if 1445 you have a commit SHA and don't need to get the commit itself. 1446 1447 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1448 """ 1449 1450 if isinstance(commit, Commit): 1451 commit = commit.sha 1452 1453 params = {} 1454 if sort is not None: 1455 params["sort"] = sort.value 1456 if state is not None: 1457 params["state"] = state.value 1458 1459 url = self.REPO_GET_COMMIT_STATUS.format( 1460 owner=self.owner.username, repo=self.name, sha=commit 1461 ) 1462 response = self.allspice_client.requests_get_paginated(url, params=params) 1463 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1464 1465 def create_commit_status( 1466 self, 1467 commit: Union[str, Commit], 1468 context: Optional[str] = None, 1469 description: Optional[str] = None, 1470 state: Optional[CommitStatusState] = None, 1471 target_url: Optional[str] = None, 1472 ) -> CommitStatus: 1473 """ 1474 Create a status on a commit. 1475 1476 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1477 """ 1478 1479 if isinstance(commit, Commit): 1480 commit = commit.sha 1481 1482 data = {} 1483 if context is not None: 1484 data["context"] = context 1485 if description is not None: 1486 data["description"] = description 1487 if state is not None: 1488 data["state"] = state.value 1489 if target_url is not None: 1490 data["target_url"] = target_url 1491 1492 url = self.REPO_GET_COMMIT_STATUS.format( 1493 owner=self.owner.username, repo=self.name, sha=commit 1494 ) 1495 response = self.allspice_client.requests_post(url, data=data) 1496 return CommitStatus.parse_response(self.allspice_client, response) 1497 1498 def delete(self): 1499 self.allspice_client.requests_delete( 1500 Repository.REPO_DELETE % (self.owner.username, self.name) 1501 ) 1502 self.deleted = True 1503 1504 1505class Milestone(ApiObject): 1506 allow_merge_commits: Any 1507 allow_rebase: Any 1508 allow_rebase_explicit: Any 1509 allow_squash_merge: Any 1510 archived: Any 1511 closed_at: Any 1512 closed_issues: int 1513 created_at: str 1514 default_branch: Any 1515 description: str 1516 due_on: Any 1517 has_issues: Any 1518 has_pull_requests: Any 1519 has_wiki: Any 1520 id: int 1521 ignore_whitespace_conflicts: Any 1522 name: Any 1523 open_issues: int 1524 private: Any 1525 state: str 1526 title: str 1527 updated_at: str 1528 website: Any 1529 1530 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1531 1532 def __init__(self, allspice_client): 1533 super().__init__(allspice_client) 1534 1535 def __eq__(self, other): 1536 if not isinstance(other, Milestone): 1537 return False 1538 return self.allspice_client == other.allspice_client and self.id == other.id 1539 1540 def __hash__(self): 1541 return hash(self.allspice_client) ^ hash(self.id) 1542 1543 _fields_to_parsers: ClassVar[dict] = { 1544 "closed_at": lambda _, t: Util.convert_time(t), 1545 "due_on": lambda _, t: Util.convert_time(t), 1546 } 1547 1548 _patchable_fields: ClassVar[set[str]] = { 1549 "allow_merge_commits", 1550 "allow_rebase", 1551 "allow_rebase_explicit", 1552 "allow_squash_merge", 1553 "archived", 1554 "default_branch", 1555 "description", 1556 "has_issues", 1557 "has_pull_requests", 1558 "has_wiki", 1559 "ignore_whitespace_conflicts", 1560 "name", 1561 "private", 1562 "website", 1563 } 1564 1565 @classmethod 1566 def request(cls, allspice_client, owner: str, repo: str, number: str): 1567 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number}) 1568 1569 1570class Attachment(ReadonlyApiObject): 1571 """ 1572 An asset attached to a comment. 1573 1574 You cannot edit or delete the attachment from this object - see the instance methods 1575 Comment.edit_attachment and delete_attachment for that. 1576 """ 1577 1578 browser_download_url: str 1579 created_at: str 1580 download_count: int 1581 id: int 1582 name: str 1583 size: int 1584 uuid: str 1585 1586 def __init__(self, allspice_client): 1587 super().__init__(allspice_client) 1588 1589 def __eq__(self, other): 1590 if not isinstance(other, Attachment): 1591 return False 1592 1593 return self.uuid == other.uuid 1594 1595 def __hash__(self): 1596 return hash(self.uuid) 1597 1598 def download_to_file(self, io: IO): 1599 """ 1600 Download the raw, binary data of this Attachment to a file-like object. 1601 1602 Example: 1603 1604 with open("my_file.zip", "wb") as f: 1605 attachment.download_to_file(f) 1606 1607 :param io: The file-like object to write the data to. 1608 """ 1609 1610 response = self.allspice_client.requests.get( 1611 self.browser_download_url, 1612 headers=self.allspice_client.headers, 1613 stream=True, 1614 ) 1615 # 4kb chunks 1616 for chunk in response.iter_content(chunk_size=4096): 1617 if chunk: 1618 io.write(chunk) 1619 1620 1621class Comment(ApiObject): 1622 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1623 body: str 1624 created_at: datetime 1625 html_url: str 1626 id: int 1627 issue_url: str 1628 original_author: str 1629 original_author_id: int 1630 pull_request_url: str 1631 updated_at: datetime 1632 user: User 1633 1634 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1635 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1636 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1637 1638 def __init__(self, allspice_client): 1639 super().__init__(allspice_client) 1640 1641 def __eq__(self, other): 1642 if not isinstance(other, Comment): 1643 return False 1644 return self.repository == other.repository and self.id == other.id 1645 1646 def __hash__(self): 1647 return hash(self.repository) ^ hash(self.id) 1648 1649 @classmethod 1650 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1651 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1652 1653 _fields_to_parsers: ClassVar[dict] = { 1654 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1655 "created_at": lambda _, t: Util.convert_time(t), 1656 "updated_at": lambda _, t: Util.convert_time(t), 1657 } 1658 1659 _patchable_fields: ClassVar[set[str]] = {"body"} 1660 1661 @property 1662 def parent_url(self) -> str: 1663 """URL of the parent of this comment (the issue or the pull request)""" 1664 1665 if self.issue_url is not None and self.issue_url != "": 1666 return self.issue_url 1667 else: 1668 return self.pull_request_url 1669 1670 @cached_property 1671 def repository(self) -> Repository: 1672 """The repository this comment was posted on.""" 1673 1674 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1675 return Repository.request(self.allspice_client, owner_name, repo_name) 1676 1677 def __fields_for_path(self): 1678 return { 1679 "owner": self.repository.owner.username, 1680 "repo": self.repository.name, 1681 "id": self.id, 1682 } 1683 1684 def commit(self): 1685 self._commit(self.__fields_for_path()) 1686 1687 def delete(self): 1688 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1689 self.deleted = True 1690 1691 def get_attachments(self) -> List[Attachment]: 1692 """ 1693 Get all attachments on this comment. This returns Attachment objects, which 1694 contain a link to download the attachment. 1695 1696 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1697 """ 1698 1699 results = self.allspice_client.requests_get( 1700 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1701 ) 1702 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1703 1704 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1705 """ 1706 Create an attachment on this comment. 1707 1708 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1709 1710 :param file: The file to attach. This should be a file-like object. 1711 :param name: The name of the file. If not provided, the name of the file will be 1712 used. 1713 :return: The created attachment. 1714 """ 1715 1716 args: dict[str, Any] = { 1717 "files": {"attachment": file}, 1718 } 1719 if name is not None: 1720 args["params"] = {"name": name} 1721 1722 result = self.allspice_client.requests_post( 1723 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1724 **args, 1725 ) 1726 return Attachment.parse_response(self.allspice_client, result) 1727 1728 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1729 """ 1730 Edit an attachment. 1731 1732 The list of params that can be edited is available at 1733 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1734 1735 :param attachment: The attachment to be edited 1736 :param data: The data parameter should be a dictionary of the fields to edit. 1737 :return: The edited attachment 1738 """ 1739 1740 args = { 1741 **self.__fields_for_path(), 1742 "attachment_id": attachment.id, 1743 } 1744 result = self.allspice_client.requests_patch( 1745 self.ATTACHMENT_PATH.format(**args), 1746 data=data, 1747 ) 1748 return Attachment.parse_response(self.allspice_client, result) 1749 1750 def delete_attachment(self, attachment: Attachment): 1751 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1752 1753 args = { 1754 **self.__fields_for_path(), 1755 "attachment_id": attachment.id, 1756 } 1757 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1758 attachment.deleted = True 1759 1760 1761class Commit(ReadonlyApiObject): 1762 author: User 1763 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1764 committer: Dict[str, Union[int, str, bool]] 1765 created: str 1766 files: List[Dict[str, str]] 1767 html_url: str 1768 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1769 parents: List[Union[Dict[str, str], Any]] 1770 sha: str 1771 stats: Dict[str, int] 1772 url: str 1773 1774 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1775 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1776 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1777 1778 # Regex to extract owner and repo names from the url property 1779 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1780 1781 def __init__(self, allspice_client): 1782 super().__init__(allspice_client) 1783 1784 _fields_to_parsers: ClassVar[dict] = { 1785 # NOTE: api may return None for commiters that are no allspice users 1786 "author": lambda allspice_client, u: User.parse_response(allspice_client, u) if u else None 1787 } 1788 1789 def __eq__(self, other): 1790 if not isinstance(other, Commit): 1791 return False 1792 return self.sha == other.sha 1793 1794 def __hash__(self): 1795 return hash(self.sha) 1796 1797 @classmethod 1798 def parse_response(cls, allspice_client, result) -> "Commit": 1799 commit_cache = result["commit"] 1800 api_object = cls(allspice_client) 1801 cls._initialize(allspice_client, api_object, result) 1802 # inner_commit for legacy reasons 1803 Commit._add_read_property("inner_commit", commit_cache, api_object) 1804 return api_object 1805 1806 def get_status(self) -> CommitCombinedStatus: 1807 """ 1808 Get a combined status consisting of all statues on this commit. 1809 1810 Note that the returned object is a CommitCombinedStatus object, which 1811 also contains a list of all statuses on the commit. 1812 1813 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1814 """ 1815 1816 result = self.allspice_client.requests_get( 1817 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1818 ) 1819 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1820 1821 def get_statuses(self) -> List[CommitStatus]: 1822 """ 1823 Get a list of all statuses on this commit. 1824 1825 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1826 """ 1827 1828 results = self.allspice_client.requests_get( 1829 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1830 ) 1831 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1832 1833 @cached_property 1834 def _fields_for_path(self) -> dict[str, str]: 1835 matches = self.URL_REGEXP.search(self.url) 1836 if not matches: 1837 raise ValueError(f"Invalid commit URL: {self.url}") 1838 1839 return { 1840 "owner": matches.group(1), 1841 "repo": matches.group(2), 1842 "sha": self.sha, 1843 } 1844 1845 1846class CommitStatusState(Enum): 1847 PENDING = "pending" 1848 SUCCESS = "success" 1849 ERROR = "error" 1850 FAILURE = "failure" 1851 WARNING = "warning" 1852 1853 @classmethod 1854 def try_init(cls, value: str) -> CommitStatusState | str: 1855 """ 1856 Try converting a string to the enum, and if that fails, return the 1857 string itself. 1858 """ 1859 1860 try: 1861 return cls(value) 1862 except ValueError: 1863 return value 1864 1865 1866class CommitStatus(ReadonlyApiObject): 1867 context: str 1868 created_at: str 1869 creator: User 1870 description: str 1871 id: int 1872 status: CommitStatusState 1873 target_url: str 1874 updated_at: str 1875 url: str 1876 1877 def __init__(self, allspice_client): 1878 super().__init__(allspice_client) 1879 1880 _fields_to_parsers: ClassVar[dict] = { 1881 # Gitea/ASH doesn't actually validate that the status is a "valid" 1882 # status, so we can expect empty or unknown strings in the status field. 1883 "status": lambda _, s: CommitStatusState.try_init(s), 1884 "creator": lambda allspice_client, u: ( 1885 User.parse_response(allspice_client, u) if u else None 1886 ), 1887 } 1888 1889 def __eq__(self, other): 1890 if not isinstance(other, CommitStatus): 1891 return False 1892 return self.id == other.id 1893 1894 def __hash__(self): 1895 return hash(self.id) 1896 1897 1898class CommitCombinedStatus(ReadonlyApiObject): 1899 commit_url: str 1900 repository: Repository 1901 sha: str 1902 state: CommitStatusState 1903 statuses: List["CommitStatus"] 1904 total_count: int 1905 url: str 1906 1907 def __init__(self, allspice_client): 1908 super().__init__(allspice_client) 1909 1910 _fields_to_parsers: ClassVar[dict] = { 1911 # See CommitStatus 1912 "state": lambda _, s: CommitStatusState.try_init(s), 1913 "statuses": lambda allspice_client, statuses: [ 1914 CommitStatus.parse_response(allspice_client, status) for status in statuses 1915 ], 1916 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1917 } 1918 1919 def __eq__(self, other): 1920 if not isinstance(other, CommitCombinedStatus): 1921 return False 1922 return self.sha == other.sha 1923 1924 def __hash__(self): 1925 return hash(self.sha) 1926 1927 @classmethod 1928 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1929 api_object = cls(allspice_client) 1930 cls._initialize(allspice_client, api_object, result) 1931 return api_object 1932 1933 1934class Issue(ApiObject): 1935 """ 1936 An issue on a repository. 1937 1938 Note: `Issue.assets` may not have any entries even if the issue has 1939 attachments. This happens when an issue is fetched via a bulk method like 1940 `Repository.get_issues`. In most cases, prefer using 1941 `Issue.get_attachments` to get the attachments on an issue. 1942 """ 1943 1944 assets: List[Union[Any, "Attachment"]] 1945 assignee: Any 1946 assignees: Any 1947 body: str 1948 closed_at: Any 1949 comments: int 1950 created_at: str 1951 due_date: Any 1952 html_url: str 1953 id: int 1954 is_locked: bool 1955 labels: List[Any] 1956 milestone: Optional["Milestone"] 1957 number: int 1958 original_author: str 1959 original_author_id: int 1960 pin_order: int 1961 pull_request: Any 1962 ref: str 1963 repository: Dict[str, Union[int, str]] 1964 state: str 1965 title: str 1966 updated_at: str 1967 url: str 1968 user: User 1969 1970 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1971 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1972 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1973 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1974 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1975 1976 OPENED = "open" 1977 CLOSED = "closed" 1978 1979 def __init__(self, allspice_client): 1980 super().__init__(allspice_client) 1981 1982 def __eq__(self, other): 1983 if not isinstance(other, Issue): 1984 return False 1985 return self.repository == other.repository and self.id == other.id 1986 1987 def __hash__(self): 1988 return hash(self.repository) ^ hash(self.id) 1989 1990 _fields_to_parsers: ClassVar[dict] = { 1991 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1992 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1993 "assets": lambda allspice_client, assets: [ 1994 Attachment.parse_response(allspice_client, a) for a in assets 1995 ], 1996 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1997 "assignees": lambda allspice_client, us: [ 1998 User.parse_response(allspice_client, u) for u in us 1999 ], 2000 "state": lambda _, s: Issue.CLOSED if s == "closed" else Issue.OPENED, 2001 } 2002 2003 _parsers_to_fields: ClassVar[dict] = { 2004 "milestone": lambda m: m.id, 2005 } 2006 2007 _patchable_fields: ClassVar[set[str]] = { 2008 "assignee", 2009 "assignees", 2010 "body", 2011 "due_date", 2012 "milestone", 2013 "state", 2014 "title", 2015 } 2016 2017 def commit(self): 2018 args = { 2019 "owner": self.repository.owner.username, 2020 "repo": self.repository.name, 2021 "index": self.number, 2022 } 2023 self._commit(args) 2024 2025 @classmethod 2026 def request(cls, allspice_client, owner: str, repo: str, number: str): 2027 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2028 # The repository in the response is a RepositoryMeta object, so request 2029 # the full repository object and add it to the issue object. 2030 repository = Repository.request(allspice_client, owner, repo) 2031 setattr(api_object, "_repository", repository) 2032 # For legacy reasons 2033 cls._add_read_property("repo", repository, api_object) 2034 return api_object 2035 2036 @classmethod 2037 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2038 args = {"owner": repo.owner.username, "repo": repo.name} 2039 data = {"title": title, "body": body} 2040 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2041 issue = Issue.parse_response(allspice_client, result) 2042 setattr(issue, "_repository", repo) 2043 cls._add_read_property("repo", repo, issue) 2044 return issue 2045 2046 @property 2047 def owner(self) -> Organization | User: 2048 return self.repository.owner 2049 2050 def get_time_sum(self, user: User) -> int: 2051 results = self.allspice_client.requests_get( 2052 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2053 ) 2054 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2055 2056 def get_times(self) -> Optional[Dict]: 2057 return self.allspice_client.requests_get( 2058 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2059 ) 2060 2061 def delete_time(self, time_id: str): 2062 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2063 self.allspice_client.requests_delete(path) 2064 2065 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2066 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2067 self.allspice_client.requests_post( 2068 path, data={"created": created, "time": int(time), "user_name": user_name} 2069 ) 2070 2071 def get_comments(self) -> List[Comment]: 2072 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2073 2074 results = self.allspice_client.requests_get( 2075 self.GET_COMMENTS.format( 2076 owner=self.owner.username, repo=self.repository.name, index=self.number 2077 ) 2078 ) 2079 2080 return [Comment.parse_response(self.allspice_client, result) for result in results] 2081 2082 def create_comment(self, body: str) -> Comment: 2083 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2084 2085 path = self.GET_COMMENTS.format( 2086 owner=self.owner.username, repo=self.repository.name, index=self.number 2087 ) 2088 2089 response = self.allspice_client.requests_post(path, data={"body": body}) 2090 return Comment.parse_response(self.allspice_client, response) 2091 2092 def get_attachments(self) -> List[Attachment]: 2093 """ 2094 Fetch all attachments on this issue. 2095 2096 Unlike the assets field, this will always fetch all attachments from the 2097 API. 2098 2099 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2100 """ 2101 2102 path = self.GET_ATTACHMENTS.format( 2103 owner=self.owner.username, repo=self.repository.name, index=self.number 2104 ) 2105 response = self.allspice_client.requests_get(path) 2106 2107 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2108 2109 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2110 """ 2111 Create an attachment on this issue. 2112 2113 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2114 2115 :param file: The file to attach. This should be a file-like object. 2116 :param name: The name of the file. If not provided, the name of the file will be 2117 used. 2118 :return: The created attachment. 2119 """ 2120 2121 args: dict[str, Any] = { 2122 "files": {"attachment": file}, 2123 } 2124 if name is not None: 2125 args["params"] = {"name": name} 2126 2127 result = self.allspice_client.requests_post( 2128 self.GET_ATTACHMENTS.format( 2129 owner=self.owner.username, repo=self.repository.name, index=self.number 2130 ), 2131 **args, 2132 ) 2133 2134 return Attachment.parse_response(self.allspice_client, result) 2135 2136 2137class DesignReviewReviewComment(ApiObject): 2138 """ 2139 A comment on a Design Review Review. 2140 """ 2141 2142 body: str 2143 commit_id: str 2144 created_at: str 2145 diff_hunk: str 2146 html_url: str 2147 id: int 2148 original_commit_id: str 2149 original_position: int 2150 path: str 2151 position: int 2152 pull_request_review_id: int 2153 pull_request_url: str 2154 resolver: Any 2155 sub_path: str 2156 updated_at: str 2157 user: User 2158 2159 def __init__(self, allspice_client): 2160 super().__init__(allspice_client) 2161 2162 _fields_to_parsers: ClassVar[dict] = { 2163 "resolver": lambda allspice_client, r: User.parse_response(allspice_client, r), 2164 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2165 } 2166 2167 2168class DesignReviewReview(ReadonlyApiObject): 2169 """ 2170 A review on a Design Review. 2171 """ 2172 2173 body: str 2174 comments_count: int 2175 commit_id: str 2176 dismissed: bool 2177 html_url: str 2178 id: int 2179 official: bool 2180 pull_request_url: str 2181 stale: bool 2182 state: ReviewEvent 2183 submitted_at: str 2184 team: Any 2185 updated_at: str 2186 user: User 2187 2188 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}" 2189 GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments" 2190 2191 class ReviewEvent(Enum): 2192 APPROVED = "APPROVED" 2193 PENDING = "PENDING" 2194 COMMENT = "COMMENT" 2195 REQUEST_CHANGES = "REQUEST_CHANGES" 2196 REQUEST_REVIEW = "REQUEST_REVIEW" 2197 UNKNOWN = "" 2198 2199 @dataclass 2200 class ReviewComment: 2201 """ 2202 Data required to create a review comment on a design review. 2203 2204 :param body: The body of the comment. 2205 :param path: The path of the file to comment on. If you have a 2206 `Content` object, get the path using the `path` property. 2207 :param sub_path: The sub-path of the file to comment on. This is 2208 usually the page ID of the page in the multi-page document. 2209 :param new_position: The line number of the source code file after the 2210 change to add this comment on. Optional, leave unset if this is an ECAD 2211 file or the comment must be on the entire file. 2212 :param old_position: The line number of the source code file before the 2213 change to add this comment on. Optional, leave unset if this is an ECAD 2214 file or the comment must be on the entire file. 2215 """ 2216 2217 body: str 2218 path: str 2219 sub_path: Optional[str] = None 2220 new_position: Optional[int] = None 2221 old_position: Optional[int] = None 2222 2223 def __init__(self, allspice_client): 2224 super().__init__(allspice_client) 2225 2226 _fields_to_parsers: ClassVar[dict] = { 2227 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2228 "state": lambda _, s: DesignReviewReview.ReviewEvent(s), 2229 } 2230 2231 def _get_dr_properties(self) -> dict[str, str]: 2232 """ 2233 Get the owner, repo name and design review number from the URL of this 2234 review's DR. 2235 """ 2236 2237 parts = self.pull_request_url.strip("/").split("/") 2238 2239 try: 2240 index = parts[-1] 2241 assert parts[-2] == "pulls" or parts[-2] == "pull", ( 2242 "Expected the second last part of the URL to be 'pulls' or 'pull', " 2243 ) 2244 repo = parts[-3] 2245 owner = parts[-4] 2246 2247 return { 2248 "owner": owner, 2249 "repo": repo, 2250 "index": index, 2251 } 2252 except IndexError: 2253 raise ValueError("Malformed design review URL: {}".format(self.pull_request_url)) 2254 2255 @cached_property 2256 def owner_name(self) -> str: 2257 """ 2258 The owner of the repository this review is on. 2259 """ 2260 2261 return self._get_dr_properties()["owner"] 2262 2263 @cached_property 2264 def repository_name(self) -> str: 2265 """ 2266 The name of the repository this review is on. 2267 """ 2268 2269 return self._get_dr_properties()["repo"] 2270 2271 @cached_property 2272 def index(self) -> str: 2273 """ 2274 The index of the design review this review is on. 2275 """ 2276 2277 return self._get_dr_properties()["index"] 2278 2279 def delete(self): 2280 """ 2281 Delete this review. 2282 """ 2283 2284 self.allspice_client.requests_delete( 2285 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2286 ) 2287 self.deleted = True 2288 2289 def get_comments(self) -> List[DesignReviewReviewComment]: 2290 """ 2291 Get the comments on this review. 2292 """ 2293 2294 result = self.allspice_client.requests_get( 2295 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2296 ) 2297 2298 return [ 2299 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2300 for comment in result 2301 ] 2302 2303 2304class DesignReview(ApiObject): 2305 """ 2306 A Design Review. See 2307 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2308 2309 Note: The base and head fields are not `Branch` objects - they are plain strings 2310 referring to the branch names. This is because DRs can exist for branches that have 2311 been deleted, which don't have an associated `Branch` object from the API. You can use 2312 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2313 it exists. 2314 """ 2315 2316 additions: Optional[int] 2317 allow_maintainer_edit: bool 2318 allow_maintainer_edits: Any 2319 assignee: User 2320 assignees: List["User"] 2321 base: str 2322 body: str 2323 changed_files: Optional[int] 2324 closed_at: Optional[str] 2325 comments: int 2326 created_at: str 2327 deletions: Optional[int] 2328 diff_url: str 2329 draft: bool 2330 due_date: Optional[str] 2331 head: str 2332 html_url: str 2333 id: int 2334 is_locked: bool 2335 labels: List[Any] 2336 merge_base: str 2337 merge_commit_sha: Optional[str] 2338 mergeable: bool 2339 merged: bool 2340 merged_at: Optional[str] 2341 merged_by: Any 2342 milestone: Any 2343 number: int 2344 patch_url: str 2345 pin_order: int 2346 repository: Optional["Repository"] 2347 requested_reviewers: Any 2348 requested_reviewers_teams: Any 2349 review_comments: int 2350 state: str 2351 title: str 2352 updated_at: str 2353 url: str 2354 user: User 2355 2356 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2357 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2358 GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews" 2359 GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}" 2360 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2361 2362 OPEN = "open" 2363 CLOSED = "closed" 2364 2365 class MergeType(Enum): 2366 MERGE = "merge" 2367 REBASE = "rebase" 2368 REBASE_MERGE = "rebase-merge" 2369 SQUASH = "squash" 2370 MANUALLY_MERGED = "manually-merged" 2371 2372 def __init__(self, allspice_client): 2373 super().__init__(allspice_client) 2374 2375 def __eq__(self, other): 2376 if not isinstance(other, DesignReview): 2377 return False 2378 return self.repository == other.repository and self.id == other.id 2379 2380 def __hash__(self): 2381 return hash(self.repository) ^ hash(self.id) 2382 2383 @classmethod 2384 def parse_response(cls, allspice_client, result) -> "DesignReview": 2385 api_object = super().parse_response(allspice_client, result) 2386 cls._add_read_property( 2387 "repository", 2388 Repository.parse_response(allspice_client, result["base"]["repo"]), 2389 api_object, 2390 ) 2391 2392 return api_object 2393 2394 @classmethod 2395 def request(cls, allspice_client, owner: str, repo: str, number: str): 2396 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2397 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2398 2399 _fields_to_parsers: ClassVar[dict] = { 2400 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2401 "assignees": lambda allspice_client, us: [ 2402 User.parse_response(allspice_client, u) for u in us 2403 ], 2404 "base": lambda _, b: b["ref"], 2405 "head": lambda _, h: h["ref"], 2406 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2407 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2408 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2409 } 2410 2411 _patchable_fields: ClassVar[set[str]] = { 2412 "allow_maintainer_edits", 2413 "assignee", 2414 "assignees", 2415 "base", 2416 "body", 2417 "due_date", 2418 "milestone", 2419 "state", 2420 "title", 2421 } 2422 2423 _parsers_to_fields: ClassVar[dict] = { 2424 "assignee": lambda u: u.username, 2425 "assignees": lambda us: [u.username for u in us], 2426 "base": lambda b: b.name if isinstance(b, Branch) else b, 2427 "milestone": lambda m: m.id, 2428 } 2429 2430 def commit(self): 2431 data = self.get_dirty_fields() 2432 if "due_date" in data and data["due_date"] is None: 2433 data["unset_due_date"] = True 2434 2435 args = { 2436 "owner": self.repository.owner.username, 2437 "repo": self.repository.name, 2438 "index": self.number, 2439 } 2440 self._commit(args, data) 2441 2442 def merge(self, merge_type: MergeType): 2443 """ 2444 Merge the pull request. See 2445 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2446 2447 :param merge_type: The type of merge to perform. See the MergeType enum. 2448 """ 2449 2450 self.allspice_client.requests_post( 2451 self.MERGE_DESIGN_REVIEW.format( 2452 owner=self.repository.owner.username, 2453 repo=self.repository.name, 2454 index=self.number, 2455 ), 2456 data={"Do": merge_type.value}, 2457 ) 2458 2459 def get_comments(self) -> List[Comment]: 2460 """ 2461 Get the comments on this pull request, but not specifically on a review. 2462 2463 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2464 2465 :return: A list of comments on this pull request. 2466 """ 2467 2468 results = self.allspice_client.requests_get( 2469 self.GET_COMMENTS.format( 2470 owner=self.repository.owner.username, 2471 repo=self.repository.name, 2472 index=self.number, 2473 ) 2474 ) 2475 return [Comment.parse_response(self.allspice_client, result) for result in results] 2476 2477 def create_comment(self, body: str): 2478 """ 2479 Create a comment on this pull request. This uses the same endpoint as the 2480 comments on issues, and will not be associated with any reviews. 2481 2482 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2483 2484 :param body: The body of the comment. 2485 :return: The comment that was created. 2486 """ 2487 2488 result = self.allspice_client.requests_post( 2489 self.GET_COMMENTS.format( 2490 owner=self.repository.owner.username, 2491 repo=self.repository.name, 2492 index=self.number, 2493 ), 2494 data={"body": body}, 2495 ) 2496 return Comment.parse_response(self.allspice_client, result) 2497 2498 def create_review( 2499 self, 2500 *, 2501 body: Optional[str] = None, 2502 event: Optional[DesignReviewReview.ReviewEvent] = None, 2503 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2504 commit_id: Optional[str] = None, 2505 ) -> DesignReviewReview: 2506 """ 2507 Create a review on this design review. 2508 2509 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2510 2511 Note: in most cases, you should not set the body or event when creating 2512 a review. The event is automatically set to "PENDING" when the review 2513 is created. You should then use `submit_review` to submit the review 2514 with the desired event and body. 2515 2516 :param body: The body of the review. This is the top-level comment on 2517 the review. If not provided, the review will be created with no body. 2518 :param event: The event of the review. This is the overall status of the 2519 review. See the ReviewEvent enum. If not provided, the API will 2520 default to "PENDING". 2521 :param comments: A list of comments on the review. Each comment should 2522 be a ReviewComment object. If not provided, only the base comment 2523 will be created. 2524 :param commit_id: The commit SHA to associate with the review. This is 2525 optional. 2526 """ 2527 2528 data: dict[str, Any] = {} 2529 2530 if body is not None: 2531 data["body"] = body 2532 if event is not None: 2533 data["event"] = event.value 2534 if commit_id is not None: 2535 data["commit_id"] = commit_id 2536 if comments is not None: 2537 data["comments"] = [asdict(comment) for comment in comments] 2538 2539 result = self.allspice_client.requests_post( 2540 self.GET_REVIEWS.format( 2541 owner=self.repository.owner.username, 2542 repo=self.repository.name, 2543 index=self.number, 2544 ), 2545 data=data, 2546 ) 2547 2548 return DesignReviewReview.parse_response(self.allspice_client, result) 2549 2550 def get_reviews(self) -> List[DesignReviewReview]: 2551 """ 2552 Get all reviews on this design review. 2553 2554 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2555 """ 2556 2557 results = self.allspice_client.requests_get( 2558 self.GET_REVIEWS.format( 2559 owner=self.repository.owner.username, 2560 repo=self.repository.name, 2561 index=self.number, 2562 ) 2563 ) 2564 2565 return [ 2566 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2567 ] 2568 2569 def submit_review( 2570 self, 2571 review_id: int, 2572 event: DesignReviewReview.ReviewEvent, 2573 *, 2574 body: Optional[str] = None, 2575 ): 2576 """ 2577 Submit a review on this design review. 2578 2579 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2580 2581 :param review_id: The ID of the review to submit. 2582 :param event: The event to submit the review with. See the ReviewEvent 2583 enum for the possible values. 2584 :param body: Optional body text for the review submission. 2585 """ 2586 2587 data = { 2588 "event": event.value, 2589 } 2590 if body is not None: 2591 data["body"] = body 2592 2593 result = self.allspice_client.requests_post( 2594 self.GET_REVIEW.format( 2595 owner=self.repository.owner.username, 2596 repo=self.repository.name, 2597 index=self.number, 2598 review_id=review_id, 2599 ), 2600 data=data, 2601 ) 2602 2603 return result 2604 2605 2606class Team(ApiObject): 2607 can_create_org_repo: bool 2608 description: str 2609 id: int 2610 includes_all_repositories: bool 2611 name: str 2612 organization: Optional["Organization"] 2613 permission: str 2614 units: List[str] 2615 units_map: Dict[str, str] 2616 2617 API_OBJECT = """/teams/{id}""" # <id> 2618 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2619 TEAM_DELETE = """/teams/%s""" # <id> 2620 GET_MEMBERS = """/teams/%s/members""" # <id> 2621 GET_REPOS = """/teams/%s/repos""" # <id> 2622 2623 def __init__(self, allspice_client): 2624 super().__init__(allspice_client) 2625 2626 def __eq__(self, other): 2627 if not isinstance(other, Team): 2628 return False 2629 return self.organization == other.organization and self.id == other.id 2630 2631 def __hash__(self): 2632 return hash(self.organization) ^ hash(self.id) 2633 2634 _fields_to_parsers: ClassVar[dict] = { 2635 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2636 } 2637 2638 _patchable_fields: ClassVar[set[str]] = { 2639 "can_create_org_repo", 2640 "description", 2641 "includes_all_repositories", 2642 "name", 2643 "permission", 2644 "units", 2645 "units_map", 2646 } 2647 2648 @classmethod 2649 def request(cls, allspice_client, id: int): 2650 return cls._request(allspice_client, {"id": id}) 2651 2652 def commit(self): 2653 args = {"id": self.id} 2654 self._commit(args) 2655 2656 def add_user(self, user: User): 2657 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2658 url = f"/teams/{self.id}/members/{user.login}" 2659 self.allspice_client.requests_put(url) 2660 2661 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2662 if isinstance(repo, Repository): 2663 repo_name = repo.name 2664 else: 2665 repo_name = repo 2666 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2667 2668 def get_members(self): 2669 """Get all users assigned to the team.""" 2670 results = self.allspice_client.requests_get_paginated( 2671 Team.GET_MEMBERS % self.id, 2672 ) 2673 return [User.parse_response(self.allspice_client, result) for result in results] 2674 2675 def get_repos(self): 2676 """Get all repos of this Team.""" 2677 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2678 return [Repository.parse_response(self.allspice_client, result) for result in results] 2679 2680 def delete(self): 2681 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2682 self.deleted = True 2683 2684 def remove_team_member(self, user_name: str): 2685 url = f"/teams/{self.id}/members/{user_name}" 2686 self.allspice_client.requests_delete(url) 2687 2688 2689class Release(ApiObject): 2690 """ 2691 A release on a repo. 2692 """ 2693 2694 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2695 author: User 2696 body: str 2697 created_at: str 2698 draft: bool 2699 html_url: str 2700 id: int 2701 name: str 2702 prerelease: bool 2703 published_at: str 2704 repo: Optional["Repository"] 2705 repository: Optional["Repository"] 2706 tag_name: str 2707 tarball_url: str 2708 target_commitish: str 2709 upload_url: str 2710 url: str 2711 zipball_url: str 2712 2713 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2714 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2715 # Note that we don't strictly need the get_assets route, as the release 2716 # object already contains the assets. 2717 2718 def __init__(self, allspice_client): 2719 super().__init__(allspice_client) 2720 2721 def __eq__(self, other): 2722 if not isinstance(other, Release): 2723 return False 2724 return self.repo == other.repo and self.id == other.id 2725 2726 def __hash__(self): 2727 return hash(self.repo) ^ hash(self.id) 2728 2729 _fields_to_parsers: ClassVar[dict] = { 2730 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2731 } 2732 _patchable_fields: ClassVar[set[str]] = { 2733 "body", 2734 "draft", 2735 "name", 2736 "prerelease", 2737 "tag_name", 2738 "target_commitish", 2739 } 2740 2741 @classmethod 2742 def parse_response(cls, allspice_client, result, repo) -> Release: 2743 release = super().parse_response(allspice_client, result) 2744 Release._add_read_property("repository", repo, release) 2745 # For legacy reasons 2746 Release._add_read_property("repo", repo, release) 2747 setattr( 2748 release, 2749 "_assets", 2750 [ 2751 ReleaseAsset.parse_response(allspice_client, asset, release) 2752 for asset in result["assets"] 2753 ], 2754 ) 2755 return release 2756 2757 @classmethod 2758 def request( 2759 cls, 2760 allspice_client, 2761 owner: str, 2762 repo: str, 2763 id: Optional[int] = None, 2764 ) -> Release: 2765 args = {"owner": owner, "repo": repo, "id": id} 2766 release_response = cls._get_gitea_api_object(allspice_client, args) 2767 repository = Repository.request(allspice_client, owner, repo) 2768 release = cls.parse_response(allspice_client, release_response, repository) 2769 return release 2770 2771 def commit(self): 2772 if self.repo is None: 2773 raise ValueError("Cannot commit a release without a repository.") 2774 2775 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2776 self._commit(args) 2777 2778 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2779 """ 2780 Create an asset for this release. 2781 2782 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2783 2784 :param file: The file to upload. This should be a file-like object. 2785 :param name: The name of the file. 2786 :return: The created asset. 2787 """ 2788 2789 if self.repo is None: 2790 raise ValueError("Cannot commit a release without a repository.") 2791 2792 args: dict[str, Any] = {"files": {"attachment": file}} 2793 if name is not None: 2794 args["params"] = {"name": name} 2795 2796 result = self.allspice_client.requests_post( 2797 self.RELEASE_CREATE_ASSET.format( 2798 owner=self.repo.owner.username, 2799 repo=self.repo.name, 2800 id=self.id, 2801 ), 2802 **args, 2803 ) 2804 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2805 2806 def delete(self): 2807 if self.repo is None: 2808 raise ValueError("Cannot commit a release without a repository.") 2809 2810 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2811 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2812 self.deleted = True 2813 2814 2815class ReleaseAsset(ApiObject): 2816 browser_download_url: str 2817 created_at: str 2818 download_count: int 2819 id: int 2820 name: str 2821 release: Optional["Release"] 2822 size: int 2823 uuid: str 2824 2825 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2826 2827 def __init__(self, allspice_client): 2828 super().__init__(allspice_client) 2829 2830 def __eq__(self, other): 2831 if not isinstance(other, ReleaseAsset): 2832 return False 2833 return self.release == other.release and self.id == other.id 2834 2835 def __hash__(self): 2836 return hash(self.release) ^ hash(self.id) 2837 2838 _fields_to_parsers: ClassVar[dict] = {} 2839 _patchable_fields: ClassVar[set[str]] = { 2840 "name", 2841 } 2842 2843 @classmethod 2844 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2845 asset = super().parse_response(allspice_client, result) 2846 ReleaseAsset._add_read_property("release", release, asset) 2847 return asset 2848 2849 @classmethod 2850 def request( 2851 cls, 2852 allspice_client, 2853 owner: str, 2854 repo: str, 2855 release_id: int, 2856 id: int, 2857 ) -> ReleaseAsset: 2858 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2859 asset_response = cls._get_gitea_api_object(allspice_client, args) 2860 release = Release.request(allspice_client, owner, repo, release_id) 2861 asset = cls.parse_response(allspice_client, asset_response, release) 2862 return asset 2863 2864 def commit(self): 2865 if self.release is None or self.release.repo is None: 2866 raise ValueError("Cannot commit a release asset without a release or a repository.") 2867 2868 args = { 2869 "owner": self.release.repo.owner.username, 2870 "repo": self.release.repo.name, 2871 "release_id": self.release.id, 2872 "id": self.id, 2873 } 2874 self._commit(args) 2875 2876 def download(self) -> bytes: 2877 """ 2878 Download the raw, binary data of this asset. 2879 2880 Note 1: if the file you are requesting is a text file, you might want to 2881 use .decode() on the result to get a string. For example: 2882 2883 asset.download().decode("utf-8") 2884 2885 Note 2: this method will store the entire file in memory. If you are 2886 downloading a large file, you might want to use download_to_file instead. 2887 """ 2888 2889 return self.allspice_client.requests.get( 2890 self.browser_download_url, 2891 headers=self.allspice_client.headers, 2892 ).content 2893 2894 def download_to_file(self, io: IO): 2895 """ 2896 Download the raw, binary data of this asset to a file-like object. 2897 2898 Example: 2899 2900 with open("my_file.zip", "wb") as f: 2901 asset.download_to_file(f) 2902 2903 :param io: The file-like object to write the data to. 2904 """ 2905 2906 response = self.allspice_client.requests.get( 2907 self.browser_download_url, 2908 headers=self.allspice_client.headers, 2909 stream=True, 2910 ) 2911 # 4kb chunks 2912 for chunk in response.iter_content(chunk_size=4096): 2913 if chunk: 2914 io.write(chunk) 2915 2916 def delete(self): 2917 if self.release is None or self.release.repo is None: 2918 raise ValueError("Cannot commit a release asset without a release or a repository.") 2919 2920 args = { 2921 "owner": self.release.repo.owner.username, 2922 "repo": self.release.repo.name, 2923 "release_id": self.release.id, 2924 "id": self.id, 2925 } 2926 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2927 self.deleted = True 2928 2929 2930class Content(ReadonlyApiObject): 2931 content: Any 2932 download_url: str 2933 encoding: Any 2934 git_url: str 2935 html_url: str 2936 last_commit_sha: str 2937 name: str 2938 path: str 2939 sha: str 2940 size: int 2941 submodule_git_url: Any 2942 target: Any 2943 type: str 2944 url: str 2945 2946 FILE = "file" 2947 2948 def __init__(self, allspice_client): 2949 super().__init__(allspice_client) 2950 2951 def __eq__(self, other): 2952 if not isinstance(other, Content): 2953 return False 2954 2955 return self.sha == other.sha and self.name == other.name 2956 2957 def __hash__(self): 2958 return hash(self.sha) ^ hash(self.name) 2959 2960 2961Ref = Union[Branch, Commit, str] 2962 2963 2964class Util: 2965 @staticmethod 2966 def convert_time(time: str) -> datetime: 2967 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2968 try: 2969 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2970 except ValueError: 2971 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2972 2973 @staticmethod 2974 def format_time(time: datetime) -> str: 2975 """ 2976 Format a datetime object to Gitea's time format. 2977 2978 :param time: The time to format 2979 :return: Formatted time 2980 """ 2981 2982 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2983 2984 @staticmethod 2985 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2986 """ 2987 Given a "ref", returns a dict with the ref parameter for the API call. 2988 2989 If the ref is None, returns an empty dict. You can pass this to the API 2990 directly. 2991 """ 2992 2993 if isinstance(ref, Branch): 2994 return {"ref": ref.name} 2995 elif isinstance(ref, Commit): 2996 return {"ref": ref.sha} 2997 elif ref: 2998 return {"ref": ref} 2999 else: 3000 return {}
36class Organization(ApiObject): 37 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 38 39 active: Optional[bool] 40 avatar_url: str 41 created: Optional[str] 42 description: str 43 email: str 44 followers_count: Optional[int] 45 following_count: Optional[int] 46 full_name: str 47 html_url: Optional[str] 48 id: int 49 is_admin: Optional[bool] 50 language: Optional[str] 51 last_login: Optional[str] 52 location: str 53 login: Optional[str] 54 login_name: Optional[str] 55 name: Optional[str] 56 prohibit_login: Optional[bool] 57 repo_admin_change_team_access: Optional[bool] 58 restricted: Optional[bool] 59 source_id: Optional[int] 60 starred_repos_count: Optional[int] 61 username: str 62 visibility: str 63 website: str 64 65 API_OBJECT = """/orgs/{name}""" # <org> 66 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 67 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 68 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 69 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 70 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 71 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 72 73 def __init__(self, allspice_client): 74 super().__init__(allspice_client) 75 76 def __eq__(self, other): 77 if not isinstance(other, Organization): 78 return False 79 return self.allspice_client == other.allspice_client and self.name == other.name 80 81 def __hash__(self): 82 return hash(self.allspice_client) ^ hash(self.name) 83 84 @classmethod 85 def request(cls, allspice_client, name: str) -> Self: 86 return cls._request(allspice_client, {"name": name}) 87 88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object 96 97 _patchable_fields: ClassVar[set[str]] = { 98 "description", 99 "full_name", 100 "location", 101 "visibility", 102 "website", 103 } 104 105 def commit(self): 106 args = {"name": self.name} 107 self._commit(args) 108 109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result) 147 148 def get_repositories(self) -> List["Repository"]: 149 results = self.allspice_client.requests_get_paginated( 150 Organization.ORG_REPOS_REQUEST % self.username 151 ) 152 return [Repository.parse_response(self.allspice_client, result) for result in results] 153 154 def get_repository(self, name) -> "Repository": 155 repos = self.get_repositories() 156 for repo in repos: 157 if repo.name == name: 158 return repo 159 raise NotFoundException("Repository %s not existent in organization." % name) 160 161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams 168 169 def get_team(self, name) -> "Team": 170 teams = self.get_teams() 171 for team in teams: 172 if team.name == name: 173 return team 174 raise NotFoundException("Team not existent in organization.") 175 176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 ) 207 208 def get_members(self) -> List["User"]: 209 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 210 return [User.parse_response(self.allspice_client, result) for result in results] 211 212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False 223 224 def remove_member(self, user: "User"): 225 path = f"/orgs/{self.username}/members/{user.username}" 226 self.allspice_client.requests_delete(path) 227 228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True 234 235 def get_heatmap(self) -> List[Tuple[datetime, int]]: 236 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 237 results = [ 238 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 239 for result in results 240 ] 241 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object
109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams
176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 )
Alias for AllSpice#create_team
212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False
228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
244class User(ApiObject): 245 active: bool 246 admin: Any 247 allow_create_organization: Any 248 allow_git_hook: Any 249 allow_import_local: Any 250 avatar_url: str 251 created: str 252 description: str 253 email: str 254 emails: List[Any] 255 followers_count: int 256 following_count: int 257 full_name: str 258 html_url: str 259 id: int 260 is_admin: bool 261 language: str 262 last_login: str 263 location: str 264 login: str 265 login_name: str 266 max_repo_creation: Any 267 must_change_password: Any 268 password: Any 269 prohibit_login: bool 270 restricted: bool 271 source_id: int 272 starred_repos_count: int 273 username: str 274 visibility: str 275 website: str 276 277 API_OBJECT = """/users/{name}""" # <org> 278 USER_MAIL = """/user/emails?sudo=%s""" # <name> 279 USER_PATCH = """/admin/users/%s""" # <username> 280 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 281 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 282 USER_HEATMAP = """/users/%s/heatmap""" # <username> 283 284 def __init__(self, allspice_client): 285 super().__init__(allspice_client) 286 self._emails = [] 287 288 def __eq__(self, other): 289 if not isinstance(other, User): 290 return False 291 return self.allspice_client == other.allspice_client and self.id == other.id 292 293 def __hash__(self): 294 return hash(self.allspice_client) ^ hash(self.id) 295 296 @property 297 def emails(self): 298 self.__request_emails() 299 return self._emails 300 301 @classmethod 302 def request(cls, allspice_client, name: str) -> "User": 303 api_object = cls._request(allspice_client, {"name": name}) 304 return api_object 305 306 _patchable_fields: ClassVar[set[str]] = { 307 "active", 308 "admin", 309 "allow_create_organization", 310 "allow_git_hook", 311 "allow_import_local", 312 "email", 313 "full_name", 314 "location", 315 "login_name", 316 "max_repo_creation", 317 "must_change_password", 318 "password", 319 "prohibit_login", 320 "website", 321 } 322 323 def commit(self, login_name: str, source_id: int = 0): 324 """ 325 Unfortunately it is necessary to require the login name 326 as well as the login source (that is not supplied when getting a user) for 327 changing a user. 328 Usually source_id is 0 and the login_name is equal to the username. 329 """ 330 values = self.get_dirty_fields() 331 values.update( 332 # api-doc says that the "source_id" is necessary; works without though 333 {"login_name": login_name, "source_id": source_id} 334 ) 335 args = {"username": self.username} 336 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 337 self._dirty_fields = {} 338 339 def create_repo( 340 self, 341 repoName: str, 342 description: str = "", 343 private: bool = False, 344 autoInit=True, 345 gitignores: Optional[str] = None, 346 license: Optional[str] = None, 347 readme: str = "Default", 348 issue_labels: Optional[str] = None, 349 default_branch="master", 350 ): 351 """Create a user Repository 352 353 Throws: 354 AlreadyExistsException: If the Repository exists already. 355 Exception: If something else went wrong. 356 """ 357 result = self.allspice_client.requests_post( 358 "/user/repos", 359 data={ 360 "name": repoName, 361 "description": description, 362 "private": private, 363 "auto_init": autoInit, 364 "gitignores": gitignores, 365 "license": license, 366 "issue_labels": issue_labels, 367 "readme": readme, 368 "default_branch": default_branch, 369 }, 370 ) 371 if "id" in result: 372 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 373 else: 374 self.allspice_client.logger.error(result["message"]) 375 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 376 return Repository.parse_response(self.allspice_client, result) 377 378 def get_repositories(self) -> List["Repository"]: 379 """Get all Repositories owned by this User.""" 380 url = f"/users/{self.username}/repos" 381 results = self.allspice_client.requests_get_paginated(url) 382 return [Repository.parse_response(self.allspice_client, result) for result in results] 383 384 def get_orgs(self) -> List[Organization]: 385 """Get all Organizations this user is a member of.""" 386 url = f"/users/{self.username}/orgs" 387 results = self.allspice_client.requests_get_paginated(url) 388 return [Organization.parse_response(self.allspice_client, result) for result in results] 389 390 def get_teams(self) -> List["Team"]: 391 url = "/user/teams" 392 results = self.allspice_client.requests_get_paginated(url, sudo=self) 393 return [Team.parse_response(self.allspice_client, result) for result in results] 394 395 def get_accessible_repos(self) -> List["Repository"]: 396 """Get all Repositories accessible by the logged in User.""" 397 results = self.allspice_client.requests_get("/user/repos", sudo=self) 398 return [Repository.parse_response(self.allspice_client, result) for result in results] 399 400 def __request_emails(self): 401 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 402 # report if the adress changed by this 403 for mail in result: 404 self._emails.append(mail["email"]) 405 if mail["primary"]: 406 self._email = mail["email"] 407 408 def delete(self): 409 """Deletes this User. Also deletes all Repositories he owns.""" 410 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 411 self.deleted = True 412 413 def get_heatmap(self) -> List[Tuple[datetime, int]]: 414 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 415 results = [ 416 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 417 for result in results 418 ] 419 return results
323 def commit(self, login_name: str, source_id: int = 0): 324 """ 325 Unfortunately it is necessary to require the login name 326 as well as the login source (that is not supplied when getting a user) for 327 changing a user. 328 Usually source_id is 0 and the login_name is equal to the username. 329 """ 330 values = self.get_dirty_fields() 331 values.update( 332 # api-doc says that the "source_id" is necessary; works without though 333 {"login_name": login_name, "source_id": source_id} 334 ) 335 args = {"username": self.username} 336 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 337 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
339 def create_repo( 340 self, 341 repoName: str, 342 description: str = "", 343 private: bool = False, 344 autoInit=True, 345 gitignores: Optional[str] = None, 346 license: Optional[str] = None, 347 readme: str = "Default", 348 issue_labels: Optional[str] = None, 349 default_branch="master", 350 ): 351 """Create a user Repository 352 353 Throws: 354 AlreadyExistsException: If the Repository exists already. 355 Exception: If something else went wrong. 356 """ 357 result = self.allspice_client.requests_post( 358 "/user/repos", 359 data={ 360 "name": repoName, 361 "description": description, 362 "private": private, 363 "auto_init": autoInit, 364 "gitignores": gitignores, 365 "license": license, 366 "issue_labels": issue_labels, 367 "readme": readme, 368 "default_branch": default_branch, 369 }, 370 ) 371 if "id" in result: 372 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 373 else: 374 self.allspice_client.logger.error(result["message"]) 375 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 376 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
378 def get_repositories(self) -> List["Repository"]: 379 """Get all Repositories owned by this User.""" 380 url = f"/users/{self.username}/repos" 381 results = self.allspice_client.requests_get_paginated(url) 382 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
384 def get_orgs(self) -> List[Organization]: 385 """Get all Organizations this user is a member of.""" 386 url = f"/users/{self.username}/orgs" 387 results = self.allspice_client.requests_get_paginated(url) 388 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
395 def get_accessible_repos(self) -> List["Repository"]: 396 """Get all Repositories accessible by the logged in User.""" 397 results = self.allspice_client.requests_get("/user/repos", sudo=self) 398 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
408 def delete(self): 409 """Deletes this User. Also deletes all Repositories he owns.""" 410 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 411 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.
422class Branch(ReadonlyApiObject): 423 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 424 effective_branch_protection_name: str 425 enable_status_check: bool 426 name: str 427 protected: bool 428 required_approvals: int 429 status_check_contexts: List[Any] 430 user_can_merge: bool 431 user_can_push: bool 432 433 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 434 435 def __init__(self, allspice_client): 436 super().__init__(allspice_client) 437 438 def __eq__(self, other): 439 if not isinstance(other, Branch): 440 return False 441 return self.commit == other.commit and self.name == other.name 442 443 def __hash__(self): 444 return hash(self.commit["id"]) ^ hash(self.name) 445 446 _fields_to_parsers: ClassVar[dict] = { 447 # This is not a commit object 448 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 449 } 450 451 @classmethod 452 def request(cls, allspice_client, owner: str, repo: str, branch: str): 453 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
456class GitEntry(ReadonlyApiObject): 457 """ 458 An object representing a file or directory in the Git tree. 459 """ 460 461 mode: str 462 path: str 463 sha: str 464 size: int 465 type: str 466 url: str 467 468 def __init__(self, allspice_client): 469 super().__init__(allspice_client) 470 471 def __eq__(self, other) -> bool: 472 if not isinstance(other, GitEntry): 473 return False 474 return self.sha == other.sha 475 476 def __hash__(self) -> int: 477 return hash(self.sha)
An object representing a file or directory in the Git tree.
Inherited Members
480class Repository(ApiObject): 481 allow_fast_forward_only_merge: bool 482 allow_manual_merge: Any 483 allow_merge_commits: bool 484 allow_rebase: bool 485 allow_rebase_explicit: bool 486 allow_rebase_update: bool 487 allow_squash_merge: bool 488 archived: bool 489 archived_at: str 490 autodetect_manual_merge: Any 491 avatar_url: str 492 clone_url: str 493 created_at: str 494 default_allow_maintainer_edit: bool 495 default_branch: str 496 default_delete_branch_after_merge: bool 497 default_merge_style: str 498 description: str 499 empty: bool 500 enable_prune: Any 501 external_tracker: Any 502 external_wiki: Any 503 fork: bool 504 forks_count: int 505 full_name: str 506 has_actions: bool 507 has_issues: bool 508 has_packages: bool 509 has_projects: bool 510 has_pull_requests: bool 511 has_releases: bool 512 has_wiki: bool 513 html_url: str 514 id: int 515 ignore_whitespace_conflicts: bool 516 internal: bool 517 internal_tracker: Dict[str, bool] 518 language: str 519 languages_url: str 520 licenses: Any 521 link: str 522 mirror: bool 523 mirror_interval: str 524 mirror_updated: str 525 name: str 526 object_format_name: str 527 open_issues_count: int 528 open_pr_counter: int 529 original_url: str 530 owner: Union["User", "Organization"] 531 parent: Any 532 permissions: Dict[str, bool] 533 private: bool 534 projects_mode: str 535 release_counter: int 536 repo_transfer: Any 537 size: int 538 ssh_url: str 539 stars_count: int 540 template: bool 541 topics: List[Union[Any, str]] 542 updated_at: datetime 543 url: str 544 watchers_count: int 545 website: str 546 547 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 548 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 549 REPO_SEARCH = """/repos/search/""" 550 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 551 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 552 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 553 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 554 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 555 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 556 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 557 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 558 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 559 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 560 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 561 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 562 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 563 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 564 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 565 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 566 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 567 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 568 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 569 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 570 REPO_GET_MEDIA = "/repos/{owner}/{repo}/media/{path}" 571 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 572 573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip" 580 581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex" 591 592 def __init__(self, allspice_client): 593 super().__init__(allspice_client) 594 595 def __eq__(self, other): 596 if not isinstance(other, Repository): 597 return False 598 return self.owner == other.owner and self.name == other.name 599 600 def __hash__(self): 601 return hash(self.owner) ^ hash(self.name) 602 603 _fields_to_parsers: ClassVar[dict] = { 604 # dont know how to tell apart user and org as owner except form email being empty. 605 "owner": lambda allspice_client, r: ( 606 Organization.parse_response(allspice_client, r) 607 if r["email"] == "" 608 else User.parse_response(allspice_client, r) 609 ), 610 "updated_at": lambda _, t: Util.convert_time(t), 611 } 612 613 @classmethod 614 def request( 615 cls, 616 allspice_client, 617 owner: str, 618 name: str, 619 ) -> Repository: 620 return cls._request(allspice_client, {"owner": owner, "name": name}) 621 622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses] 666 667 _patchable_fields: ClassVar[set[str]] = { 668 "allow_manual_merge", 669 "allow_merge_commits", 670 "allow_rebase", 671 "allow_rebase_explicit", 672 "allow_rebase_update", 673 "allow_squash_merge", 674 "archived", 675 "autodetect_manual_merge", 676 "default_branch", 677 "default_delete_branch_after_merge", 678 "default_merge_style", 679 "description", 680 "enable_prune", 681 "external_tracker", 682 "external_wiki", 683 "has_actions", 684 "has_issues", 685 "has_projects", 686 "has_pull_requests", 687 "has_wiki", 688 "ignore_whitespace_conflicts", 689 "internal_tracker", 690 "mirror_interval", 691 "name", 692 "private", 693 "template", 694 "website", 695 } 696 697 def commit(self): 698 args = {"owner": self.owner.username, "name": self.name} 699 self._commit(args) 700 701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results] 708 709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result) 715 716 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result) 730 731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues 799 800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 834 835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results] 871 872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues 895 896 def get_times(self): 897 results = self.allspice_client.requests_get( 898 Repository.REPO_TIMES % (self.owner.username, self.name) 899 ) 900 return results 901 902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time 910 911 def get_full_name(self) -> str: 912 return self.owner.username + "/" + self.name 913 914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue 930 931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result) 984 985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result) 998 999 def create_gitea_hook(self, hook_url: str, events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data) 1008 1009 def list_hooks(self): 1010 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1011 return self.allspice_client.requests_get(url) 1012 1013 def delete_hook(self, id: str): 1014 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1015 self.allspice_client.requests_delete(url) 1016 1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False 1028 1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs 1043 1044 def remove_collaborator(self, user_name: str): 1045 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1046 self.allspice_client.requests_delete(url) 1047 1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = {"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded 1060 1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result 1082 1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1099 1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ] 1117 1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 1140 """ 1141 1142 url = self.REPO_GET_MEDIA.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params) 1149 1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_MEDIA.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk) 1187 1188 def get_generated_json( 1189 self, 1190 content: Union[Content, str], 1191 ref: Optional[Ref] = None, 1192 params: Optional[dict] = None, 1193 ) -> dict: 1194 """ 1195 Get the json blob for a cad file if it exists, otherwise enqueue 1196 a new job and return a 503 status. 1197 1198 WARNING: This is still experimental and not recommended for critical 1199 applications. The structure and content of the returned dictionary can 1200 change at any time. 1201 1202 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1203 """ 1204 1205 if isinstance(content, Content): 1206 content = content.path 1207 1208 url = self.REPO_GET_ALLSPICE_JSON.format( 1209 owner=self.owner.username, 1210 repo=self.name, 1211 content=content, 1212 ) 1213 data = Util.data_params_for_ref(ref) 1214 if params: 1215 data.update(params) 1216 if self.allspice_client.use_new_schdoc_renderer is not None: 1217 data["use_new_schdoc_renderer"] = ( 1218 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1219 ) 1220 return self.allspice_client.requests_get(url, data) 1221 1222 def get_generated_svg( 1223 self, 1224 content: Union[Content, str], 1225 ref: Optional[Ref] = None, 1226 params: Optional[dict] = None, 1227 ) -> bytes: 1228 """ 1229 Get the svg blob for a cad file if it exists, otherwise enqueue 1230 a new job and return a 503 status. 1231 1232 WARNING: This is still experimental and not yet recommended for 1233 critical applications. The content of the returned svg can change 1234 at any time. 1235 1236 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1237 """ 1238 1239 if isinstance(content, Content): 1240 content = content.path 1241 1242 url = self.REPO_GET_ALLSPICE_SVG.format( 1243 owner=self.owner.username, 1244 repo=self.name, 1245 content=content, 1246 ) 1247 data = Util.data_params_for_ref(ref) 1248 if params: 1249 data.update(params) 1250 if self.allspice_client.use_new_schdoc_renderer is not None: 1251 data["use_new_schdoc_renderer"] = ( 1252 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1253 ) 1254 return self.allspice_client.requests_get_raw(url, data) 1255 1256 def get_generated_projectdata( 1257 self, 1258 content: Union[Content, str], 1259 ref: Optional[Ref] = None, 1260 params: Optional[dict] = None, 1261 ) -> dict: 1262 """ 1263 Get the json project data based on the cad file provided 1264 1265 WARNING: This is still experimental and not yet recommended for 1266 critical applications. The content of the returned dictionary can change 1267 at any time. 1268 1269 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1270 """ 1271 if isinstance(content, Content): 1272 content = content.path 1273 1274 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1275 owner=self.owner.username, 1276 repo=self.name, 1277 content=content, 1278 ) 1279 data = Util.data_params_for_ref(ref) 1280 if params: 1281 data.update(params) 1282 return self.allspice_client.requests_get(url, data) 1283 1284 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1285 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1286 if not data: 1287 data = {} 1288 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1289 data.update({"content": content}) 1290 return self.allspice_client.requests_post(url, data) 1291 1292 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1293 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1294 if not data: 1295 data = {} 1296 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1297 data.update({"sha": file_sha, "content": content}) 1298 return self.allspice_client.requests_put(url, data) 1299 1300 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1301 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1302 if not data: 1303 data = {} 1304 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1305 data.update({"sha": file_sha}) 1306 return self.allspice_client.requests_delete(url, data) 1307 1308 def get_archive( 1309 self, 1310 ref: Ref = "main", 1311 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1312 ) -> bytes: 1313 """ 1314 Download all the files in a specific ref of a repository as a zip or tarball 1315 archive. 1316 1317 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1318 1319 :param ref: branch or commit to get content from, defaults to the "main" branch 1320 :param archive_format: zip or tar, defaults to zip 1321 """ 1322 1323 ref_string = Util.data_params_for_ref(ref)["ref"] 1324 url = self.REPO_GET_ARCHIVE.format( 1325 owner=self.owner.username, 1326 repo=self.name, 1327 ref=ref_string, 1328 format=archive_format.value, 1329 ) 1330 return self.allspice_client.requests_get_raw(url) 1331 1332 def get_topics(self) -> list[str]: 1333 """ 1334 Gets the list of topics on this repository. 1335 1336 See http://localhost:3000/api/swagger#/repository/repoListTopics 1337 """ 1338 1339 url = self.REPO_GET_TOPICS.format( 1340 owner=self.owner.username, 1341 repo=self.name, 1342 ) 1343 return self.allspice_client.requests_get(url)["topics"] 1344 1345 def add_topic(self, topic: str): 1346 """ 1347 Adds a topic to the repository. 1348 1349 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1350 1351 :param topic: The topic to add. Topic names must consist only of 1352 lowercase letters, numnbers and dashes (-), and cannot start with 1353 dashes. Topic names also must be under 35 characters long. 1354 """ 1355 1356 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1357 self.allspice_client.requests_put(url) 1358 1359 def create_release( 1360 self, 1361 tag_name: str, 1362 name: Optional[str] = None, 1363 body: Optional[str] = None, 1364 draft: bool = False, 1365 ): 1366 """ 1367 Create a release for this repository. The release will be created for 1368 the tag with the given name. If there is no tag with this name, create 1369 the tag first. 1370 1371 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1372 """ 1373 1374 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1375 data = { 1376 "tag_name": tag_name, 1377 "draft": draft, 1378 } 1379 if name is not None: 1380 data["name"] = name 1381 if body is not None: 1382 data["body"] = body 1383 response = self.allspice_client.requests_post(url, data) 1384 return Release.parse_response(self.allspice_client, response, self) 1385 1386 def get_releases( 1387 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1388 ) -> List[Release]: 1389 """ 1390 Get the list of releases for this repository. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1393 """ 1394 1395 data = {} 1396 1397 if draft is not None: 1398 data["draft"] = draft 1399 if pre_release is not None: 1400 data["pre-release"] = pre_release 1401 1402 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1403 responses = self.allspice_client.requests_get_paginated(url, params=data) 1404 1405 return [ 1406 Release.parse_response(self.allspice_client, response, self) for response in responses 1407 ] 1408 1409 def get_latest_release(self) -> Release: 1410 """ 1411 Get the latest release for this repository. 1412 1413 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1414 """ 1415 1416 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1417 response = self.allspice_client.requests_get(url) 1418 release = Release.parse_response(self.allspice_client, response, self) 1419 return release 1420 1421 def get_release_by_tag(self, tag: str) -> Release: 1422 """ 1423 Get a release by its tag. 1424 1425 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1426 """ 1427 1428 url = self.REPO_GET_RELEASE_BY_TAG.format( 1429 owner=self.owner.username, repo=self.name, tag=tag 1430 ) 1431 response = self.allspice_client.requests_get(url) 1432 release = Release.parse_response(self.allspice_client, response, self) 1433 return release 1434 1435 def get_commit_statuses( 1436 self, 1437 commit: Union[str, Commit], 1438 sort: Optional[CommitStatusSort] = None, 1439 state: Optional[CommitStatusState] = None, 1440 ) -> List[CommitStatus]: 1441 """ 1442 Get a list of statuses for a commit. 1443 1444 This is roughly equivalent to the Commit.get_statuses method, but this 1445 method allows you to sort and filter commits and is more convenient if 1446 you have a commit SHA and don't need to get the commit itself. 1447 1448 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1449 """ 1450 1451 if isinstance(commit, Commit): 1452 commit = commit.sha 1453 1454 params = {} 1455 if sort is not None: 1456 params["sort"] = sort.value 1457 if state is not None: 1458 params["state"] = state.value 1459 1460 url = self.REPO_GET_COMMIT_STATUS.format( 1461 owner=self.owner.username, repo=self.name, sha=commit 1462 ) 1463 response = self.allspice_client.requests_get_paginated(url, params=params) 1464 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1465 1466 def create_commit_status( 1467 self, 1468 commit: Union[str, Commit], 1469 context: Optional[str] = None, 1470 description: Optional[str] = None, 1471 state: Optional[CommitStatusState] = None, 1472 target_url: Optional[str] = None, 1473 ) -> CommitStatus: 1474 """ 1475 Create a status on a commit. 1476 1477 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1478 """ 1479 1480 if isinstance(commit, Commit): 1481 commit = commit.sha 1482 1483 data = {} 1484 if context is not None: 1485 data["context"] = context 1486 if description is not None: 1487 data["description"] = description 1488 if state is not None: 1489 data["state"] = state.value 1490 if target_url is not None: 1491 data["target_url"] = target_url 1492 1493 url = self.REPO_GET_COMMIT_STATUS.format( 1494 owner=self.owner.username, repo=self.name, sha=commit 1495 ) 1496 response = self.allspice_client.requests_post(url, data=data) 1497 return CommitStatus.parse_response(self.allspice_client, response) 1498 1499 def delete(self): 1500 self.allspice_client.requests_delete( 1501 Repository.REPO_DELETE % (self.owner.username, self.name) 1502 ) 1503 self.deleted = True
622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
716 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_queryin the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time
914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue
931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result)
999 def create_gitea_hook(self, hook_url: str, events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data)
1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False
1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs
1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = {"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded
1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 1140 """ 1141 1142 url = self.REPO_GET_MEDIA.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_MEDIA.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1188 def get_generated_json( 1189 self, 1190 content: Union[Content, str], 1191 ref: Optional[Ref] = None, 1192 params: Optional[dict] = None, 1193 ) -> dict: 1194 """ 1195 Get the json blob for a cad file if it exists, otherwise enqueue 1196 a new job and return a 503 status. 1197 1198 WARNING: This is still experimental and not recommended for critical 1199 applications. The structure and content of the returned dictionary can 1200 change at any time. 1201 1202 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1203 """ 1204 1205 if isinstance(content, Content): 1206 content = content.path 1207 1208 url = self.REPO_GET_ALLSPICE_JSON.format( 1209 owner=self.owner.username, 1210 repo=self.name, 1211 content=content, 1212 ) 1213 data = Util.data_params_for_ref(ref) 1214 if params: 1215 data.update(params) 1216 if self.allspice_client.use_new_schdoc_renderer is not None: 1217 data["use_new_schdoc_renderer"] = ( 1218 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1219 ) 1220 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1222 def get_generated_svg( 1223 self, 1224 content: Union[Content, str], 1225 ref: Optional[Ref] = None, 1226 params: Optional[dict] = None, 1227 ) -> bytes: 1228 """ 1229 Get the svg blob for a cad file if it exists, otherwise enqueue 1230 a new job and return a 503 status. 1231 1232 WARNING: This is still experimental and not yet recommended for 1233 critical applications. The content of the returned svg can change 1234 at any time. 1235 1236 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1237 """ 1238 1239 if isinstance(content, Content): 1240 content = content.path 1241 1242 url = self.REPO_GET_ALLSPICE_SVG.format( 1243 owner=self.owner.username, 1244 repo=self.name, 1245 content=content, 1246 ) 1247 data = Util.data_params_for_ref(ref) 1248 if params: 1249 data.update(params) 1250 if self.allspice_client.use_new_schdoc_renderer is not None: 1251 data["use_new_schdoc_renderer"] = ( 1252 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1253 ) 1254 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1256 def get_generated_projectdata( 1257 self, 1258 content: Union[Content, str], 1259 ref: Optional[Ref] = None, 1260 params: Optional[dict] = None, 1261 ) -> dict: 1262 """ 1263 Get the json project data based on the cad file provided 1264 1265 WARNING: This is still experimental and not yet recommended for 1266 critical applications. The content of the returned dictionary can change 1267 at any time. 1268 1269 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1270 """ 1271 if isinstance(content, Content): 1272 content = content.path 1273 1274 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1275 owner=self.owner.username, 1276 repo=self.name, 1277 content=content, 1278 ) 1279 data = Util.data_params_for_ref(ref) 1280 if params: 1281 data.update(params) 1282 return self.allspice_client.requests_get(url, data)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
1284 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1285 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1286 if not data: 1287 data = {} 1288 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1289 data.update({"content": content}) 1290 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1292 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1293 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1294 if not data: 1295 data = {} 1296 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1297 data.update({"sha": file_sha, "content": content}) 1298 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1300 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1301 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1302 if not data: 1303 data = {} 1304 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1305 data.update({"sha": file_sha}) 1306 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1308 def get_archive( 1309 self, 1310 ref: Ref = "main", 1311 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1312 ) -> bytes: 1313 """ 1314 Download all the files in a specific ref of a repository as a zip or tarball 1315 archive. 1316 1317 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1318 1319 :param ref: branch or commit to get content from, defaults to the "main" branch 1320 :param archive_format: zip or tar, defaults to zip 1321 """ 1322 1323 ref_string = Util.data_params_for_ref(ref)["ref"] 1324 url = self.REPO_GET_ARCHIVE.format( 1325 owner=self.owner.username, 1326 repo=self.name, 1327 ref=ref_string, 1328 format=archive_format.value, 1329 ) 1330 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1332 def get_topics(self) -> list[str]: 1333 """ 1334 Gets the list of topics on this repository. 1335 1336 See http://localhost:3000/api/swagger#/repository/repoListTopics 1337 """ 1338 1339 url = self.REPO_GET_TOPICS.format( 1340 owner=self.owner.username, 1341 repo=self.name, 1342 ) 1343 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1345 def add_topic(self, topic: str): 1346 """ 1347 Adds a topic to the repository. 1348 1349 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1350 1351 :param topic: The topic to add. Topic names must consist only of 1352 lowercase letters, numnbers and dashes (-), and cannot start with 1353 dashes. Topic names also must be under 35 characters long. 1354 """ 1355 1356 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1357 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1359 def create_release( 1360 self, 1361 tag_name: str, 1362 name: Optional[str] = None, 1363 body: Optional[str] = None, 1364 draft: bool = False, 1365 ): 1366 """ 1367 Create a release for this repository. The release will be created for 1368 the tag with the given name. If there is no tag with this name, create 1369 the tag first. 1370 1371 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1372 """ 1373 1374 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1375 data = { 1376 "tag_name": tag_name, 1377 "draft": draft, 1378 } 1379 if name is not None: 1380 data["name"] = name 1381 if body is not None: 1382 data["body"] = body 1383 response = self.allspice_client.requests_post(url, data) 1384 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1386 def get_releases( 1387 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1388 ) -> List[Release]: 1389 """ 1390 Get the list of releases for this repository. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1393 """ 1394 1395 data = {} 1396 1397 if draft is not None: 1398 data["draft"] = draft 1399 if pre_release is not None: 1400 data["pre-release"] = pre_release 1401 1402 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1403 responses = self.allspice_client.requests_get_paginated(url, params=data) 1404 1405 return [ 1406 Release.parse_response(self.allspice_client, response, self) for response in responses 1407 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1409 def get_latest_release(self) -> Release: 1410 """ 1411 Get the latest release for this repository. 1412 1413 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1414 """ 1415 1416 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1417 response = self.allspice_client.requests_get(url) 1418 release = Release.parse_response(self.allspice_client, response, self) 1419 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1421 def get_release_by_tag(self, tag: str) -> Release: 1422 """ 1423 Get a release by its tag. 1424 1425 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1426 """ 1427 1428 url = self.REPO_GET_RELEASE_BY_TAG.format( 1429 owner=self.owner.username, repo=self.name, tag=tag 1430 ) 1431 response = self.allspice_client.requests_get(url) 1432 release = Release.parse_response(self.allspice_client, response, self) 1433 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1435 def get_commit_statuses( 1436 self, 1437 commit: Union[str, Commit], 1438 sort: Optional[CommitStatusSort] = None, 1439 state: Optional[CommitStatusState] = None, 1440 ) -> List[CommitStatus]: 1441 """ 1442 Get a list of statuses for a commit. 1443 1444 This is roughly equivalent to the Commit.get_statuses method, but this 1445 method allows you to sort and filter commits and is more convenient if 1446 you have a commit SHA and don't need to get the commit itself. 1447 1448 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1449 """ 1450 1451 if isinstance(commit, Commit): 1452 commit = commit.sha 1453 1454 params = {} 1455 if sort is not None: 1456 params["sort"] = sort.value 1457 if state is not None: 1458 params["state"] = state.value 1459 1460 url = self.REPO_GET_COMMIT_STATUS.format( 1461 owner=self.owner.username, repo=self.name, sha=commit 1462 ) 1463 response = self.allspice_client.requests_get_paginated(url, params=params) 1464 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1466 def create_commit_status( 1467 self, 1468 commit: Union[str, Commit], 1469 context: Optional[str] = None, 1470 description: Optional[str] = None, 1471 state: Optional[CommitStatusState] = None, 1472 target_url: Optional[str] = None, 1473 ) -> CommitStatus: 1474 """ 1475 Create a status on a commit. 1476 1477 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1478 """ 1479 1480 if isinstance(commit, Commit): 1481 commit = commit.sha 1482 1483 data = {} 1484 if context is not None: 1485 data["context"] = context 1486 if description is not None: 1487 data["description"] = description 1488 if state is not None: 1489 data["state"] = state.value 1490 if target_url is not None: 1491 data["target_url"] = target_url 1492 1493 url = self.REPO_GET_COMMIT_STATUS.format( 1494 owner=self.owner.username, repo=self.name, sha=commit 1495 ) 1496 response = self.allspice_client.requests_post(url, data=data) 1497 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip"
Archive formats for Repository.get_archive
581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
1506class Milestone(ApiObject): 1507 allow_merge_commits: Any 1508 allow_rebase: Any 1509 allow_rebase_explicit: Any 1510 allow_squash_merge: Any 1511 archived: Any 1512 closed_at: Any 1513 closed_issues: int 1514 created_at: str 1515 default_branch: Any 1516 description: str 1517 due_on: Any 1518 has_issues: Any 1519 has_pull_requests: Any 1520 has_wiki: Any 1521 id: int 1522 ignore_whitespace_conflicts: Any 1523 name: Any 1524 open_issues: int 1525 private: Any 1526 state: str 1527 title: str 1528 updated_at: str 1529 website: Any 1530 1531 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1532 1533 def __init__(self, allspice_client): 1534 super().__init__(allspice_client) 1535 1536 def __eq__(self, other): 1537 if not isinstance(other, Milestone): 1538 return False 1539 return self.allspice_client == other.allspice_client and self.id == other.id 1540 1541 def __hash__(self): 1542 return hash(self.allspice_client) ^ hash(self.id) 1543 1544 _fields_to_parsers: ClassVar[dict] = { 1545 "closed_at": lambda _, t: Util.convert_time(t), 1546 "due_on": lambda _, t: Util.convert_time(t), 1547 } 1548 1549 _patchable_fields: ClassVar[set[str]] = { 1550 "allow_merge_commits", 1551 "allow_rebase", 1552 "allow_rebase_explicit", 1553 "allow_squash_merge", 1554 "archived", 1555 "default_branch", 1556 "description", 1557 "has_issues", 1558 "has_pull_requests", 1559 "has_wiki", 1560 "ignore_whitespace_conflicts", 1561 "name", 1562 "private", 1563 "website", 1564 } 1565 1566 @classmethod 1567 def request(cls, allspice_client, owner: str, repo: str, number: str): 1568 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
1571class Attachment(ReadonlyApiObject): 1572 """ 1573 An asset attached to a comment. 1574 1575 You cannot edit or delete the attachment from this object - see the instance methods 1576 Comment.edit_attachment and delete_attachment for that. 1577 """ 1578 1579 browser_download_url: str 1580 created_at: str 1581 download_count: int 1582 id: int 1583 name: str 1584 size: int 1585 uuid: str 1586 1587 def __init__(self, allspice_client): 1588 super().__init__(allspice_client) 1589 1590 def __eq__(self, other): 1591 if not isinstance(other, Attachment): 1592 return False 1593 1594 return self.uuid == other.uuid 1595 1596 def __hash__(self): 1597 return hash(self.uuid) 1598 1599 def download_to_file(self, io: IO): 1600 """ 1601 Download the raw, binary data of this Attachment to a file-like object. 1602 1603 Example: 1604 1605 with open("my_file.zip", "wb") as f: 1606 attachment.download_to_file(f) 1607 1608 :param io: The file-like object to write the data to. 1609 """ 1610 1611 response = self.allspice_client.requests.get( 1612 self.browser_download_url, 1613 headers=self.allspice_client.headers, 1614 stream=True, 1615 ) 1616 # 4kb chunks 1617 for chunk in response.iter_content(chunk_size=4096): 1618 if chunk: 1619 io.write(chunk)
An asset attached to a comment.
You cannot edit or delete the attachment from this object - see the instance methods Comment.edit_attachment and delete_attachment for that.
1599 def download_to_file(self, io: IO): 1600 """ 1601 Download the raw, binary data of this Attachment to a file-like object. 1602 1603 Example: 1604 1605 with open("my_file.zip", "wb") as f: 1606 attachment.download_to_file(f) 1607 1608 :param io: The file-like object to write the data to. 1609 """ 1610 1611 response = self.allspice_client.requests.get( 1612 self.browser_download_url, 1613 headers=self.allspice_client.headers, 1614 stream=True, 1615 ) 1616 # 4kb chunks 1617 for chunk in response.iter_content(chunk_size=4096): 1618 if chunk: 1619 io.write(chunk)
Download the raw, binary data of this Attachment to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
attachment.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
Inherited Members
1622class Comment(ApiObject): 1623 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1624 body: str 1625 created_at: datetime 1626 html_url: str 1627 id: int 1628 issue_url: str 1629 original_author: str 1630 original_author_id: int 1631 pull_request_url: str 1632 updated_at: datetime 1633 user: User 1634 1635 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1636 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1637 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1638 1639 def __init__(self, allspice_client): 1640 super().__init__(allspice_client) 1641 1642 def __eq__(self, other): 1643 if not isinstance(other, Comment): 1644 return False 1645 return self.repository == other.repository and self.id == other.id 1646 1647 def __hash__(self): 1648 return hash(self.repository) ^ hash(self.id) 1649 1650 @classmethod 1651 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1652 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1653 1654 _fields_to_parsers: ClassVar[dict] = { 1655 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1656 "created_at": lambda _, t: Util.convert_time(t), 1657 "updated_at": lambda _, t: Util.convert_time(t), 1658 } 1659 1660 _patchable_fields: ClassVar[set[str]] = {"body"} 1661 1662 @property 1663 def parent_url(self) -> str: 1664 """URL of the parent of this comment (the issue or the pull request)""" 1665 1666 if self.issue_url is not None and self.issue_url != "": 1667 return self.issue_url 1668 else: 1669 return self.pull_request_url 1670 1671 @cached_property 1672 def repository(self) -> Repository: 1673 """The repository this comment was posted on.""" 1674 1675 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1676 return Repository.request(self.allspice_client, owner_name, repo_name) 1677 1678 def __fields_for_path(self): 1679 return { 1680 "owner": self.repository.owner.username, 1681 "repo": self.repository.name, 1682 "id": self.id, 1683 } 1684 1685 def commit(self): 1686 self._commit(self.__fields_for_path()) 1687 1688 def delete(self): 1689 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1690 self.deleted = True 1691 1692 def get_attachments(self) -> List[Attachment]: 1693 """ 1694 Get all attachments on this comment. This returns Attachment objects, which 1695 contain a link to download the attachment. 1696 1697 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1698 """ 1699 1700 results = self.allspice_client.requests_get( 1701 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1702 ) 1703 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1704 1705 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1706 """ 1707 Create an attachment on this comment. 1708 1709 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1710 1711 :param file: The file to attach. This should be a file-like object. 1712 :param name: The name of the file. If not provided, the name of the file will be 1713 used. 1714 :return: The created attachment. 1715 """ 1716 1717 args: dict[str, Any] = { 1718 "files": {"attachment": file}, 1719 } 1720 if name is not None: 1721 args["params"] = {"name": name} 1722 1723 result = self.allspice_client.requests_post( 1724 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1725 **args, 1726 ) 1727 return Attachment.parse_response(self.allspice_client, result) 1728 1729 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1730 """ 1731 Edit an attachment. 1732 1733 The list of params that can be edited is available at 1734 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1735 1736 :param attachment: The attachment to be edited 1737 :param data: The data parameter should be a dictionary of the fields to edit. 1738 :return: The edited attachment 1739 """ 1740 1741 args = { 1742 **self.__fields_for_path(), 1743 "attachment_id": attachment.id, 1744 } 1745 result = self.allspice_client.requests_patch( 1746 self.ATTACHMENT_PATH.format(**args), 1747 data=data, 1748 ) 1749 return Attachment.parse_response(self.allspice_client, result) 1750 1751 def delete_attachment(self, attachment: Attachment): 1752 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1753 1754 args = { 1755 **self.__fields_for_path(), 1756 "attachment_id": attachment.id, 1757 } 1758 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1759 attachment.deleted = True
1662 @property 1663 def parent_url(self) -> str: 1664 """URL of the parent of this comment (the issue or the pull request)""" 1665 1666 if self.issue_url is not None and self.issue_url != "": 1667 return self.issue_url 1668 else: 1669 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1671 @cached_property 1672 def repository(self) -> Repository: 1673 """The repository this comment was posted on.""" 1674 1675 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1676 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1692 def get_attachments(self) -> List[Attachment]: 1693 """ 1694 Get all attachments on this comment. This returns Attachment objects, which 1695 contain a link to download the attachment. 1696 1697 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1698 """ 1699 1700 results = self.allspice_client.requests_get( 1701 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1702 ) 1703 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1705 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1706 """ 1707 Create an attachment on this comment. 1708 1709 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1710 1711 :param file: The file to attach. This should be a file-like object. 1712 :param name: The name of the file. If not provided, the name of the file will be 1713 used. 1714 :return: The created attachment. 1715 """ 1716 1717 args: dict[str, Any] = { 1718 "files": {"attachment": file}, 1719 } 1720 if name is not None: 1721 args["params"] = {"name": name} 1722 1723 result = self.allspice_client.requests_post( 1724 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1725 **args, 1726 ) 1727 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1729 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1730 """ 1731 Edit an attachment. 1732 1733 The list of params that can be edited is available at 1734 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1735 1736 :param attachment: The attachment to be edited 1737 :param data: The data parameter should be a dictionary of the fields to edit. 1738 :return: The edited attachment 1739 """ 1740 1741 args = { 1742 **self.__fields_for_path(), 1743 "attachment_id": attachment.id, 1744 } 1745 result = self.allspice_client.requests_patch( 1746 self.ATTACHMENT_PATH.format(**args), 1747 data=data, 1748 ) 1749 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1751 def delete_attachment(self, attachment: Attachment): 1752 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1753 1754 args = { 1755 **self.__fields_for_path(), 1756 "attachment_id": attachment.id, 1757 } 1758 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1759 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1762class Commit(ReadonlyApiObject): 1763 author: User 1764 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1765 committer: Dict[str, Union[int, str, bool]] 1766 created: str 1767 files: List[Dict[str, str]] 1768 html_url: str 1769 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1770 parents: List[Union[Dict[str, str], Any]] 1771 sha: str 1772 stats: Dict[str, int] 1773 url: str 1774 1775 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1776 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1777 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1778 1779 # Regex to extract owner and repo names from the url property 1780 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1781 1782 def __init__(self, allspice_client): 1783 super().__init__(allspice_client) 1784 1785 _fields_to_parsers: ClassVar[dict] = { 1786 # NOTE: api may return None for commiters that are no allspice users 1787 "author": lambda allspice_client, u: User.parse_response(allspice_client, u) if u else None 1788 } 1789 1790 def __eq__(self, other): 1791 if not isinstance(other, Commit): 1792 return False 1793 return self.sha == other.sha 1794 1795 def __hash__(self): 1796 return hash(self.sha) 1797 1798 @classmethod 1799 def parse_response(cls, allspice_client, result) -> "Commit": 1800 commit_cache = result["commit"] 1801 api_object = cls(allspice_client) 1802 cls._initialize(allspice_client, api_object, result) 1803 # inner_commit for legacy reasons 1804 Commit._add_read_property("inner_commit", commit_cache, api_object) 1805 return api_object 1806 1807 def get_status(self) -> CommitCombinedStatus: 1808 """ 1809 Get a combined status consisting of all statues on this commit. 1810 1811 Note that the returned object is a CommitCombinedStatus object, which 1812 also contains a list of all statuses on the commit. 1813 1814 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1815 """ 1816 1817 result = self.allspice_client.requests_get( 1818 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1819 ) 1820 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1821 1822 def get_statuses(self) -> List[CommitStatus]: 1823 """ 1824 Get a list of all statuses on this commit. 1825 1826 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1827 """ 1828 1829 results = self.allspice_client.requests_get( 1830 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1831 ) 1832 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1833 1834 @cached_property 1835 def _fields_for_path(self) -> dict[str, str]: 1836 matches = self.URL_REGEXP.search(self.url) 1837 if not matches: 1838 raise ValueError(f"Invalid commit URL: {self.url}") 1839 1840 return { 1841 "owner": matches.group(1), 1842 "repo": matches.group(2), 1843 "sha": self.sha, 1844 }
1798 @classmethod 1799 def parse_response(cls, allspice_client, result) -> "Commit": 1800 commit_cache = result["commit"] 1801 api_object = cls(allspice_client) 1802 cls._initialize(allspice_client, api_object, result) 1803 # inner_commit for legacy reasons 1804 Commit._add_read_property("inner_commit", commit_cache, api_object) 1805 return api_object
1807 def get_status(self) -> CommitCombinedStatus: 1808 """ 1809 Get a combined status consisting of all statues on this commit. 1810 1811 Note that the returned object is a CommitCombinedStatus object, which 1812 also contains a list of all statuses on the commit. 1813 1814 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1815 """ 1816 1817 result = self.allspice_client.requests_get( 1818 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1819 ) 1820 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1822 def get_statuses(self) -> List[CommitStatus]: 1823 """ 1824 Get a list of all statuses on this commit. 1825 1826 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1827 """ 1828 1829 results = self.allspice_client.requests_get( 1830 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1831 ) 1832 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
1847class CommitStatusState(Enum): 1848 PENDING = "pending" 1849 SUCCESS = "success" 1850 ERROR = "error" 1851 FAILURE = "failure" 1852 WARNING = "warning" 1853 1854 @classmethod 1855 def try_init(cls, value: str) -> CommitStatusState | str: 1856 """ 1857 Try converting a string to the enum, and if that fails, return the 1858 string itself. 1859 """ 1860 1861 try: 1862 return cls(value) 1863 except ValueError: 1864 return value
1854 @classmethod 1855 def try_init(cls, value: str) -> CommitStatusState | str: 1856 """ 1857 Try converting a string to the enum, and if that fails, return the 1858 string itself. 1859 """ 1860 1861 try: 1862 return cls(value) 1863 except ValueError: 1864 return value
Try converting a string to the enum, and if that fails, return the string itself.
1867class CommitStatus(ReadonlyApiObject): 1868 context: str 1869 created_at: str 1870 creator: User 1871 description: str 1872 id: int 1873 status: CommitStatusState 1874 target_url: str 1875 updated_at: str 1876 url: str 1877 1878 def __init__(self, allspice_client): 1879 super().__init__(allspice_client) 1880 1881 _fields_to_parsers: ClassVar[dict] = { 1882 # Gitea/ASH doesn't actually validate that the status is a "valid" 1883 # status, so we can expect empty or unknown strings in the status field. 1884 "status": lambda _, s: CommitStatusState.try_init(s), 1885 "creator": lambda allspice_client, u: ( 1886 User.parse_response(allspice_client, u) if u else None 1887 ), 1888 } 1889 1890 def __eq__(self, other): 1891 if not isinstance(other, CommitStatus): 1892 return False 1893 return self.id == other.id 1894 1895 def __hash__(self): 1896 return hash(self.id)
Inherited Members
1899class CommitCombinedStatus(ReadonlyApiObject): 1900 commit_url: str 1901 repository: Repository 1902 sha: str 1903 state: CommitStatusState 1904 statuses: List["CommitStatus"] 1905 total_count: int 1906 url: str 1907 1908 def __init__(self, allspice_client): 1909 super().__init__(allspice_client) 1910 1911 _fields_to_parsers: ClassVar[dict] = { 1912 # See CommitStatus 1913 "state": lambda _, s: CommitStatusState.try_init(s), 1914 "statuses": lambda allspice_client, statuses: [ 1915 CommitStatus.parse_response(allspice_client, status) for status in statuses 1916 ], 1917 "repository": lambda allspice_client, r: Repository.parse_response(allspice_client, r), 1918 } 1919 1920 def __eq__(self, other): 1921 if not isinstance(other, CommitCombinedStatus): 1922 return False 1923 return self.sha == other.sha 1924 1925 def __hash__(self): 1926 return hash(self.sha) 1927 1928 @classmethod 1929 def parse_response(cls, allspice_client, result) -> "CommitCombinedStatus": 1930 api_object = cls(allspice_client) 1931 cls._initialize(allspice_client, api_object, result) 1932 return api_object
Inherited Members
1935class Issue(ApiObject): 1936 """ 1937 An issue on a repository. 1938 1939 Note: `Issue.assets` may not have any entries even if the issue has 1940 attachments. This happens when an issue is fetched via a bulk method like 1941 `Repository.get_issues`. In most cases, prefer using 1942 `Issue.get_attachments` to get the attachments on an issue. 1943 """ 1944 1945 assets: List[Union[Any, "Attachment"]] 1946 assignee: Any 1947 assignees: Any 1948 body: str 1949 closed_at: Any 1950 comments: int 1951 created_at: str 1952 due_date: Any 1953 html_url: str 1954 id: int 1955 is_locked: bool 1956 labels: List[Any] 1957 milestone: Optional["Milestone"] 1958 number: int 1959 original_author: str 1960 original_author_id: int 1961 pin_order: int 1962 pull_request: Any 1963 ref: str 1964 repository: Dict[str, Union[int, str]] 1965 state: str 1966 title: str 1967 updated_at: str 1968 url: str 1969 user: User 1970 1971 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1972 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1973 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1974 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1975 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1976 1977 OPENED = "open" 1978 CLOSED = "closed" 1979 1980 def __init__(self, allspice_client): 1981 super().__init__(allspice_client) 1982 1983 def __eq__(self, other): 1984 if not isinstance(other, Issue): 1985 return False 1986 return self.repository == other.repository and self.id == other.id 1987 1988 def __hash__(self): 1989 return hash(self.repository) ^ hash(self.id) 1990 1991 _fields_to_parsers: ClassVar[dict] = { 1992 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1993 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1994 "assets": lambda allspice_client, assets: [ 1995 Attachment.parse_response(allspice_client, a) for a in assets 1996 ], 1997 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1998 "assignees": lambda allspice_client, us: [ 1999 User.parse_response(allspice_client, u) for u in us 2000 ], 2001 "state": lambda _, s: Issue.CLOSED if s == "closed" else Issue.OPENED, 2002 } 2003 2004 _parsers_to_fields: ClassVar[dict] = { 2005 "milestone": lambda m: m.id, 2006 } 2007 2008 _patchable_fields: ClassVar[set[str]] = { 2009 "assignee", 2010 "assignees", 2011 "body", 2012 "due_date", 2013 "milestone", 2014 "state", 2015 "title", 2016 } 2017 2018 def commit(self): 2019 args = { 2020 "owner": self.repository.owner.username, 2021 "repo": self.repository.name, 2022 "index": self.number, 2023 } 2024 self._commit(args) 2025 2026 @classmethod 2027 def request(cls, allspice_client, owner: str, repo: str, number: str): 2028 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2029 # The repository in the response is a RepositoryMeta object, so request 2030 # the full repository object and add it to the issue object. 2031 repository = Repository.request(allspice_client, owner, repo) 2032 setattr(api_object, "_repository", repository) 2033 # For legacy reasons 2034 cls._add_read_property("repo", repository, api_object) 2035 return api_object 2036 2037 @classmethod 2038 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2039 args = {"owner": repo.owner.username, "repo": repo.name} 2040 data = {"title": title, "body": body} 2041 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2042 issue = Issue.parse_response(allspice_client, result) 2043 setattr(issue, "_repository", repo) 2044 cls._add_read_property("repo", repo, issue) 2045 return issue 2046 2047 @property 2048 def owner(self) -> Organization | User: 2049 return self.repository.owner 2050 2051 def get_time_sum(self, user: User) -> int: 2052 results = self.allspice_client.requests_get( 2053 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2054 ) 2055 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2056 2057 def get_times(self) -> Optional[Dict]: 2058 return self.allspice_client.requests_get( 2059 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2060 ) 2061 2062 def delete_time(self, time_id: str): 2063 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2064 self.allspice_client.requests_delete(path) 2065 2066 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2067 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2068 self.allspice_client.requests_post( 2069 path, data={"created": created, "time": int(time), "user_name": user_name} 2070 ) 2071 2072 def get_comments(self) -> List[Comment]: 2073 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2074 2075 results = self.allspice_client.requests_get( 2076 self.GET_COMMENTS.format( 2077 owner=self.owner.username, repo=self.repository.name, index=self.number 2078 ) 2079 ) 2080 2081 return [Comment.parse_response(self.allspice_client, result) for result in results] 2082 2083 def create_comment(self, body: str) -> Comment: 2084 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2085 2086 path = self.GET_COMMENTS.format( 2087 owner=self.owner.username, repo=self.repository.name, index=self.number 2088 ) 2089 2090 response = self.allspice_client.requests_post(path, data={"body": body}) 2091 return Comment.parse_response(self.allspice_client, response) 2092 2093 def get_attachments(self) -> List[Attachment]: 2094 """ 2095 Fetch all attachments on this issue. 2096 2097 Unlike the assets field, this will always fetch all attachments from the 2098 API. 2099 2100 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2101 """ 2102 2103 path = self.GET_ATTACHMENTS.format( 2104 owner=self.owner.username, repo=self.repository.name, index=self.number 2105 ) 2106 response = self.allspice_client.requests_get(path) 2107 2108 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2109 2110 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2111 """ 2112 Create an attachment on this issue. 2113 2114 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2115 2116 :param file: The file to attach. This should be a file-like object. 2117 :param name: The name of the file. If not provided, the name of the file will be 2118 used. 2119 :return: The created attachment. 2120 """ 2121 2122 args: dict[str, Any] = { 2123 "files": {"attachment": file}, 2124 } 2125 if name is not None: 2126 args["params"] = {"name": name} 2127 2128 result = self.allspice_client.requests_post( 2129 self.GET_ATTACHMENTS.format( 2130 owner=self.owner.username, repo=self.repository.name, index=self.number 2131 ), 2132 **args, 2133 ) 2134 2135 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: Issue.assets may not have any entries even if the issue has
attachments. This happens when an issue is fetched via a bulk method like
Repository.get_issues. In most cases, prefer using
Issue.get_attachments to get the attachments on an issue.
2026 @classmethod 2027 def request(cls, allspice_client, owner: str, repo: str, number: str): 2028 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2029 # The repository in the response is a RepositoryMeta object, so request 2030 # the full repository object and add it to the issue object. 2031 repository = Repository.request(allspice_client, owner, repo) 2032 setattr(api_object, "_repository", repository) 2033 # For legacy reasons 2034 cls._add_read_property("repo", repository, api_object) 2035 return api_object
2037 @classmethod 2038 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2039 args = {"owner": repo.owner.username, "repo": repo.name} 2040 data = {"title": title, "body": body} 2041 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2042 issue = Issue.parse_response(allspice_client, result) 2043 setattr(issue, "_repository", repo) 2044 cls._add_read_property("repo", repo, issue) 2045 return issue
2066 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2067 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2068 self.allspice_client.requests_post( 2069 path, data={"created": created, "time": int(time), "user_name": user_name} 2070 )
2072 def get_comments(self) -> List[Comment]: 2073 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2074 2075 results = self.allspice_client.requests_get( 2076 self.GET_COMMENTS.format( 2077 owner=self.owner.username, repo=self.repository.name, index=self.number 2078 ) 2079 ) 2080 2081 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2083 def create_comment(self, body: str) -> Comment: 2084 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2085 2086 path = self.GET_COMMENTS.format( 2087 owner=self.owner.username, repo=self.repository.name, index=self.number 2088 ) 2089 2090 response = self.allspice_client.requests_post(path, data={"body": body}) 2091 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2093 def get_attachments(self) -> List[Attachment]: 2094 """ 2095 Fetch all attachments on this issue. 2096 2097 Unlike the assets field, this will always fetch all attachments from the 2098 API. 2099 2100 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2101 """ 2102 2103 path = self.GET_ATTACHMENTS.format( 2104 owner=self.owner.username, repo=self.repository.name, index=self.number 2105 ) 2106 response = self.allspice_client.requests_get(path) 2107 2108 return [Attachment.parse_response(self.allspice_client, result) for result in response]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments
2110 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2111 """ 2112 Create an attachment on this issue. 2113 2114 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2115 2116 :param file: The file to attach. This should be a file-like object. 2117 :param name: The name of the file. If not provided, the name of the file will be 2118 used. 2119 :return: The created attachment. 2120 """ 2121 2122 args: dict[str, Any] = { 2123 "files": {"attachment": file}, 2124 } 2125 if name is not None: 2126 args["params"] = {"name": name} 2127 2128 result = self.allspice_client.requests_post( 2129 self.GET_ATTACHMENTS.format( 2130 owner=self.owner.username, repo=self.repository.name, index=self.number 2131 ), 2132 **args, 2133 ) 2134 2135 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this issue.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
2138class DesignReviewReviewComment(ApiObject): 2139 """ 2140 A comment on a Design Review Review. 2141 """ 2142 2143 body: str 2144 commit_id: str 2145 created_at: str 2146 diff_hunk: str 2147 html_url: str 2148 id: int 2149 original_commit_id: str 2150 original_position: int 2151 path: str 2152 position: int 2153 pull_request_review_id: int 2154 pull_request_url: str 2155 resolver: Any 2156 sub_path: str 2157 updated_at: str 2158 user: User 2159 2160 def __init__(self, allspice_client): 2161 super().__init__(allspice_client) 2162 2163 _fields_to_parsers: ClassVar[dict] = { 2164 "resolver": lambda allspice_client, r: User.parse_response(allspice_client, r), 2165 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2166 }
A comment on a Design Review Review.
2169class DesignReviewReview(ReadonlyApiObject): 2170 """ 2171 A review on a Design Review. 2172 """ 2173 2174 body: str 2175 comments_count: int 2176 commit_id: str 2177 dismissed: bool 2178 html_url: str 2179 id: int 2180 official: bool 2181 pull_request_url: str 2182 stale: bool 2183 state: ReviewEvent 2184 submitted_at: str 2185 team: Any 2186 updated_at: str 2187 user: User 2188 2189 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}" 2190 GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments" 2191 2192 class ReviewEvent(Enum): 2193 APPROVED = "APPROVED" 2194 PENDING = "PENDING" 2195 COMMENT = "COMMENT" 2196 REQUEST_CHANGES = "REQUEST_CHANGES" 2197 REQUEST_REVIEW = "REQUEST_REVIEW" 2198 UNKNOWN = "" 2199 2200 @dataclass 2201 class ReviewComment: 2202 """ 2203 Data required to create a review comment on a design review. 2204 2205 :param body: The body of the comment. 2206 :param path: The path of the file to comment on. If you have a 2207 `Content` object, get the path using the `path` property. 2208 :param sub_path: The sub-path of the file to comment on. This is 2209 usually the page ID of the page in the multi-page document. 2210 :param new_position: The line number of the source code file after the 2211 change to add this comment on. Optional, leave unset if this is an ECAD 2212 file or the comment must be on the entire file. 2213 :param old_position: The line number of the source code file before the 2214 change to add this comment on. Optional, leave unset if this is an ECAD 2215 file or the comment must be on the entire file. 2216 """ 2217 2218 body: str 2219 path: str 2220 sub_path: Optional[str] = None 2221 new_position: Optional[int] = None 2222 old_position: Optional[int] = None 2223 2224 def __init__(self, allspice_client): 2225 super().__init__(allspice_client) 2226 2227 _fields_to_parsers: ClassVar[dict] = { 2228 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2229 "state": lambda _, s: DesignReviewReview.ReviewEvent(s), 2230 } 2231 2232 def _get_dr_properties(self) -> dict[str, str]: 2233 """ 2234 Get the owner, repo name and design review number from the URL of this 2235 review's DR. 2236 """ 2237 2238 parts = self.pull_request_url.strip("/").split("/") 2239 2240 try: 2241 index = parts[-1] 2242 assert parts[-2] == "pulls" or parts[-2] == "pull", ( 2243 "Expected the second last part of the URL to be 'pulls' or 'pull', " 2244 ) 2245 repo = parts[-3] 2246 owner = parts[-4] 2247 2248 return { 2249 "owner": owner, 2250 "repo": repo, 2251 "index": index, 2252 } 2253 except IndexError: 2254 raise ValueError("Malformed design review URL: {}".format(self.pull_request_url)) 2255 2256 @cached_property 2257 def owner_name(self) -> str: 2258 """ 2259 The owner of the repository this review is on. 2260 """ 2261 2262 return self._get_dr_properties()["owner"] 2263 2264 @cached_property 2265 def repository_name(self) -> str: 2266 """ 2267 The name of the repository this review is on. 2268 """ 2269 2270 return self._get_dr_properties()["repo"] 2271 2272 @cached_property 2273 def index(self) -> str: 2274 """ 2275 The index of the design review this review is on. 2276 """ 2277 2278 return self._get_dr_properties()["index"] 2279 2280 def delete(self): 2281 """ 2282 Delete this review. 2283 """ 2284 2285 self.allspice_client.requests_delete( 2286 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2287 ) 2288 self.deleted = True 2289 2290 def get_comments(self) -> List[DesignReviewReviewComment]: 2291 """ 2292 Get the comments on this review. 2293 """ 2294 2295 result = self.allspice_client.requests_get( 2296 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2297 ) 2298 2299 return [ 2300 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2301 for comment in result 2302 ]
A review on a Design Review.
2256 @cached_property 2257 def owner_name(self) -> str: 2258 """ 2259 The owner of the repository this review is on. 2260 """ 2261 2262 return self._get_dr_properties()["owner"]
The owner of the repository this review is on.
2264 @cached_property 2265 def repository_name(self) -> str: 2266 """ 2267 The name of the repository this review is on. 2268 """ 2269 2270 return self._get_dr_properties()["repo"]
The name of the repository this review is on.
2272 @cached_property 2273 def index(self) -> str: 2274 """ 2275 The index of the design review this review is on. 2276 """ 2277 2278 return self._get_dr_properties()["index"]
The index of the design review this review is on.
2280 def delete(self): 2281 """ 2282 Delete this review. 2283 """ 2284 2285 self.allspice_client.requests_delete( 2286 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2287 ) 2288 self.deleted = True
Delete this review.
2290 def get_comments(self) -> List[DesignReviewReviewComment]: 2291 """ 2292 Get the comments on this review. 2293 """ 2294 2295 result = self.allspice_client.requests_get( 2296 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2297 ) 2298 2299 return [ 2300 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2301 for comment in result 2302 ]
Get the comments on this review.
Inherited Members
2200 @dataclass 2201 class ReviewComment: 2202 """ 2203 Data required to create a review comment on a design review. 2204 2205 :param body: The body of the comment. 2206 :param path: The path of the file to comment on. If you have a 2207 `Content` object, get the path using the `path` property. 2208 :param sub_path: The sub-path of the file to comment on. This is 2209 usually the page ID of the page in the multi-page document. 2210 :param new_position: The line number of the source code file after the 2211 change to add this comment on. Optional, leave unset if this is an ECAD 2212 file or the comment must be on the entire file. 2213 :param old_position: The line number of the source code file before the 2214 change to add this comment on. Optional, leave unset if this is an ECAD 2215 file or the comment must be on the entire file. 2216 """ 2217 2218 body: str 2219 path: str 2220 sub_path: Optional[str] = None 2221 new_position: Optional[int] = None 2222 old_position: Optional[int] = None
Data required to create a review comment on a design review.
Parameters
- body: The body of the comment.
- path: The path of the file to comment on. If you have a
Contentobject, get the path using thepathproperty. - sub_path: The sub-path of the file to comment on. This is usually the page ID of the page in the multi-page document.
- new_position: The line number of the source code file after the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
- old_position: The line number of the source code file before the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
2305class DesignReview(ApiObject): 2306 """ 2307 A Design Review. See 2308 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2309 2310 Note: The base and head fields are not `Branch` objects - they are plain strings 2311 referring to the branch names. This is because DRs can exist for branches that have 2312 been deleted, which don't have an associated `Branch` object from the API. You can use 2313 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2314 it exists. 2315 """ 2316 2317 additions: Optional[int] 2318 allow_maintainer_edit: bool 2319 allow_maintainer_edits: Any 2320 assignee: User 2321 assignees: List["User"] 2322 base: str 2323 body: str 2324 changed_files: Optional[int] 2325 closed_at: Optional[str] 2326 comments: int 2327 created_at: str 2328 deletions: Optional[int] 2329 diff_url: str 2330 draft: bool 2331 due_date: Optional[str] 2332 head: str 2333 html_url: str 2334 id: int 2335 is_locked: bool 2336 labels: List[Any] 2337 merge_base: str 2338 merge_commit_sha: Optional[str] 2339 mergeable: bool 2340 merged: bool 2341 merged_at: Optional[str] 2342 merged_by: Any 2343 milestone: Any 2344 number: int 2345 patch_url: str 2346 pin_order: int 2347 repository: Optional["Repository"] 2348 requested_reviewers: Any 2349 requested_reviewers_teams: Any 2350 review_comments: int 2351 state: str 2352 title: str 2353 updated_at: str 2354 url: str 2355 user: User 2356 2357 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2358 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2359 GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews" 2360 GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}" 2361 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2362 2363 OPEN = "open" 2364 CLOSED = "closed" 2365 2366 class MergeType(Enum): 2367 MERGE = "merge" 2368 REBASE = "rebase" 2369 REBASE_MERGE = "rebase-merge" 2370 SQUASH = "squash" 2371 MANUALLY_MERGED = "manually-merged" 2372 2373 def __init__(self, allspice_client): 2374 super().__init__(allspice_client) 2375 2376 def __eq__(self, other): 2377 if not isinstance(other, DesignReview): 2378 return False 2379 return self.repository == other.repository and self.id == other.id 2380 2381 def __hash__(self): 2382 return hash(self.repository) ^ hash(self.id) 2383 2384 @classmethod 2385 def parse_response(cls, allspice_client, result) -> "DesignReview": 2386 api_object = super().parse_response(allspice_client, result) 2387 cls._add_read_property( 2388 "repository", 2389 Repository.parse_response(allspice_client, result["base"]["repo"]), 2390 api_object, 2391 ) 2392 2393 return api_object 2394 2395 @classmethod 2396 def request(cls, allspice_client, owner: str, repo: str, number: str): 2397 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2398 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2399 2400 _fields_to_parsers: ClassVar[dict] = { 2401 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2402 "assignees": lambda allspice_client, us: [ 2403 User.parse_response(allspice_client, u) for u in us 2404 ], 2405 "base": lambda _, b: b["ref"], 2406 "head": lambda _, h: h["ref"], 2407 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2408 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2409 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2410 } 2411 2412 _patchable_fields: ClassVar[set[str]] = { 2413 "allow_maintainer_edits", 2414 "assignee", 2415 "assignees", 2416 "base", 2417 "body", 2418 "due_date", 2419 "milestone", 2420 "state", 2421 "title", 2422 } 2423 2424 _parsers_to_fields: ClassVar[dict] = { 2425 "assignee": lambda u: u.username, 2426 "assignees": lambda us: [u.username for u in us], 2427 "base": lambda b: b.name if isinstance(b, Branch) else b, 2428 "milestone": lambda m: m.id, 2429 } 2430 2431 def commit(self): 2432 data = self.get_dirty_fields() 2433 if "due_date" in data and data["due_date"] is None: 2434 data["unset_due_date"] = True 2435 2436 args = { 2437 "owner": self.repository.owner.username, 2438 "repo": self.repository.name, 2439 "index": self.number, 2440 } 2441 self._commit(args, data) 2442 2443 def merge(self, merge_type: MergeType): 2444 """ 2445 Merge the pull request. See 2446 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2447 2448 :param merge_type: The type of merge to perform. See the MergeType enum. 2449 """ 2450 2451 self.allspice_client.requests_post( 2452 self.MERGE_DESIGN_REVIEW.format( 2453 owner=self.repository.owner.username, 2454 repo=self.repository.name, 2455 index=self.number, 2456 ), 2457 data={"Do": merge_type.value}, 2458 ) 2459 2460 def get_comments(self) -> List[Comment]: 2461 """ 2462 Get the comments on this pull request, but not specifically on a review. 2463 2464 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2465 2466 :return: A list of comments on this pull request. 2467 """ 2468 2469 results = self.allspice_client.requests_get( 2470 self.GET_COMMENTS.format( 2471 owner=self.repository.owner.username, 2472 repo=self.repository.name, 2473 index=self.number, 2474 ) 2475 ) 2476 return [Comment.parse_response(self.allspice_client, result) for result in results] 2477 2478 def create_comment(self, body: str): 2479 """ 2480 Create a comment on this pull request. This uses the same endpoint as the 2481 comments on issues, and will not be associated with any reviews. 2482 2483 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2484 2485 :param body: The body of the comment. 2486 :return: The comment that was created. 2487 """ 2488 2489 result = self.allspice_client.requests_post( 2490 self.GET_COMMENTS.format( 2491 owner=self.repository.owner.username, 2492 repo=self.repository.name, 2493 index=self.number, 2494 ), 2495 data={"body": body}, 2496 ) 2497 return Comment.parse_response(self.allspice_client, result) 2498 2499 def create_review( 2500 self, 2501 *, 2502 body: Optional[str] = None, 2503 event: Optional[DesignReviewReview.ReviewEvent] = None, 2504 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2505 commit_id: Optional[str] = None, 2506 ) -> DesignReviewReview: 2507 """ 2508 Create a review on this design review. 2509 2510 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2511 2512 Note: in most cases, you should not set the body or event when creating 2513 a review. The event is automatically set to "PENDING" when the review 2514 is created. You should then use `submit_review` to submit the review 2515 with the desired event and body. 2516 2517 :param body: The body of the review. This is the top-level comment on 2518 the review. If not provided, the review will be created with no body. 2519 :param event: The event of the review. This is the overall status of the 2520 review. See the ReviewEvent enum. If not provided, the API will 2521 default to "PENDING". 2522 :param comments: A list of comments on the review. Each comment should 2523 be a ReviewComment object. If not provided, only the base comment 2524 will be created. 2525 :param commit_id: The commit SHA to associate with the review. This is 2526 optional. 2527 """ 2528 2529 data: dict[str, Any] = {} 2530 2531 if body is not None: 2532 data["body"] = body 2533 if event is not None: 2534 data["event"] = event.value 2535 if commit_id is not None: 2536 data["commit_id"] = commit_id 2537 if comments is not None: 2538 data["comments"] = [asdict(comment) for comment in comments] 2539 2540 result = self.allspice_client.requests_post( 2541 self.GET_REVIEWS.format( 2542 owner=self.repository.owner.username, 2543 repo=self.repository.name, 2544 index=self.number, 2545 ), 2546 data=data, 2547 ) 2548 2549 return DesignReviewReview.parse_response(self.allspice_client, result) 2550 2551 def get_reviews(self) -> List[DesignReviewReview]: 2552 """ 2553 Get all reviews on this design review. 2554 2555 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2556 """ 2557 2558 results = self.allspice_client.requests_get( 2559 self.GET_REVIEWS.format( 2560 owner=self.repository.owner.username, 2561 repo=self.repository.name, 2562 index=self.number, 2563 ) 2564 ) 2565 2566 return [ 2567 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2568 ] 2569 2570 def submit_review( 2571 self, 2572 review_id: int, 2573 event: DesignReviewReview.ReviewEvent, 2574 *, 2575 body: Optional[str] = None, 2576 ): 2577 """ 2578 Submit a review on this design review. 2579 2580 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2581 2582 :param review_id: The ID of the review to submit. 2583 :param event: The event to submit the review with. See the ReviewEvent 2584 enum for the possible values. 2585 :param body: Optional body text for the review submission. 2586 """ 2587 2588 data = { 2589 "event": event.value, 2590 } 2591 if body is not None: 2592 data["body"] = body 2593 2594 result = self.allspice_client.requests_post( 2595 self.GET_REVIEW.format( 2596 owner=self.repository.owner.username, 2597 repo=self.repository.name, 2598 index=self.number, 2599 review_id=review_id, 2600 ), 2601 data=data, 2602 ) 2603 2604 return result
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch object from the API. You can use
the Repository.get_branch method to get a Branch object for a branch if you know
it exists.
2384 @classmethod 2385 def parse_response(cls, allspice_client, result) -> "DesignReview": 2386 api_object = super().parse_response(allspice_client, result) 2387 cls._add_read_property( 2388 "repository", 2389 Repository.parse_response(allspice_client, result["base"]["repo"]), 2390 api_object, 2391 ) 2392 2393 return api_object
2395 @classmethod 2396 def request(cls, allspice_client, owner: str, repo: str, number: str): 2397 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2398 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2431 def commit(self): 2432 data = self.get_dirty_fields() 2433 if "due_date" in data and data["due_date"] is None: 2434 data["unset_due_date"] = True 2435 2436 args = { 2437 "owner": self.repository.owner.username, 2438 "repo": self.repository.name, 2439 "index": self.number, 2440 } 2441 self._commit(args, data)
2443 def merge(self, merge_type: MergeType): 2444 """ 2445 Merge the pull request. See 2446 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2447 2448 :param merge_type: The type of merge to perform. See the MergeType enum. 2449 """ 2450 2451 self.allspice_client.requests_post( 2452 self.MERGE_DESIGN_REVIEW.format( 2453 owner=self.repository.owner.username, 2454 repo=self.repository.name, 2455 index=self.number, 2456 ), 2457 data={"Do": merge_type.value}, 2458 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2460 def get_comments(self) -> List[Comment]: 2461 """ 2462 Get the comments on this pull request, but not specifically on a review. 2463 2464 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2465 2466 :return: A list of comments on this pull request. 2467 """ 2468 2469 results = self.allspice_client.requests_get( 2470 self.GET_COMMENTS.format( 2471 owner=self.repository.owner.username, 2472 repo=self.repository.name, 2473 index=self.number, 2474 ) 2475 ) 2476 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2478 def create_comment(self, body: str): 2479 """ 2480 Create a comment on this pull request. This uses the same endpoint as the 2481 comments on issues, and will not be associated with any reviews. 2482 2483 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2484 2485 :param body: The body of the comment. 2486 :return: The comment that was created. 2487 """ 2488 2489 result = self.allspice_client.requests_post( 2490 self.GET_COMMENTS.format( 2491 owner=self.repository.owner.username, 2492 repo=self.repository.name, 2493 index=self.number, 2494 ), 2495 data={"body": body}, 2496 ) 2497 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2499 def create_review( 2500 self, 2501 *, 2502 body: Optional[str] = None, 2503 event: Optional[DesignReviewReview.ReviewEvent] = None, 2504 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2505 commit_id: Optional[str] = None, 2506 ) -> DesignReviewReview: 2507 """ 2508 Create a review on this design review. 2509 2510 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2511 2512 Note: in most cases, you should not set the body or event when creating 2513 a review. The event is automatically set to "PENDING" when the review 2514 is created. You should then use `submit_review` to submit the review 2515 with the desired event and body. 2516 2517 :param body: The body of the review. This is the top-level comment on 2518 the review. If not provided, the review will be created with no body. 2519 :param event: The event of the review. This is the overall status of the 2520 review. See the ReviewEvent enum. If not provided, the API will 2521 default to "PENDING". 2522 :param comments: A list of comments on the review. Each comment should 2523 be a ReviewComment object. If not provided, only the base comment 2524 will be created. 2525 :param commit_id: The commit SHA to associate with the review. This is 2526 optional. 2527 """ 2528 2529 data: dict[str, Any] = {} 2530 2531 if body is not None: 2532 data["body"] = body 2533 if event is not None: 2534 data["event"] = event.value 2535 if commit_id is not None: 2536 data["commit_id"] = commit_id 2537 if comments is not None: 2538 data["comments"] = [asdict(comment) for comment in comments] 2539 2540 result = self.allspice_client.requests_post( 2541 self.GET_REVIEWS.format( 2542 owner=self.repository.owner.username, 2543 repo=self.repository.name, 2544 index=self.number, 2545 ), 2546 data=data, 2547 ) 2548 2549 return DesignReviewReview.parse_response(self.allspice_client, result)
Create a review on this design review.
allspice.allspice.io/api/swagger#/repository/repoCreatePullRequestReview">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequestReview
Note: in most cases, you should not set the body or event when creating
a review. The event is automatically set to "PENDING" when the review
is created. You should then use submit_review to submit the review
with the desired event and body.
Parameters
- body: The body of the review. This is the top-level comment on the review. If not provided, the review will be created with no body.
- event: The event of the review. This is the overall status of the review. See the ReviewEvent enum. If not provided, the API will default to "PENDING".
- comments: A list of comments on the review. Each comment should be a ReviewComment object. If not provided, only the base comment will be created.
- commit_id: The commit SHA to associate with the review. This is optional.
2551 def get_reviews(self) -> List[DesignReviewReview]: 2552 """ 2553 Get all reviews on this design review. 2554 2555 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2556 """ 2557 2558 results = self.allspice_client.requests_get( 2559 self.GET_REVIEWS.format( 2560 owner=self.repository.owner.username, 2561 repo=self.repository.name, 2562 index=self.number, 2563 ) 2564 ) 2565 2566 return [ 2567 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2568 ]
Get all reviews on this design review.
allspice.allspice.io/api/swagger#/repository/repoListPullReviews">https://huballspice.allspice.io/api/swagger#/repository/repoListPullReviews
2570 def submit_review( 2571 self, 2572 review_id: int, 2573 event: DesignReviewReview.ReviewEvent, 2574 *, 2575 body: Optional[str] = None, 2576 ): 2577 """ 2578 Submit a review on this design review. 2579 2580 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2581 2582 :param review_id: The ID of the review to submit. 2583 :param event: The event to submit the review with. See the ReviewEvent 2584 enum for the possible values. 2585 :param body: Optional body text for the review submission. 2586 """ 2587 2588 data = { 2589 "event": event.value, 2590 } 2591 if body is not None: 2592 data["body"] = body 2593 2594 result = self.allspice_client.requests_post( 2595 self.GET_REVIEW.format( 2596 owner=self.repository.owner.username, 2597 repo=self.repository.name, 2598 index=self.number, 2599 review_id=review_id, 2600 ), 2601 data=data, 2602 ) 2603 2604 return result
Submit a review on this design review.
allspice.allspice.io/api/swagger#/repository/repoSubmitPullReview">https://huballspice.allspice.io/api/swagger#/repository/repoSubmitPullReview
Parameters
- review_id: The ID of the review to submit.
- event: The event to submit the review with. See the ReviewEvent enum for the possible values.
- body: Optional body text for the review submission.
2607class Team(ApiObject): 2608 can_create_org_repo: bool 2609 description: str 2610 id: int 2611 includes_all_repositories: bool 2612 name: str 2613 organization: Optional["Organization"] 2614 permission: str 2615 units: List[str] 2616 units_map: Dict[str, str] 2617 2618 API_OBJECT = """/teams/{id}""" # <id> 2619 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2620 TEAM_DELETE = """/teams/%s""" # <id> 2621 GET_MEMBERS = """/teams/%s/members""" # <id> 2622 GET_REPOS = """/teams/%s/repos""" # <id> 2623 2624 def __init__(self, allspice_client): 2625 super().__init__(allspice_client) 2626 2627 def __eq__(self, other): 2628 if not isinstance(other, Team): 2629 return False 2630 return self.organization == other.organization and self.id == other.id 2631 2632 def __hash__(self): 2633 return hash(self.organization) ^ hash(self.id) 2634 2635 _fields_to_parsers: ClassVar[dict] = { 2636 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2637 } 2638 2639 _patchable_fields: ClassVar[set[str]] = { 2640 "can_create_org_repo", 2641 "description", 2642 "includes_all_repositories", 2643 "name", 2644 "permission", 2645 "units", 2646 "units_map", 2647 } 2648 2649 @classmethod 2650 def request(cls, allspice_client, id: int): 2651 return cls._request(allspice_client, {"id": id}) 2652 2653 def commit(self): 2654 args = {"id": self.id} 2655 self._commit(args) 2656 2657 def add_user(self, user: User): 2658 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2659 url = f"/teams/{self.id}/members/{user.login}" 2660 self.allspice_client.requests_put(url) 2661 2662 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2663 if isinstance(repo, Repository): 2664 repo_name = repo.name 2665 else: 2666 repo_name = repo 2667 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2668 2669 def get_members(self): 2670 """Get all users assigned to the team.""" 2671 results = self.allspice_client.requests_get_paginated( 2672 Team.GET_MEMBERS % self.id, 2673 ) 2674 return [User.parse_response(self.allspice_client, result) for result in results] 2675 2676 def get_repos(self): 2677 """Get all repos of this Team.""" 2678 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2679 return [Repository.parse_response(self.allspice_client, result) for result in results] 2680 2681 def delete(self): 2682 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2683 self.deleted = True 2684 2685 def remove_team_member(self, user_name: str): 2686 url = f"/teams/{self.id}/members/{user_name}" 2687 self.allspice_client.requests_delete(url)
2657 def add_user(self, user: User): 2658 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2659 url = f"/teams/{self.id}/members/{user.login}" 2660 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2669 def get_members(self): 2670 """Get all users assigned to the team.""" 2671 results = self.allspice_client.requests_get_paginated( 2672 Team.GET_MEMBERS % self.id, 2673 ) 2674 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2676 def get_repos(self): 2677 """Get all repos of this Team.""" 2678 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2679 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
2690class Release(ApiObject): 2691 """ 2692 A release on a repo. 2693 """ 2694 2695 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2696 author: User 2697 body: str 2698 created_at: str 2699 draft: bool 2700 html_url: str 2701 id: int 2702 name: str 2703 prerelease: bool 2704 published_at: str 2705 repo: Optional["Repository"] 2706 repository: Optional["Repository"] 2707 tag_name: str 2708 tarball_url: str 2709 target_commitish: str 2710 upload_url: str 2711 url: str 2712 zipball_url: str 2713 2714 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2715 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2716 # Note that we don't strictly need the get_assets route, as the release 2717 # object already contains the assets. 2718 2719 def __init__(self, allspice_client): 2720 super().__init__(allspice_client) 2721 2722 def __eq__(self, other): 2723 if not isinstance(other, Release): 2724 return False 2725 return self.repo == other.repo and self.id == other.id 2726 2727 def __hash__(self): 2728 return hash(self.repo) ^ hash(self.id) 2729 2730 _fields_to_parsers: ClassVar[dict] = { 2731 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2732 } 2733 _patchable_fields: ClassVar[set[str]] = { 2734 "body", 2735 "draft", 2736 "name", 2737 "prerelease", 2738 "tag_name", 2739 "target_commitish", 2740 } 2741 2742 @classmethod 2743 def parse_response(cls, allspice_client, result, repo) -> Release: 2744 release = super().parse_response(allspice_client, result) 2745 Release._add_read_property("repository", repo, release) 2746 # For legacy reasons 2747 Release._add_read_property("repo", repo, release) 2748 setattr( 2749 release, 2750 "_assets", 2751 [ 2752 ReleaseAsset.parse_response(allspice_client, asset, release) 2753 for asset in result["assets"] 2754 ], 2755 ) 2756 return release 2757 2758 @classmethod 2759 def request( 2760 cls, 2761 allspice_client, 2762 owner: str, 2763 repo: str, 2764 id: Optional[int] = None, 2765 ) -> Release: 2766 args = {"owner": owner, "repo": repo, "id": id} 2767 release_response = cls._get_gitea_api_object(allspice_client, args) 2768 repository = Repository.request(allspice_client, owner, repo) 2769 release = cls.parse_response(allspice_client, release_response, repository) 2770 return release 2771 2772 def commit(self): 2773 if self.repo is None: 2774 raise ValueError("Cannot commit a release without a repository.") 2775 2776 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2777 self._commit(args) 2778 2779 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2780 """ 2781 Create an asset for this release. 2782 2783 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2784 2785 :param file: The file to upload. This should be a file-like object. 2786 :param name: The name of the file. 2787 :return: The created asset. 2788 """ 2789 2790 if self.repo is None: 2791 raise ValueError("Cannot commit a release without a repository.") 2792 2793 args: dict[str, Any] = {"files": {"attachment": file}} 2794 if name is not None: 2795 args["params"] = {"name": name} 2796 2797 result = self.allspice_client.requests_post( 2798 self.RELEASE_CREATE_ASSET.format( 2799 owner=self.repo.owner.username, 2800 repo=self.repo.name, 2801 id=self.id, 2802 ), 2803 **args, 2804 ) 2805 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2806 2807 def delete(self): 2808 if self.repo is None: 2809 raise ValueError("Cannot commit a release without a repository.") 2810 2811 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2812 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2813 self.deleted = True
A release on a repo.
2742 @classmethod 2743 def parse_response(cls, allspice_client, result, repo) -> Release: 2744 release = super().parse_response(allspice_client, result) 2745 Release._add_read_property("repository", repo, release) 2746 # For legacy reasons 2747 Release._add_read_property("repo", repo, release) 2748 setattr( 2749 release, 2750 "_assets", 2751 [ 2752 ReleaseAsset.parse_response(allspice_client, asset, release) 2753 for asset in result["assets"] 2754 ], 2755 ) 2756 return release
2758 @classmethod 2759 def request( 2760 cls, 2761 allspice_client, 2762 owner: str, 2763 repo: str, 2764 id: Optional[int] = None, 2765 ) -> Release: 2766 args = {"owner": owner, "repo": repo, "id": id} 2767 release_response = cls._get_gitea_api_object(allspice_client, args) 2768 repository = Repository.request(allspice_client, owner, repo) 2769 release = cls.parse_response(allspice_client, release_response, repository) 2770 return release
2779 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2780 """ 2781 Create an asset for this release. 2782 2783 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2784 2785 :param file: The file to upload. This should be a file-like object. 2786 :param name: The name of the file. 2787 :return: The created asset. 2788 """ 2789 2790 if self.repo is None: 2791 raise ValueError("Cannot commit a release without a repository.") 2792 2793 args: dict[str, Any] = {"files": {"attachment": file}} 2794 if name is not None: 2795 args["params"] = {"name": name} 2796 2797 result = self.allspice_client.requests_post( 2798 self.RELEASE_CREATE_ASSET.format( 2799 owner=self.repo.owner.username, 2800 repo=self.repo.name, 2801 id=self.id, 2802 ), 2803 **args, 2804 ) 2805 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2807 def delete(self): 2808 if self.repo is None: 2809 raise ValueError("Cannot commit a release without a repository.") 2810 2811 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2812 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2813 self.deleted = True
2816class ReleaseAsset(ApiObject): 2817 browser_download_url: str 2818 created_at: str 2819 download_count: int 2820 id: int 2821 name: str 2822 release: Optional["Release"] 2823 size: int 2824 uuid: str 2825 2826 API_OBJECT = "/repos/{owner}/{repo}/releases/{release_id}/assets/{id}" 2827 2828 def __init__(self, allspice_client): 2829 super().__init__(allspice_client) 2830 2831 def __eq__(self, other): 2832 if not isinstance(other, ReleaseAsset): 2833 return False 2834 return self.release == other.release and self.id == other.id 2835 2836 def __hash__(self): 2837 return hash(self.release) ^ hash(self.id) 2838 2839 _fields_to_parsers: ClassVar[dict] = {} 2840 _patchable_fields: ClassVar[set[str]] = { 2841 "name", 2842 } 2843 2844 @classmethod 2845 def parse_response(cls, allspice_client, result, release) -> ReleaseAsset: 2846 asset = super().parse_response(allspice_client, result) 2847 ReleaseAsset._add_read_property("release", release, asset) 2848 return asset 2849 2850 @classmethod 2851 def request( 2852 cls, 2853 allspice_client, 2854 owner: str, 2855 repo: str, 2856 release_id: int, 2857 id: int, 2858 ) -> ReleaseAsset: 2859 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2860 asset_response = cls._get_gitea_api_object(allspice_client, args) 2861 release = Release.request(allspice_client, owner, repo, release_id) 2862 asset = cls.parse_response(allspice_client, asset_response, release) 2863 return asset 2864 2865 def commit(self): 2866 if self.release is None or self.release.repo is None: 2867 raise ValueError("Cannot commit a release asset without a release or a repository.") 2868 2869 args = { 2870 "owner": self.release.repo.owner.username, 2871 "repo": self.release.repo.name, 2872 "release_id": self.release.id, 2873 "id": self.id, 2874 } 2875 self._commit(args) 2876 2877 def download(self) -> bytes: 2878 """ 2879 Download the raw, binary data of this asset. 2880 2881 Note 1: if the file you are requesting is a text file, you might want to 2882 use .decode() on the result to get a string. For example: 2883 2884 asset.download().decode("utf-8") 2885 2886 Note 2: this method will store the entire file in memory. If you are 2887 downloading a large file, you might want to use download_to_file instead. 2888 """ 2889 2890 return self.allspice_client.requests.get( 2891 self.browser_download_url, 2892 headers=self.allspice_client.headers, 2893 ).content 2894 2895 def download_to_file(self, io: IO): 2896 """ 2897 Download the raw, binary data of this asset to a file-like object. 2898 2899 Example: 2900 2901 with open("my_file.zip", "wb") as f: 2902 asset.download_to_file(f) 2903 2904 :param io: The file-like object to write the data to. 2905 """ 2906 2907 response = self.allspice_client.requests.get( 2908 self.browser_download_url, 2909 headers=self.allspice_client.headers, 2910 stream=True, 2911 ) 2912 # 4kb chunks 2913 for chunk in response.iter_content(chunk_size=4096): 2914 if chunk: 2915 io.write(chunk) 2916 2917 def delete(self): 2918 if self.release is None or self.release.repo is None: 2919 raise ValueError("Cannot commit a release asset without a release or a repository.") 2920 2921 args = { 2922 "owner": self.release.repo.owner.username, 2923 "repo": self.release.repo.name, 2924 "release_id": self.release.id, 2925 "id": self.id, 2926 } 2927 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2928 self.deleted = True
2850 @classmethod 2851 def request( 2852 cls, 2853 allspice_client, 2854 owner: str, 2855 repo: str, 2856 release_id: int, 2857 id: int, 2858 ) -> ReleaseAsset: 2859 args = {"owner": owner, "repo": repo, "release_id": release_id, "id": id} 2860 asset_response = cls._get_gitea_api_object(allspice_client, args) 2861 release = Release.request(allspice_client, owner, repo, release_id) 2862 asset = cls.parse_response(allspice_client, asset_response, release) 2863 return asset
2865 def commit(self): 2866 if self.release is None or self.release.repo is None: 2867 raise ValueError("Cannot commit a release asset without a release or a repository.") 2868 2869 args = { 2870 "owner": self.release.repo.owner.username, 2871 "repo": self.release.repo.name, 2872 "release_id": self.release.id, 2873 "id": self.id, 2874 } 2875 self._commit(args)
2877 def download(self) -> bytes: 2878 """ 2879 Download the raw, binary data of this asset. 2880 2881 Note 1: if the file you are requesting is a text file, you might want to 2882 use .decode() on the result to get a string. For example: 2883 2884 asset.download().decode("utf-8") 2885 2886 Note 2: this method will store the entire file in memory. If you are 2887 downloading a large file, you might want to use download_to_file instead. 2888 """ 2889 2890 return self.allspice_client.requests.get( 2891 self.browser_download_url, 2892 headers=self.allspice_client.headers, 2893 ).content
Download the raw, binary data of this asset.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
asset.download().decode("utf-8")
Note 2: this method will store the entire file in memory. If you are downloading a large file, you might want to use download_to_file instead.
2895 def download_to_file(self, io: IO): 2896 """ 2897 Download the raw, binary data of this asset to a file-like object. 2898 2899 Example: 2900 2901 with open("my_file.zip", "wb") as f: 2902 asset.download_to_file(f) 2903 2904 :param io: The file-like object to write the data to. 2905 """ 2906 2907 response = self.allspice_client.requests.get( 2908 self.browser_download_url, 2909 headers=self.allspice_client.headers, 2910 stream=True, 2911 ) 2912 # 4kb chunks 2913 for chunk in response.iter_content(chunk_size=4096): 2914 if chunk: 2915 io.write(chunk)
Download the raw, binary data of this asset to a file-like object.
Example:
with open("my_file.zip", "wb") as f:
asset.download_to_file(f)
Parameters
- io: The file-like object to write the data to.
2917 def delete(self): 2918 if self.release is None or self.release.repo is None: 2919 raise ValueError("Cannot commit a release asset without a release or a repository.") 2920 2921 args = { 2922 "owner": self.release.repo.owner.username, 2923 "repo": self.release.repo.name, 2924 "release_id": self.release.id, 2925 "id": self.id, 2926 } 2927 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2928 self.deleted = True
2931class Content(ReadonlyApiObject): 2932 content: Any 2933 download_url: str 2934 encoding: Any 2935 git_url: str 2936 html_url: str 2937 last_commit_sha: str 2938 name: str 2939 path: str 2940 sha: str 2941 size: int 2942 submodule_git_url: Any 2943 target: Any 2944 type: str 2945 url: str 2946 2947 FILE = "file" 2948 2949 def __init__(self, allspice_client): 2950 super().__init__(allspice_client) 2951 2952 def __eq__(self, other): 2953 if not isinstance(other, Content): 2954 return False 2955 2956 return self.sha == other.sha and self.name == other.name 2957 2958 def __hash__(self): 2959 return hash(self.sha) ^ hash(self.name)
Inherited Members
2965class Util: 2966 @staticmethod 2967 def convert_time(time: str) -> datetime: 2968 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2969 try: 2970 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2971 except ValueError: 2972 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") 2973 2974 @staticmethod 2975 def format_time(time: datetime) -> str: 2976 """ 2977 Format a datetime object to Gitea's time format. 2978 2979 :param time: The time to format 2980 :return: Formatted time 2981 """ 2982 2983 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" 2984 2985 @staticmethod 2986 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2987 """ 2988 Given a "ref", returns a dict with the ref parameter for the API call. 2989 2990 If the ref is None, returns an empty dict. You can pass this to the API 2991 directly. 2992 """ 2993 2994 if isinstance(ref, Branch): 2995 return {"ref": ref.name} 2996 elif isinstance(ref, Commit): 2997 return {"ref": ref.sha} 2998 elif ref: 2999 return {"ref": ref} 3000 else: 3001 return {}
2966 @staticmethod 2967 def convert_time(time: str) -> datetime: 2968 """Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" 2969 try: 2970 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") 2971 except ValueError: 2972 return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")
Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)
2974 @staticmethod 2975 def format_time(time: datetime) -> str: 2976 """ 2977 Format a datetime object to Gitea's time format. 2978 2979 :param time: The time to format 2980 :return: Formatted time 2981 """ 2982 2983 return time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
Format a datetime object to Gitea's time format.
Parameters
- time: The time to format
Returns
Formatted time
2985 @staticmethod 2986 def data_params_for_ref(ref: Optional[Ref]) -> Dict: 2987 """ 2988 Given a "ref", returns a dict with the ref parameter for the API call. 2989 2990 If the ref is None, returns an empty dict. You can pass this to the API 2991 directly. 2992 """ 2993 2994 if isinstance(ref, Branch): 2995 return {"ref": ref.name} 2996 elif isinstance(ref, Commit): 2997 return {"ref": ref.sha} 2998 elif ref: 2999 return {"ref": ref} 3000 else: 3001 return {}
Given a "ref", returns a dict with the ref parameter for the API call.
If the ref is None, returns an empty dict. You can pass this to the API directly.