allspice
A very simple API client for AllSpice Hub
Note that the full Swagger API is not accessible. The implementation is focused on making access to and work with Organizations, Teams, Repositories and Users as pain-free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice version or the authenticated user can be performed directly on the allspice_client object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization, ... classes are created dynamically at runtime, and are thus not visible during development. Refer to the AllSpice API documentation for the field names.
Fields that cannot be altered via the AllSpice API are read-only. After altering a field, the .commit method of the corresponding object must be called to synchronize the changed fields with your AllSpice Hub instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects have methods to execute some of the requests possible through the AllSpice API:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
    repos = team.get_repos()
    for repo in repos:
        print(repo.name)
1""" 2.. include:: ../README.md 3 :start-line: 1 4 :end-before: Installation 5""" 6 7from .allspice import ( 8 AllSpice, 9) 10from .apiobject import ( 11 Branch, 12 Comment, 13 Commit, 14 Content, 15 DesignReview, 16 DesignReviewReview, 17 Issue, 18 Milestone, 19 Organization, 20 Release, 21 Repository, 22 Team, 23 User, 24) 25from .exceptions import AlreadyExistsException, NotFoundException 26 27__version__ = "4.0.0b1" 28 29__all__ = [ 30 "AllSpice", 31 "AlreadyExistsException", 32 "Branch", 33 "Comment", 34 "Commit", 35 "Content", 36 "DesignReview", 37 "DesignReviewReview", 38 "Issue", 39 "Milestone", 40 "NotFoundException", 41 "Organization", 42 "Release", 43 "Repository", 44 "Team", 45 "User", 46]
class AllSpice:
    """Object to establish a session with AllSpice Hub."""

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
        use_new_schdoc_renderer: Optional[bool] = None,
    ):
        """Initialize an instance of the AllSpice Hub client.

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): If True (the default), verify the server's TLS
                certificate. Set to False to allow insecure server
                connections; the insecure-request warnings are then silenced.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

            use_new_schdoc_renderer (bool): Allows explicit override for using
                the new Altium schematic renderer. If set, this will take
                precedence over the default behavior on the AllSpice Hub
                instance.

        Raises:
            ValueError: If neither `token_text` nor `auth` is provided.
        """

        self.logger = logging.getLogger(__name__)
        # `getLogger(__name__)` returns the same logger for every client, so
        # attach the stderr handler only once; otherwise each additional
        # client instance would duplicate every log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            )
            self.logger.addHandler(handler)
        self.logger.setLevel(log_level)

        # Fail fast, before any session object is created.
        # NOTE(review): the message says "not both", yet supplying both
        # `token_text` and `auth` is accepted below - confirm the intent.
        if not token_text and not auth:
            raise ValueError("Please provide auth or token_text, but not both")

        self.headers = {
            "Content-type": "application/json",
        }
        self.url = allspice_hub_url

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certificate verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        self.use_new_schdoc_renderer = use_new_schdoc_renderer

    def __get_url(self, endpoint):
        """Return the full API v1 URL for `endpoint`."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s", url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """GET `endpoint` and return the raw response.

        Raises:
            NotFoundException: On status 404.
            ConflictException: On status 409.
            NotYetGeneratedException: On status 503.
            Exception: On any other status that is not 200 or 201.
        """
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code in [404]:
                raise NotFoundException(message)
            if request.status_code in [403]:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! "
                    f"({message})"
                )
            if request.status_code in [409]:
                raise ConflictException(message)
            if request.status_code in [503]:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parses the result-JSON to a dict.

        Bodies that are empty or at most three characters long are treated as
        "no content" and yield an empty dict.
        """
        if result.text and len(result.text) > 3:
            return json.loads(result.text)
        return {}

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body.

        If `sudo` is given, its `username` is sent as the `sudo` query
        parameter.
        """
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.parse_result(self.__get(endpoint, combined_params))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the undecoded response body.

        If `sudo` is given, its `username` is sent as the `sudo` query
        parameter.
        """
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.__get(endpoint, combined_params).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """GET every page of `endpoint` and return the concatenated items.

        Pages are requested by incrementing the `page_key` query parameter,
        starting at `first_page`, until a page comes back empty. A page may be
        a JSON list, or a JSON object carrying its items under `"data"` or
        `"tree"`.

        Raises:
            NotImplementedError: If a non-empty JSON object has neither a
                `"data"` nor a `"tree"` key.
        """
        query = dict(params)
        aggregated_result = []
        page = first_page
        while True:
            query[page_key] = page
            result = self.requests_get(endpoint, query, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    items = result["data"]
                elif "tree" in result:
                    items = result["tree"]
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
            else:
                items = result

            # A missing (null) or empty item list marks the end for both the
            # "data" and the "tree" response shapes.
            if not items:
                return aggregated_result
            aggregated_result.extend(items)

            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` as JSON to `endpoint`; an empty object is sent if no data is given.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        if not data:
            data = {}
        request = self.requests.put(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
            self.logger.error(message)
            raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE to `endpoint`, with `data` as the JSON body if given.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        kwargs = {"headers": self.headers}
        # Only attach a body when there is one; serializing None would send
        # the literal text "null".
        if data is not None:
            kwargs["data"] = json.dumps(data)
        request = self.requests.delete(self.__get_url(endpoint), **kwargs)
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url})"
            self.logger.error(message)
            raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
            can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If the server reports that the entity
            (or its e-mail address) already exists
        :raises Exception: On any other status that is not 200, 201 or 202
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # Do not announce a JSON body alongside a file upload.
            args["headers"].pop("Content-type", None)
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            # Never write the access token to the log.
            loggable_headers = {
                key: "<redacted>" if key.lower() == "authorization" else value
                for key, value in self.headers.items()
            }
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            self.logger.error(f"With info: {data} ({loggable_headers})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` as JSON to `endpoint` and return the parsed response.

        Raises:
            Exception: If the status code is neither 200 nor 201.
        """
        request = self.requests.patch(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
            self.logger.error(error_message)
            raise Exception(error_message)
        return self.parse_result(request)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed `/orgs/<orgname>/public_members` response."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return the organizations listed by `/admin/orgs`."""
        path = "/admin/orgs"
        results = self.requests_get(path)
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the user that the current credentials belong to."""
        result = self.requests_get(self.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the version string reported by the AllSpice Hub instance."""
        result = self.requests_get(self.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return the users listed by the admin users endpoint.

        NOTE(review): this issues a single, unpaginated request; if the
        endpoint paginates, users beyond the first page are not returned -
        confirm against the server.
        """
        results = self.requests_get(self.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first user whose primary or additional e-mail is `email`, else None."""
        users = self.get_users()
        for user in users:
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the user whose username is exactly `username`, else None."""
        users = self.get_users()
        for user in users:
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Return the repository `name` owned by `owner`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` default to `user_name` when not given.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # Keep the plaintext password out of the log.
        self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
        result = self.requests_post(self.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            # The response may carry no "message" (e.g. an empty body).
            reason = result.get("message", "unknown error")
            self.logger.error(reason)
            raise Exception("User not created... (gitea: %s)" % reason)
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, (User, Organization))
        result = self.requests_post(
            self.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            # The response may carry no "message" (e.g. an empty body).
            reason = result.get("message", "unknown error")
            self.logger.error(reason)
            raise Exception("Repository not created... (gitea: %s)" % reason)
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization owned by `owner`.

        Throws:
            AlreadyExistsException: If the Organization exists already.
            Exception: If something else went wrong.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            self.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            # The response may carry no "message" (e.g. an empty body).
            reason = result.get("message", "unknown error")
            failure = "Organization not created... (gitea: %s)" % reason
            self.logger.error(failure)
            raise Exception(failure)
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, "", short description of the new Team.
            permission (str): Optional, 'read', the permission sent for the
                Team.
            can_create_org_repo (bool): Optional, False, sent as
                `can_create_org_repo`.
            includes_all_repositories (bool): Optional, False, sent as
                `includes_all_repositories`.
            units (tuple): Optional, the repository units sent for the Team.
            units_map (dict): Optional, None, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.

        Throws:
            AlreadyExistsException: If the Team exists already.
            Exception: If something else went wrong.
        """
        if units_map is None:
            units_map = {}

        result = self.requests_post(
            self.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                "units_map": units_map,
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            # The response may carry no "message" (e.g. an empty body).
            reason = result.get("message", "unknown error")
            failure = "Team not created... (gitea: %s)" % reason
            self.logger.error(failure)
            raise Exception(failure)
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object
Object to establish a session with AllSpice Hub.
def __init__(
    self,
    allspice_hub_url="https://hub.allspice.io",
    token_text=None,
    auth=None,
    verify=True,
    log_level="INFO",
    ratelimiting=(100, 60),
    use_new_schdoc_renderer: Optional[bool] = None,
):
    """Initialize an instance of the AllSpice Hub client.

    Args:
        allspice_hub_url (str): The URL for the AllSpice Hub instance.
            Defaults to `https://hub.allspice.io`.

        token_text (str, None): The access token, by default None.

        auth (tuple, None): The user credentials
            `(username, password)`, by default None.

        verify (bool): If True (the default), verify the server's TLS
            certificate. Set to False to allow insecure server
            connections; the insecure-request warnings are then silenced.

        log_level (str): The log level, by default `INFO`.

        ratelimiting (tuple[int, int], None): `(max_calls, period)`,
            If None, no rate limiting is applied. By default, 100 calls
            per minute are allowed.

        use_new_schdoc_renderer (bool): Allows explicit override for using
            the new Altium schematic renderer. If set, this will take
            precedence over the default behavior on the AllSpice Hub
            instance.

    Raises:
        ValueError: If neither `token_text` nor `auth` is provided.
    """

    self.logger = logging.getLogger(__name__)
    # `getLogger(__name__)` returns the same logger for every client, so
    # attach the stderr handler only once; otherwise each additional client
    # instance would duplicate every log line.
    if not self.logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        self.logger.addHandler(handler)
    self.logger.setLevel(log_level)

    # Fail fast, before any session object is created.
    # NOTE(review): the message says "not both", yet supplying both
    # `token_text` and `auth` is accepted below - confirm the intent.
    if not token_text and not auth:
        raise ValueError("Please provide auth or token_text, but not both")

    self.headers = {
        "Content-type": "application/json",
    }
    self.url = allspice_hub_url

    if ratelimiting is None:
        self.requests = requests.Session()
    else:
        (max_calls, period) = ratelimiting
        self.requests = RateLimitedSession(max_calls=max_calls, period=period)

    # Manage authentication
    if token_text:
        self.headers["Authorization"] = "token " + token_text
    if auth:
        self.logger.warning(
            "Using basic auth is not recommended. Prefer using a token instead."
        )
        self.requests.auth = auth

    # Manage SSL certificate verification
    self.requests.verify = verify
    if not verify:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    self.use_new_schdoc_renderer = use_new_schdoc_renderer
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True (the default), verify the server's TLS certificate.
Set to False to allow insecure server connections when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set,
this will take precedence over the default behavior on the AllSpice Hub instance.
125 @staticmethod 126 def parse_result(result) -> Dict: 127 """Parses the result-JSON to a dict.""" 128 if result.text and len(result.text) > 3: 129 return json.loads(result.text) 130 return {}
Parses the result-JSON to a dict.
def requests_get_paginated(
    self,
    endpoint: str,
    params=frozendict(),
    sudo=None,
    page_key: str = "page",
    first_page: int = 1,
):
    """GET every page of `endpoint` and return the concatenated items.

    Pages are requested by incrementing the `page_key` query parameter,
    starting at `first_page`, until a page comes back empty. A page may be a
    JSON list, or a JSON object carrying its items under `"data"` or
    `"tree"`.

    Args:
        endpoint: The path to the endpoint.
        params: Extra query parameters sent with every page request.
        sudo: Passed through to `requests_get`.
        page_key: Name of the query parameter holding the page number.
        first_page: Number of the first page to request.

    Returns:
        A list with the items of all pages, in request order.

    Raises:
        NotImplementedError: If a non-empty JSON object has neither a
            `"data"` nor a `"tree"` key.
    """
    query = dict(params)
    aggregated_result = []
    page = first_page
    while True:
        query[page_key] = page
        result = self.requests_get(endpoint, query, sudo)

        if not result:
            return aggregated_result

        if isinstance(result, dict):
            if "data" in result:
                items = result["data"]
            elif "tree" in result:
                items = result["tree"]
            else:
                raise NotImplementedError(
                    "requests_get_paginated does not know how to handle responses of this type."
                )
        else:
            items = result

        # A missing (null) or empty item list marks the end for both the
        # "data" and the "tree" response shapes.
        if not items:
            return aggregated_result
        aggregated_result.extend(items)

        page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
    """PUT a JSON payload to `endpoint`.

    When `data` is missing or empty, an empty JSON object is sent instead.

    Raises:
        Exception: If the status code is neither 200 nor 204; the message is
            also logged at error level.
    """
    target = self.__get_url(endpoint)
    response = self.requests.put(target, headers=self.headers, data=json.dumps(data or {}))
    if response.status_code in (200, 204):
        return
    failure = f"Received status code: {response.status_code} ({response.url}) {response.text}"
    self.logger.error(failure)
    raise Exception(failure)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
    """Send a DELETE to `endpoint`, with `data` as the JSON body if given.

    Args:
        endpoint: The path to the endpoint.
        data: Optional dictionary sent as the JSON body. When it is None, no
            body is sent at all.

    Raises:
        Exception: If the status code is neither 200 nor 204; the message is
            also logged at error level.
    """
    kwargs = {"headers": self.headers}
    # Only attach a body when there is one; serializing None would send the
    # literal text "null".
    if data is not None:
        kwargs["data"] = json.dumps(data)
    request = self.requests.delete(self.__get_url(endpoint), **kwargs)
    if request.status_code not in (200, 204):
        message = f"Received status code: {request.status_code} ({request.url})"
        self.logger.error(message)
        raise Exception(message)
def requests_post(
    self,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
    files: Optional[dict] = None,
):
    """
    Make a POST call to the endpoint.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :param params: A dictionary of query params
    :param files: A dictionary of files, see requests.post. Using both files and data
        can lead to unexpected results!
    :return: The JSON response parsed as a dict
    :raises AlreadyExistsException: If the server reports that the entity
        (or its e-mail address) already exists
    :raises Exception: On any other status that is not 200, 201 or 202
    """

    # This should ideally be a TypedDict of the type of arguments taken by
    # `requests.post`.
    args: dict[str, Any] = {
        "headers": self.headers.copy(),
    }
    if data is not None:
        args["data"] = json.dumps(data)
    if params is not None:
        args["params"] = params
    if files is not None:
        # Do not announce a JSON body alongside a file upload. The default
        # keeps this from failing if the header was removed from self.headers.
        args["headers"].pop("Content-type", None)
        args["files"] = files

    request = self.requests.post(self.__get_url(endpoint), **args)

    if request.status_code not in (200, 201, 202):
        if "already exists" in request.text or "e-mail already in use" in request.text:
            self.logger.warning(request.text)
            raise AlreadyExistsException()
        # Never write the access token to the log.
        loggable_headers = {
            key: "<redacted>" if key.lower() == "authorization" else value
            for key, value in self.headers.items()
        }
        self.logger.error(f"Received status code: {request.status_code} ({request.url})")
        self.logger.error(f"With info: {data} ({loggable_headers})")
        self.logger.error(f"Answer: {request.text}")
        raise Exception(
            f"Received status code: {request.status_code} ({request.url}), {request.text}"
        )
    return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
def requests_patch(self, endpoint: str, data: dict):
    """PATCH a JSON payload to `endpoint` and return the parsed response.

    Raises:
        Exception: If the status code is neither 200 nor 201; the message is
            also logged at error level.
    """
    target = self.__get_url(endpoint)
    response = self.requests.patch(target, headers=self.headers, data=json.dumps(data))
    if response.status_code in (200, 201):
        return self.parse_result(response)
    error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
    self.logger.error(error_message)
    raise Exception(error_message)
def create_user(
    self,
    user_name: str,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    login_name: Optional[str] = None,
    change_pw=True,
    send_notify=True,
    source_id=0,
):
    """Create User.

    `login_name` and `full_name` default to `user_name` when not given.

    Throws:
        AlreadyExistsException, if the User exists already
        Exception, if something else went wrong.
    """
    if not login_name:
        login_name = user_name
    if not full_name:
        full_name = user_name
    request_data = {
        "source_id": source_id,
        "login_name": login_name,
        "full_name": full_name,
        "username": user_name,
        "email": email,
        "password": password,
        "send_notify": send_notify,
        "must_change_password": change_pw,
    }

    # Keep the plaintext password out of the log.
    self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
    result = self.requests_post(self.ADMIN_CREATE_USER, data=request_data)
    if "id" in result:
        self.logger.info(
            "Successfully created User %s <%s> (id %s)",
            result["login"],
            result["email"],
            result["id"],
        )
        self.logger.debug("Gitea response: %s", result)
    else:
        # The response may carry no "message" (e.g. an empty body); report
        # the failure instead of raising KeyError.
        reason = result.get("message", "unknown error")
        self.logger.error(reason)
        raise Exception("User not created... (gitea: %s)" % reason)
    user = User.parse_response(self, result)
    return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository as the administrator

    The keyword arguments are sent in the request payload as `description`,
    `private`, `auto_init`, `gitignores`, `license`, `readme`,
    `issue_labels` and `default_branch`.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # although this only says user in the api, this also works for
    # organizations
    assert isinstance(repoOwner, (User, Organization))
    result = self.requests_post(
        self.ADMIN_REPO_CREATE % repoOwner.username,
        data={
            "name": repoName,
            "description": description,
            "private": private,
            "auto_init": autoInit,
            "gitignores": gitignores,
            "license": license,
            "issue_labels": issue_labels,
            "readme": readme,
            "default_branch": default_branch,
        },
    )
    if "id" in result:
        self.logger.info("Successfully created Repository %s " % result["name"])
    else:
        # The response may carry no "message" (e.g. an empty body); report
        # the failure instead of raising KeyError.
        reason = result.get("message", "unknown error")
        self.logger.error(reason)
        raise Exception("Repository not created... (gitea: %s)" % reason)
    return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo or allspice.Organization.create_repo.
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner`.

    Args:
        owner (User): The user the Organization is created for.
        orgName (str): Sent as the Organization's `username`.
        description (str): Sent as `description`.
        location (str): Optional, "", sent as `location`.
        website (str): Optional, "", sent as `website`.
        full_name (str): Optional, "", sent as `full_name`.

    Throws:
        AlreadyExistsException: If the Organization exists already.
        Exception: If something else went wrong.
    """
    assert isinstance(owner, User)
    result = self.requests_post(
        self.CREATE_ORG % owner.username,
        data={
            "username": orgName,
            "description": description,
            "location": location,
            "website": website,
            "full_name": full_name,
        },
    )
    if "id" in result:
        self.logger.info("Successfully created Organization %s" % result["username"])
    else:
        # The response may carry no "message" (e.g. an empty body); report
        # the failure instead of raising KeyError. Logged once, with context.
        reason = result.get("message", "unknown error")
        failure = "Organization not created... (gitea: %s)" % reason
        self.logger.error(failure)
        raise Exception(failure)
    return Organization.parse_response(self, result)
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, "", short description of the new Team.
        permission (str): Optional, 'read', the permission sent for the Team.
        can_create_org_repo (bool): Optional, False, sent as
            `can_create_org_repo`.
        includes_all_repositories (bool): Optional, False, sent as
            `includes_all_repositories`.
        units (tuple): Optional, the repository units sent for the Team.
        units_map (dict): Optional, None, a mapping of units to their
            permissions. If None or empty, the `permission` permission will
            be applied to all units. Note: When both `units` and `units_map`
            are given, `units_map` will be preferred.

    Throws:
        AlreadyExistsException: If the Team exists already.
        Exception: If something else went wrong.
    """
    # None stands in for "no mapping" so that no mutable default is shared
    # between calls; the payload still carries an empty object.
    if units_map is None:
        units_map = {}

    result = self.requests_post(
        self.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            "units_map": units_map,
        },
    )

    if "id" in result:
        self.logger.info("Successfully created Team %s" % result["name"])
    else:
        # The response may carry no "message" (e.g. an empty body); report
        # the failure instead of raising KeyError. Logged once, with context.
        reason = result.get("message", "unknown error")
        failure = "Team not created... (gitea: %s)" % reason
        self.logger.error(failure)
        raise Exception(failure)
    api_object = Team.parse_response(self, result)
    setattr(
        api_object, "_organization", org
    )  # fixes strange behaviour of gitea not returning a valid organization here.
    return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, "", short description of the new Team.
permission (str): Optional, 'read', the permission level granted to the members of the Team.
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the `permission` value will
be applied to all units. Note: When both units and units_map
are given, units_map will be preferred.
Common base class for all non-exit exceptions.
class Branch(ReadonlyApiObject):
    """Read-only API object describing one branch of a repository."""

    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
    effective_branch_protection_name: str
    enable_status_check: bool
    name: str
    protected: bool
    required_approvals: int
    status_check_contexts: List[Any]
    user_can_merge: bool
    user_can_push: bool

    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""

    # No field parsers are registered: the "commit" payload of a branch is
    # not a Commit object, so it is kept as the plain mapping it arrives as.
    _fields_to_parsers: ClassVar[dict] = {}

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Two branches are equal when both their commit and their name match."""
        return (
            isinstance(other, Branch)
            and self.commit == other.commit
            and self.name == other.name
        )

    def __hash__(self):
        """Combine the hashes of the commit id and the branch name."""
        commit_hash = hash(self.commit["id"])
        name_hash = hash(self.name)
        return commit_hash ^ name_hash

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, branch: str):
        """Fetch the branch `branch` of the repository `owner`/`repo`."""
        path_fields = {"owner": owner, "repo": repo, "branch": branch}
        return cls._request(allspice_client, path_fields)
Inherited Members
class Comment(ApiObject):
    """A comment on an issue or a pull request."""

    assets: List[Union[Any, Dict[str, Union[int, str]]]]
    body: str
    created_at: datetime
    html_url: str
    id: int
    issue_url: str
    original_author: str
    original_author_id: int
    pull_request_url: str
    updated_at: datetime
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""

    _fields_to_parsers: ClassVar[dict] = {
        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
        "created_at": lambda _, t: Util.convert_time(t),
        "updated_at": lambda _, t: Util.convert_time(t),
    }

    _patchable_fields: ClassVar[set[str]] = {"body"}

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Comments are equal when they share both repository and id.
        return (
            isinstance(other, Comment)
            and self.repository == other.repository
            and self.id == other.id
        )

    def __hash__(self):
        repository_hash = hash(self.repository)
        return repository_hash ^ hash(self.id)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
        """Request the comment `id` of the repository `owner`/`repo`."""
        path_fields = {"owner": owner, "repo": repo, "id": id}
        return cls._request(allspice_client, path_fields)

    @property
    def parent_url(self) -> str:
        """URL of the issue this comment is on, or of the pull request if no issue URL is set."""

        if self.issue_url is None or self.issue_url == "":
            return self.pull_request_url
        return self.issue_url

    @cached_property
    def repository(self) -> Repository:
        """The repository this comment was posted on, resolved from the parent URL."""

        # Owner and repository name are the 4th- and 3rd-from-last segments
        # of the parent URL.
        url_segments = self.parent_url.split("/")
        owner_name, repo_name = url_segments[-4:-2]
        return Repository.request(self.allspice_client, owner_name, repo_name)

    def __fields_for_path(self):
        # Values for the placeholders shared by all path templates of this class.
        repository = self.repository
        return {
            "owner": repository.owner.username,
            "repo": repository.name,
            "id": self.id,
        }

    def commit(self):
        """Send the changed fields of this comment to the API."""
        path_fields = self.__fields_for_path()
        self._commit(path_fields)

    def delete(self):
        """Delete this comment through the API and mark the object as deleted."""
        path = self.API_OBJECT.format(**self.__fields_for_path())
        self.allspice_client.requests_delete(path)
        self.deleted = True

    def get_attachments(self) -> List[Attachment]:
        """
        List the attachments of this comment as Attachment objects, which
        contain a link to download the attachment.

        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
        """

        path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        results = self.allspice_client.requests_get(path)
        return [Attachment.parse_response(self.allspice_client, entry) for entry in results]

    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
        """
        Attach a file to this comment.

        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

        :param file: The file to attach. This should be a file-like object.
        :param name: The name of the file. If not provided, the name of the file will be
                     used.
        :return: The created attachment.
        """

        # The name is only sent as a query parameter when one was given.
        extra_args: dict[str, Any] = {} if name is None else {"params": {"name": name}}
        path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        result = self.allspice_client.requests_post(
            path,
            files={"attachment": file},
            **extra_args,
        )
        return Attachment.parse_response(self.allspice_client, result)

    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
        """
        Change fields of an attachment of this comment.

        The list of params that can be edited is available at
        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

        :param attachment: The attachment to be edited
        :param data: The data parameter should be a dictionary of the fields to edit.
        :return: The edited attachment
        """

        path = self.ATTACHMENT_PATH.format(
            **self.__fields_for_path(),
            attachment_id=attachment.id,
        )
        result = self.allspice_client.requests_patch(path, data=data)
        return Attachment.parse_response(self.allspice_client, result)

    def delete_attachment(self, attachment: Attachment):
        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""

        path = self.ATTACHMENT_PATH.format(
            **self.__fields_for_path(),
            attachment_id=attachment.id,
        )
        self.allspice_client.requests_delete(path)
        attachment.deleted = True
@property
def parent_url(self) -> str:
    """URL of the issue this comment is on, or of the pull request if no issue URL is set."""

    # Fall back to the pull request URL when the issue URL is missing or empty.
    if self.issue_url is None or self.issue_url == "":
        return self.pull_request_url
    return self.issue_url
URL of the parent of this comment (the issue or the pull request)
@cached_property
def repository(self) -> Repository:
    """The repository this comment was posted on, resolved from the parent URL."""

    # Owner and repository name are the 4th- and 3rd-from-last segments of
    # the parent URL.
    url_segments = self.parent_url.split("/")
    owner_name, repo_name = url_segments[-4:-2]
    return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
def get_attachments(self) -> List[Attachment]:
    """
    List the attachments of this comment as Attachment objects, which
    contain a link to download the attachment.

    https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
    """

    path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
    results = self.allspice_client.requests_get(path)
    return [Attachment.parse_response(self.allspice_client, entry) for entry in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
    """
    Attach a file to this comment.

    https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

    :param file: The file to attach. This should be a file-like object.
    :param name: The name of the file. If not provided, the name of the file will be
                 used.
    :return: The created attachment.
    """

    # The name is only sent as a query parameter when one was given.
    extra_args: dict[str, Any] = {} if name is None else {"params": {"name": name}}
    path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
    result = self.allspice_client.requests_post(
        path,
        files={"attachment": file},
        **extra_args,
    )
    return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
    """
    Change fields of an attachment of this comment.

    The list of params that can be edited is available at
    https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

    :param attachment: The attachment to be edited
    :param data: The data parameter should be a dictionary of the fields to edit.
    :return: The edited attachment
    """

    path = self.ATTACHMENT_PATH.format(
        **self.__fields_for_path(),
        attachment_id=attachment.id,
    )
    result = self.allspice_client.requests_patch(path, data=data)
    return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
def delete_attachment(self, attachment: Attachment):
    """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""

    path = self.ATTACHMENT_PATH.format(
        **self.__fields_for_path(),
        attachment_id=attachment.id,
    )
    self.allspice_client.requests_delete(path)
    # Flag the passed-in object so callers can see it no longer exists.
    attachment.deleted = True
https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
class Commit(ReadonlyApiObject):
    """Read-only view of a commit in a repository."""

    author: User
    commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
    committer: Dict[str, Union[int, str, bool]]
    created: str
    files: List[Dict[str, str]]
    html_url: str
    inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]
    parents: List[Union[Dict[str, str], Any]]
    sha: str
    stats: Dict[str, int]
    url: str

    API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}"""
    COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status"""
    COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses"""

    # Regex to extract owner and repo names from the url property
    URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits")

    _fields_to_parsers: ClassVar[dict] = {
        # NOTE: the api may return None for committers that are not allspice users
        "author": lambda allspice_client, u: User.parse_response(allspice_client, u) if u else None
    }

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Commits are identified by their SHA alone.
        return isinstance(other, Commit) and self.sha == other.sha

    def __hash__(self):
        return hash(self.sha)

    @classmethod
    def parse_response(cls, allspice_client, result) -> "Commit":
        """Build a Commit from an API response and expose its "commit" entry as `inner_commit`."""
        # Read the nested commit before the object is initialized from `result`.
        raw_commit = result["commit"]
        instance = cls(allspice_client)
        cls._initialize(allspice_client, instance, result)
        # inner_commit for legacy reasons
        Commit._add_read_property("inner_commit", raw_commit, instance)
        return instance

    def get_status(self) -> CommitCombinedStatus:
        """
        Get the combined status of this commit.

        The returned CommitCombinedStatus object also contains a list of all
        statuses on the commit.

        https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
        """

        path = self.COMMIT_GET_STATUS.format(**self._fields_for_path)
        result = self.allspice_client.requests_get(path)
        return CommitCombinedStatus.parse_response(self.allspice_client, result)

    def get_statuses(self) -> List[CommitStatus]:
        """
        Get a list of all statuses on this commit.

        https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
        """

        path = self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
        results = self.allspice_client.requests_get(path)
        return [CommitStatus.parse_response(self.allspice_client, entry) for entry in results]

    @cached_property
    def _fields_for_path(self) -> dict[str, str]:
        # Owner and repo are not fields of the commit; recover them from its URL.
        matches = self.URL_REGEXP.search(self.url)
        if matches is None:
            raise ValueError(f"Invalid commit URL: {self.url}")

        owner, repo = matches.groups()
        return {"owner": owner, "repo": repo, "sha": self.sha}
@classmethod
def parse_response(cls, allspice_client, result) -> "Commit":
    """Build a Commit from an API response and expose its "commit" entry as `inner_commit`."""
    # Read the nested commit before the object is initialized from `result`.
    raw_commit = result["commit"]
    instance = cls(allspice_client)
    cls._initialize(allspice_client, instance, result)
    # inner_commit for legacy reasons
    Commit._add_read_property("inner_commit", raw_commit, instance)
    return instance
def get_status(self) -> CommitCombinedStatus:
    """
    Get the combined status of this commit.

    The returned CommitCombinedStatus object also contains a list of all
    statuses on the commit.

    https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
    """

    path = self.COMMIT_GET_STATUS.format(**self._fields_for_path)
    result = self.allspice_client.requests_get(path)
    return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statuses on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
def get_statuses(self) -> List[CommitStatus]:
    """
    Get a list of all statuses on this commit.

    https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
    """

    path = self.COMMIT_GET_STATUSES.format(**self._fields_for_path)
    results = self.allspice_client.requests_get(path)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in results]
Get a list of all statuses on this commit.
https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
class Content(ReadonlyApiObject):
    """Read-only view of a content entry of a repository."""

    content: Any
    download_url: str
    encoding: Any
    git_url: str
    html_url: str
    last_commit_sha: str
    name: str
    path: str
    sha: str
    size: int
    submodule_git_url: Any
    target: Any
    type: str
    url: str

    FILE = "file"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Entries are equal when both the SHA and the name match.
        return (
            isinstance(other, Content)
            and self.sha == other.sha
            and self.name == other.name
        )

    def __hash__(self):
        sha_hash = hash(self.sha)
        return sha_hash ^ hash(self.name)
Inherited Members
class DesignReview(ApiObject):
    """
    A Design Review. See
    https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.

    Note: The base and head fields are not `Branch` objects - they are plain strings
    referring to the branch names. This is because DRs can exist for branches that have
    been deleted, which don't have an associated `Branch` object from the API. You can use
    the `Repository.get_branch` method to get a `Branch` object for a branch if you know
    it exists.
    """

    additions: Optional[int]
    allow_maintainer_edit: bool
    allow_maintainer_edits: Any
    assignee: User
    assignees: List["User"]
    base: str
    body: str
    changed_files: Optional[int]
    closed_at: Optional[str]
    comments: int
    created_at: str
    deletions: Optional[int]
    diff_url: str
    draft: bool
    due_date: Optional[str]
    head: str
    html_url: str
    id: int
    is_locked: bool
    labels: List[Any]
    merge_base: str
    merge_commit_sha: Optional[str]
    mergeable: bool
    merged: bool
    merged_at: Optional[str]
    merged_by: Any
    milestone: Any
    number: int
    patch_url: str
    pin_order: int
    repository: Optional["Repository"]
    requested_reviewers: Any
    requested_reviewers_teams: Any
    review_comments: int
    state: str
    title: str
    updated_at: str
    url: str
    user: User

    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}"
    MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge"
    GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews"
    GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}"
    GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments"

    OPEN = "open"
    CLOSED = "closed"

    class MergeType(Enum):
        MERGE = "merge"
        REBASE = "rebase"
        REBASE_MERGE = "rebase-merge"
        SQUASH = "squash"
        MANUALLY_MERGED = "manually-merged"

    _fields_to_parsers: ClassVar[dict] = {
        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assignees": lambda allspice_client, us: [
            User.parse_response(allspice_client, u) for u in us
        ],
        "base": lambda _, b: b["ref"],
        "head": lambda _, h: h["ref"],
        "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
    }

    _patchable_fields: ClassVar[set[str]] = {
        "allow_maintainer_edits",
        "assignee",
        "assignees",
        "base",
        "body",
        "due_date",
        "milestone",
        "state",
        "title",
    }

    _parsers_to_fields: ClassVar[dict] = {
        "assignee": lambda u: u.username,
        "assignees": lambda us: [u.username for u in us],
        "base": lambda b: b.name if isinstance(b, Branch) else b,
        "milestone": lambda m: m.id,
    }

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Design reviews are equal when they share both repository and id.
        return (
            isinstance(other, DesignReview)
            and self.repository == other.repository
            and self.id == other.id
        )

    def __hash__(self):
        repository_hash = hash(self.repository)
        return repository_hash ^ hash(self.id)

    def __path_args(self) -> dict:
        # Values for the {owner}/{repo}/{index} placeholders of the path templates.
        repository = self.repository
        return {
            "owner": repository.owner.username,
            "repo": repository.name,
            "index": self.number,
        }

    @classmethod
    def parse_response(cls, allspice_client, result) -> "DesignReview":
        """Build a DesignReview from an API response, taking its repository from the base repo."""
        design_review = super().parse_response(allspice_client, result)
        base_repository = Repository.parse_response(allspice_client, result["base"]["repo"])
        cls._add_read_property("repository", base_repository, design_review)
        return design_review

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
        path_fields = {"owner": owner, "repo": repo, "index": number}
        return cls._request(allspice_client, path_fields)

    def commit(self):
        """Send the changed fields of this design review to the API."""
        data = self.get_dirty_fields()
        # A due date that was set to None is sent with an explicit unset flag.
        if "due_date" in data and data["due_date"] is None:
            data["unset_due_date"] = True

        self._commit(self.__path_args(), data)

    def merge(self, merge_type: MergeType):
        """
        Merge the pull request. See
        https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

        :param merge_type: The type of merge to perform. See the MergeType enum.
        """

        path = self.MERGE_DESIGN_REVIEW.format(**self.__path_args())
        self.allspice_client.requests_post(path, data={"Do": merge_type.value})

    def get_comments(self) -> List[Comment]:
        """
        Get the comments on this pull request, but not specifically on a review.

        https://hub.allspice.io/api/swagger#/issue/issueGetComments

        :return: A list of comments on this pull request.
        """

        path = self.GET_COMMENTS.format(**self.__path_args())
        results = self.allspice_client.requests_get(path)
        return [Comment.parse_response(self.allspice_client, entry) for entry in results]

    def create_comment(self, body: str):
        """
        Create a comment on this pull request. This uses the same endpoint as the
        comments on issues, and will not be associated with any reviews.

        https://hub.allspice.io/api/swagger#/issue/issueCreateComment

        :param body: The body of the comment.
        :return: The comment that was created.
        """

        path = self.GET_COMMENTS.format(**self.__path_args())
        result = self.allspice_client.requests_post(path, data={"body": body})
        return Comment.parse_response(self.allspice_client, result)

    def create_review(
        self,
        *,
        body: Optional[str] = None,
        event: Optional[DesignReviewReview.ReviewEvent] = None,
        comments: Optional[List[DesignReviewReview.ReviewComment]] = None,
        commit_id: Optional[str] = None,
    ) -> DesignReviewReview:
        """
        Create a review on this design review.

        https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview

        Note: in most cases, you should not set the body or event when creating
        a review. The event is automatically set to "PENDING" when the review
        is created. You should then use `submit_review` to submit the review
        with the desired event and body.

        :param body: The body of the review. This is the top-level comment on
            the review. If not provided, the review will be created with no body.
        :param event: The event of the review. This is the overall status of the
            review. See the ReviewEvent enum. If not provided, the API will
            default to "PENDING".
        :param comments: A list of comments on the review. Each comment should
            be a ReviewComment object. If not provided, only the base comment
            will be created.
        :param commit_id: The commit SHA to associate with the review. This is
            optional.
        """

        # Only the arguments that were actually given are sent to the API.
        data: dict[str, Any] = {}
        if body is not None:
            data["body"] = body
        if event is not None:
            data["event"] = event.value
        if commit_id is not None:
            data["commit_id"] = commit_id
        if comments is not None:
            data["comments"] = [asdict(review_comment) for review_comment in comments]

        path = self.GET_REVIEWS.format(**self.__path_args())
        result = self.allspice_client.requests_post(path, data=data)
        return DesignReviewReview.parse_response(self.allspice_client, result)

    def get_reviews(self) -> List[DesignReviewReview]:
        """
        Get all reviews on this design review.

        https://hub.allspice.io/api/swagger#/repository/repoListPullReviews
        """

        path = self.GET_REVIEWS.format(**self.__path_args())
        results = self.allspice_client.requests_get(path)
        return [DesignReviewReview.parse_response(self.allspice_client, entry) for entry in results]

    def submit_review(
        self,
        review_id: int,
        event: DesignReviewReview.ReviewEvent,
        *,
        body: Optional[str] = None,
    ):
        """
        Submit a review on this design review.

        https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview

        :param review_id: The ID of the review to submit.
        :param event: The event to submit the review with. See the ReviewEvent
            enum for the possible values.
        :param body: Optional body text for the review submission.
        :return: The result of the POST request, as returned by the client.
        """

        data = {"event": event.value}
        if body is not None:
            data["body"] = body

        path = self.GET_REVIEW.format(**self.__path_args(), review_id=review_id)
        return self.allspice_client.requests_post(path, data=data)
A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch object from the API. You can use
the Repository.get_branch method to get a Branch object for a branch if you know
it exists.
@classmethod
def parse_response(cls, allspice_client, result) -> "DesignReview":
    """Build a DesignReview from an API response, taking its repository from the base repo."""
    design_review = super().parse_response(allspice_client, result)
    base_repository = Repository.parse_response(allspice_client, result["base"]["repo"])
    cls._add_read_property("repository", base_repository, design_review)
    return design_review
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
    """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest"""
    # The design review number fills the {index} placeholder of the API path.
    path_fields = {"owner": owner, "repo": repo, "index": number}
    return cls._request(allspice_client, path_fields)
See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest
def commit(self):
    """Send the changed fields of this design review to the API."""
    data = self.get_dirty_fields()
    # A due date that was set to None is sent with an explicit unset flag.
    if "due_date" in data and data["due_date"] is None:
        data["unset_due_date"] = True

    repository = self.repository
    path_fields = {
        "owner": repository.owner.username,
        "repo": repository.name,
        "index": self.number,
    }
    self._commit(path_fields, data)
def merge(self, merge_type: MergeType):
    """
    Merge the pull request. See
    https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest

    :param merge_type: The type of merge to perform. See the MergeType enum.
    """

    repository = self.repository
    path = self.MERGE_DESIGN_REVIEW.format(
        owner=repository.owner.username,
        repo=repository.name,
        index=self.number,
    )
    self.allspice_client.requests_post(path, data={"Do": merge_type.value})
Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
def get_comments(self) -> List[Comment]:
    """
    Get the comments on this pull request, but not specifically on a review.

    https://hub.allspice.io/api/swagger#/issue/issueGetComments

    :return: A list of comments on this pull request.
    """

    repository = self.repository
    path = self.GET_COMMENTS.format(
        owner=repository.owner.username,
        repo=repository.name,
        index=self.number,
    )
    results = self.allspice_client.requests_get(path)
    return [Comment.parse_response(self.allspice_client, entry) for entry in results]
Get the comments on this pull request, but not specifically on a review.
https://hub.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
def create_comment(self, body: str):
    """
    Create a comment on this pull request. This uses the same endpoint as the
    comments on issues, and will not be associated with any reviews.

    https://hub.allspice.io/api/swagger#/issue/issueCreateComment

    :param body: The body of the comment.
    :return: The comment that was created.
    """

    repository = self.repository
    path = self.GET_COMMENTS.format(
        owner=repository.owner.username,
        repo=repository.name,
        index=self.number,
    )
    result = self.allspice_client.requests_post(path, data={"body": body})
    return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
def create_review(
    self,
    *,
    body: Optional[str] = None,
    event: Optional[DesignReviewReview.ReviewEvent] = None,
    comments: Optional[List[DesignReviewReview.ReviewComment]] = None,
    commit_id: Optional[str] = None,
) -> DesignReviewReview:
    """
    Create a review on this design review.

    https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview

    Note: in most cases, you should not set the body or event when creating
    a review. The event is automatically set to "PENDING" when the review
    is created. You should then use `submit_review` to submit the review
    with the desired event and body.

    :param body: The body of the review. This is the top-level comment on
        the review. If not provided, the review will be created with no body.
    :param event: The event of the review. This is the overall status of the
        review. See the ReviewEvent enum. If not provided, the API will
        default to "PENDING".
    :param comments: A list of comments on the review. Each comment should
        be a ReviewComment object. If not provided, only the base comment
        will be created.
    :param commit_id: The commit SHA to associate with the review. This is
        optional.
    """

    # Only the arguments that were actually given are sent to the API.
    data: dict[str, Any] = {}
    if body is not None:
        data["body"] = body
    if event is not None:
        data["event"] = event.value
    if commit_id is not None:
        data["commit_id"] = commit_id
    if comments is not None:
        data["comments"] = [asdict(review_comment) for review_comment in comments]

    repository = self.repository
    path = self.GET_REVIEWS.format(
        owner=repository.owner.username,
        repo=repository.name,
        index=self.number,
    )
    result = self.allspice_client.requests_post(path, data=data)
    return DesignReviewReview.parse_response(self.allspice_client, result)
Create a review on this design review.
https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview
Note: in most cases, you should not set the body or event when creating
a review. The event is automatically set to "PENDING" when the review
is created. You should then use submit_review to submit the review
with the desired event and body.
Parameters
- body: The body of the review. This is the top-level comment on the review. If not provided, the review will be created with no body.
- event: The event of the review. This is the overall status of the review. See the ReviewEvent enum. If not provided, the API will default to "PENDING".
- comments: A list of comments on the review. Each comment should be a ReviewComment object. If not provided, only the base comment will be created.
- commit_id: The commit SHA to associate with the review. This is optional.
def get_reviews(self) -> List[DesignReviewReview]:
    """
    Get all reviews on this design review.

    https://hub.allspice.io/api/swagger#/repository/repoListPullReviews
    """

    repository = self.repository
    path = self.GET_REVIEWS.format(
        owner=repository.owner.username,
        repo=repository.name,
        index=self.number,
    )
    results = self.allspice_client.requests_get(path)
    return [DesignReviewReview.parse_response(self.allspice_client, entry) for entry in results]
Get all reviews on this design review.
https://hub.allspice.io/api/swagger#/repository/repoListPullReviews
def submit_review(
    self,
    review_id: int,
    event: DesignReviewReview.ReviewEvent,
    *,
    body: Optional[str] = None,
):
    """
    Submit a review on this design review.

    https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview

    :param review_id: The ID of the review to submit.
    :param event: The event to submit the review with. See the ReviewEvent
        enum for the possible values.
    :param body: Optional body text for the review submission.
    :return: The result of the POST request, as returned by the client.
    """

    # The body is only sent when one was given.
    data = {"event": event.value}
    if body is not None:
        data["body"] = body

    repository = self.repository
    path = self.GET_REVIEW.format(
        owner=repository.owner.username,
        repo=repository.name,
        index=self.number,
        review_id=review_id,
    )
    return self.allspice_client.requests_post(path, data=data)
Submit a review on this design review.
https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview
Parameters
- review_id: The ID of the review to submit.
- event: The event to submit the review with. See the ReviewEvent enum for the possible values.
- body: Optional body text for the review submission.
class DesignReviewReview(ReadonlyApiObject):
    """
    A review on a Design Review.

    The owner, repository name and design review number used to build API
    paths are derived by parsing `pull_request_url`.
    """

    body: str
    comments_count: int
    commit_id: str
    dismissed: bool
    html_url: str
    id: int
    official: bool
    pull_request_url: str
    stale: bool
    state: ReviewEvent
    submitted_at: str
    team: Any
    updated_at: str
    user: User

    API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}"
    GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments"

    class ReviewEvent(Enum):
        APPROVED = "APPROVED"
        PENDING = "PENDING"
        COMMENT = "COMMENT"
        REQUEST_CHANGES = "REQUEST_CHANGES"
        REQUEST_REVIEW = "REQUEST_REVIEW"
        UNKNOWN = ""

    @dataclass
    class ReviewComment:
        """
        Data required to create a review comment on a design review.

        :param body: The body of the comment.
        :param path: The path of the file to comment on. If you have a
            `Content` object, get the path using the `path` property.
        :param sub_path: The sub-path of the file to comment on. This is
            usually the page ID of the page in the multi-page document.
        :param new_position: The line number of the source code file after the
            change to add this comment on. Optional, leave unset if this is an ECAD
            file or the comment must be on the entire file.
        :param old_position: The line number of the source code file before the
            change to add this comment on. Optional, leave unset if this is an ECAD
            file or the comment must be on the entire file.
        """

        body: str
        path: str
        sub_path: Optional[str] = None
        new_position: Optional[int] = None
        old_position: Optional[int] = None

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    _fields_to_parsers: ClassVar[dict] = {
        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "state": lambda _, s: DesignReviewReview.ReviewEvent(s),
    }

    def _get_dr_properties(self) -> dict[str, str]:
        """
        Get the owner, repo name and design review number from the URL of this
        review's DR.

        :return: A dict with the keys "owner", "repo" and "index".
        :raises ValueError: If the URL does not end in
            `<owner>/<repo>/pulls/<index>` (or `.../pull/<index>`).
        """

        parts = self.pull_request_url.strip("/").split("/")

        # Validate explicitly instead of with `assert`: assertions are removed
        # when Python runs with -O, and every malformed URL should surface as
        # the same ValueError.
        if len(parts) < 4 or parts[-2] not in ("pulls", "pull"):
            raise ValueError("Malformed design review URL: {}".format(self.pull_request_url))

        return {
            "owner": parts[-4],
            "repo": parts[-3],
            "index": parts[-1],
        }

    @cached_property
    def owner_name(self) -> str:
        """
        The owner of the repository this review is on.
        """

        return self._get_dr_properties()["owner"]

    @cached_property
    def repository_name(self) -> str:
        """
        The name of the repository this review is on.
        """

        return self._get_dr_properties()["repo"]

    @cached_property
    def index(self) -> str:
        """
        The index of the design review this review is on.
        """

        return self._get_dr_properties()["index"]

    def delete(self):
        """
        Delete this review and mark this object as deleted.
        """

        self.allspice_client.requests_delete(
            self.API_OBJECT.format(**self._get_dr_properties(), id=self.id)
        )
        self.deleted = True

    def get_comments(self) -> List[DesignReviewReviewComment]:
        """
        Get the comments on this review.

        :return: One DesignReviewReviewComment per entry in the API response.
        """

        result = self.allspice_client.requests_get(
            self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id)
        )

        return [
            DesignReviewReviewComment.parse_response(self.allspice_client, comment)
            for comment in result
        ]
A review on a Design Review.
@cached_property
def owner_name(self) -> str:
    """
    Owner of the repository this review is on, as parsed from the URL of the
    review's design review.
    """

    dr_properties = self._get_dr_properties()
    return dr_properties["owner"]
The owner of the repository this review is on.
@cached_property
def repository_name(self) -> str:
    """
    Name of the repository this review is on, as parsed from the URL of the
    review's design review.
    """

    dr_properties = self._get_dr_properties()
    return dr_properties["repo"]
The name of the repository this review is on.
@cached_property
def index(self) -> str:
    """
    Index of the design review this review is on, as parsed from the URL of
    the review's design review.
    """

    dr_properties = self._get_dr_properties()
    return dr_properties["index"]
The index of the design review this review is on.
def delete(self):
    """
    Delete this review and flag this object as deleted.
    """

    url_parts = self._get_dr_properties()
    review_path = self.API_OBJECT.format(**url_parts, id=self.id)
    self.allspice_client.requests_delete(review_path)
    self.deleted = True
Delete this review.
def get_comments(self) -> List[DesignReviewReviewComment]:
    """
    Fetch the comments that belong to this review.

    :return: One DesignReviewReviewComment per entry in the API response.
    """

    url_parts = self._get_dr_properties()
    comments_path = self.GET_COMMENTS.format(**url_parts, id=self.id)
    raw_comments = self.allspice_client.requests_get(comments_path)

    return [
        DesignReviewReviewComment.parse_response(self.allspice_client, raw_comment)
        for raw_comment in raw_comments
    ]
Get the comments on this review.
Inherited Members
@dataclass
class ReviewComment:
    """
    Data required to create a review comment on a design review.

    Only `body` and `path` are required; the remaining fields default to
    None.

    :param body: The body of the comment.
    :param path: The path of the file to comment on. If you have a
        `Content` object, get the path using the `path` property.
    :param sub_path: The sub-path of the file to comment on. This is
        usually the page ID of the page in the multi-page document.
    :param new_position: The line number of the source code file after the
        change to add this comment on. Optional, leave unset if this is an ECAD
        file or the comment must be on the entire file.
    :param old_position: The line number of the source code file before the
        change to add this comment on. Optional, leave unset if this is an ECAD
        file or the comment must be on the entire file.
    """

    body: str
    path: str
    sub_path: Optional[str] = None
    new_position: Optional[int] = None
    old_position: Optional[int] = None
Data required to create a review comment on a design review.
Parameters
- body: The body of the comment.
- path: The path of the file to comment on. If you have a `Content` object, get the path using the `path` property.
- sub_path: The sub-path of the file to comment on. This is usually the page ID of the page in the multi-page document.
- new_position: The line number of the source code file after the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
- old_position: The line number of the source code file before the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
class Issue(ApiObject):
    """
    An issue on a repository.

    Note: `Issue.assets` may not have any entries even if the issue has
    attachments. This happens when an issue is fetched via a bulk method like
    `Repository.get_issues`. In most cases, prefer using
    `Issue.get_attachments` to get the attachments on an issue.
    """

    assets: List[Union[Any, "Attachment"]]
    assignee: Any
    assignees: Any
    body: str
    closed_at: Any
    comments: int
    created_at: str
    due_date: Any
    html_url: str
    id: int
    is_locked: bool
    labels: List[Any]
    milestone: Optional["Milestone"]
    number: int
    original_author: str
    original_author_id: int
    pin_order: int
    pull_request: Any
    ref: str
    repository: Dict[str, Union[int, str]]
    state: str
    title: str
    updated_at: str
    url: str
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
    GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets"""

    OPENED = "open"
    CLOSED = "closed"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Two issues are equal when they share both repository and id.
        return (
            isinstance(other, Issue)
            and self.repository == other.repository
            and self.id == other.id
        )

    def __hash__(self):
        repository_hash = hash(self.repository)
        id_hash = hash(self.id)
        return repository_hash ^ id_hash

    # Converters applied to raw response fields, keyed by field name.
    _fields_to_parsers: ClassVar[dict] = {
        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assets": lambda allspice_client, assets: [
            Attachment.parse_response(allspice_client, a) for a in assets
        ],
        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assignees": lambda allspice_client, us: [
            User.parse_response(allspice_client, u) for u in us
        ],
        "state": lambda _, s: Issue.CLOSED if s == "closed" else Issue.OPENED,
    }

    # Converters applied in the other direction; a milestone is reduced to its id.
    _parsers_to_fields: ClassVar[dict] = {
        "milestone": lambda m: m.id,
    }

    _patchable_fields: ClassVar[set[str]] = {
        "assignee",
        "assignees",
        "body",
        "due_date",
        "milestone",
        "state",
        "title",
    }

    def commit(self):
        """Pass this issue's owner, repo and index to `_commit`."""
        self._commit(
            {
                "owner": self.repository.owner.username,
                "repo": self.repository.name,
                "index": self.number,
            }
        )

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """
        Request a single issue together with its full repository object.

        :param allspice_client: The client used for both requests.
        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param number: The issue's index in the repository.
        :return: The requested issue.
        """
        issue = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
        # The repository in the response is a RepositoryMeta object, so request
        # the full repository object and add it to the issue object.
        full_repository = Repository.request(allspice_client, owner, repo)
        issue._repository = full_repository
        # For legacy reasons
        cls._add_read_property("repo", full_repository, issue)
        return issue

    @classmethod
    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
        """
        Create a new issue on `repo` and return it.

        :param allspice_client: The client used to send the request.
        :param repo: The repository to create the issue on.
        :param title: Title of the new issue.
        :param body: Body text of the new issue; empty by default.
        :return: The created issue, with `repo` attached to it.
        """
        create_path = Issue.CREATE_ISSUE.format(owner=repo.owner.username, repo=repo.name)
        response = allspice_client.requests_post(
            create_path, data={"title": title, "body": body}
        )
        new_issue = Issue.parse_response(allspice_client, response)
        new_issue._repository = repo
        cls._add_read_property("repo", repo, new_issue)
        return new_issue

    @property
    def owner(self) -> Organization | User:
        """The owner of the repository this issue is on."""
        return self.repository.owner

    def get_time_sum(self, user: User) -> int:
        """
        Sum the "time" of every tracked-time entry on this issue whose
        "user_id" matches `user.id`.
        """
        entries = self.allspice_client.requests_get(
            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
        )
        total = 0
        for entry in entries:
            # Falsy entries are skipped before they are indexed.
            if entry and entry["user_id"] == user.id:
                total += entry["time"]
        return total

    def get_times(self) -> Optional[Dict]:
        """Return the raw tracked-time response for this issue."""
        times_path = Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
        return self.allspice_client.requests_get(times_path)

    def delete_time(self, time_id: str):
        """Delete the tracked-time entry `time_id` from this issue."""
        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
        self.allspice_client.requests_delete(path)

    def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
        """
        Add a tracked-time entry to this issue.

        :param time: The amount of time to add; converted with `int()` before
            it is sent.
        :param created: Sent as the "created" field of the request.
        :param user_name: Sent as the "user_name" field of the request.
        """
        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
        payload = {"created": created, "time": int(time), "user_name": user_name}
        self.allspice_client.requests_post(path, data=payload)

    def get_comments(self) -> List[Comment]:
        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""

        comments_path = self.GET_COMMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )
        raw_comments = self.allspice_client.requests_get(comments_path)

        return [Comment.parse_response(self.allspice_client, raw) for raw in raw_comments]

    def create_comment(self, body: str) -> Comment:
        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""

        comments_path = self.GET_COMMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )

        created = self.allspice_client.requests_post(comments_path, data={"body": body})
        return Comment.parse_response(self.allspice_client, created)

    def get_attachments(self) -> List[Attachment]:
        """
        Fetch all attachments on this issue.

        Unlike the assets field, this will always fetch all attachments from the
        API.

        See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
        """

        attachments_path = self.GET_ATTACHMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )
        raw_attachments = self.allspice_client.requests_get(attachments_path)

        return [
            Attachment.parse_response(self.allspice_client, raw) for raw in raw_attachments
        ]

    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
        """
        Create an attachment on this issue.

        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

        :param file: The file to attach. This should be a file-like object.
        :param name: The name of the file. If not provided, the name of the file will be
            used.
        :return: The created attachment.
        """

        request_kwargs: dict[str, Any] = {"files": {"attachment": file}}
        if name is not None:
            request_kwargs["params"] = {"name": name}

        attachments_path = self.GET_ATTACHMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )
        created = self.allspice_client.requests_post(attachments_path, **request_kwargs)

        return Attachment.parse_response(self.allspice_client, created)
An issue on a repository.
Note: Issue.assets may not have any entries even if the issue has
attachments. This happens when an issue is fetched via a bulk method like
Repository.get_issues. In most cases, prefer using
Issue.get_attachments to get the attachments on an issue.
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
    """
    Request a single issue together with its full repository object.

    :param allspice_client: The client used for both requests.
    :param owner: Username of the repository owner.
    :param repo: Name of the repository.
    :param number: The issue's index in the repository.
    :return: The requested issue.
    """
    issue = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
    # The repository in the response is a RepositoryMeta object, so request
    # the full repository object and add it to the issue object.
    full_repository = Repository.request(allspice_client, owner, repo)
    issue._repository = full_repository
    # For legacy reasons
    cls._add_read_property("repo", full_repository, issue)
    return issue
@classmethod
def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
    """
    Create a new issue on `repo` and return it.

    :param allspice_client: The client used to send the request.
    :param repo: The repository to create the issue on.
    :param title: Title of the new issue.
    :param body: Body text of the new issue; empty by default.
    :return: The created issue, with `repo` attached to it.
    """
    create_path = Issue.CREATE_ISSUE.format(owner=repo.owner.username, repo=repo.name)
    response = allspice_client.requests_post(create_path, data={"title": title, "body": body})
    new_issue = Issue.parse_response(allspice_client, response)
    new_issue._repository = repo
    cls._add_read_property("repo", repo, new_issue)
    return new_issue
def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None):
    """
    Add a tracked-time entry to this issue.

    :param time: The amount of time to add; converted with `int()` before it
        is sent.
    :param created: Sent as the "created" field of the request.
    :param user_name: Sent as the "user_name" field of the request.
    """
    path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
    payload = {"created": created, "time": int(time), "user_name": user_name}
    self.allspice_client.requests_post(path, data=payload)
def get_comments(self) -> List[Comment]:
    """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""

    comments_path = self.GET_COMMENTS.format(
        owner=self.owner.username, repo=self.repository.name, index=self.number
    )
    raw_comments = self.allspice_client.requests_get(comments_path)

    return [Comment.parse_response(self.allspice_client, raw) for raw in raw_comments]
https://hub.allspice.io/api/swagger#/issue/issueGetComments
def create_comment(self, body: str) -> Comment:
    """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""

    comments_path = self.GET_COMMENTS.format(
        owner=self.owner.username, repo=self.repository.name, index=self.number
    )

    created = self.allspice_client.requests_post(comments_path, data={"body": body})
    return Comment.parse_response(self.allspice_client, created)
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
def get_attachments(self) -> List[Attachment]:
    """
    Fetch all attachments on this issue.

    Unlike the assets field, this will always fetch all attachments from the
    API.

    See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
    """

    attachments_path = self.GET_ATTACHMENTS.format(
        owner=self.owner.username, repo=self.repository.name, index=self.number
    )
    raw_attachments = self.allspice_client.requests_get(attachments_path)

    return [Attachment.parse_response(self.allspice_client, raw) for raw in raw_attachments]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
    """
    Create an attachment on this issue.

    https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

    :param file: The file to attach. This should be a file-like object.
    :param name: The name of the file. If not provided, the name of the file will be
        used.
    :return: The created attachment.
    """

    request_kwargs: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        request_kwargs["params"] = {"name": name}

    attachments_path = self.GET_ATTACHMENTS.format(
        owner=self.owner.username, repo=self.repository.name, index=self.number
    )
    created = self.allspice_client.requests_post(attachments_path, **request_kwargs)

    return Attachment.parse_response(self.allspice_client, created)
Create an attachment on this issue.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
class Milestone(ApiObject):
    """
    A milestone on a repository.
    """

    allow_merge_commits: Any
    allow_rebase: Any
    allow_rebase_explicit: Any
    allow_squash_merge: Any
    archived: Any
    closed_at: Any
    closed_issues: int
    created_at: str
    default_branch: Any
    description: str
    due_on: Any
    has_issues: Any
    has_pull_requests: Any
    has_wiki: Any
    id: int
    ignore_whitespace_conflicts: Any
    name: Any
    open_issues: int
    private: Any
    state: str
    title: str
    updated_at: str
    website: Any

    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo, number>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Milestones are equal when they share both client and id.
        return (
            isinstance(other, Milestone)
            and self.allspice_client == other.allspice_client
            and self.id == other.id
        )

    def __hash__(self):
        client_hash = hash(self.allspice_client)
        id_hash = hash(self.id)
        return client_hash ^ id_hash

    # Both timestamp fields go through Util.convert_time when parsed.
    _fields_to_parsers: ClassVar[dict] = {
        "closed_at": lambda _, t: Util.convert_time(t),
        "due_on": lambda _, t: Util.convert_time(t),
    }

    # NOTE(review): these names look like repository settings rather than
    # milestone fields (no "title", "state" or "due_on") — confirm against the
    # API's milestone edit options.
    _patchable_fields: ClassVar[set[str]] = {
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_squash_merge",
        "archived",
        "default_branch",
        "description",
        "has_issues",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "name",
        "private",
        "website",
    }

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """
        Request a single milestone.

        :param allspice_client: The client used to send the request.
        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param number: Value substituted for `{number}` in `API_OBJECT`.
        :return: The requested milestone.
        """
        request_args = {"owner": owner, "repo": repo, "number": number}
        return cls._request(allspice_client, request_args)
Common base class for all non-exit exceptions.
class Organization(ApiObject):
    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""

    active: Optional[bool]
    avatar_url: str
    created: Optional[str]
    description: str
    email: str
    followers_count: Optional[int]
    following_count: Optional[int]
    full_name: str
    html_url: Optional[str]
    id: int
    is_admin: Optional[bool]
    language: Optional[str]
    last_login: Optional[str]
    location: str
    login: Optional[str]
    login_name: Optional[str]
    name: Optional[str]
    prohibit_login: Optional[bool]
    repo_admin_change_team_access: Optional[bool]
    restricted: Optional[bool]
    source_id: Optional[int]
    starred_repos_count: Optional[int]
    username: str
    visibility: str
    website: str

    API_OBJECT = """/orgs/{name}"""  # <org>
    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Organization):
            return False
        return self.allspice_client == other.allspice_client and self.name == other.name

    def __hash__(self):
        return hash(self.allspice_client) ^ hash(self.name)

    @classmethod
    def request(cls, allspice_client, name: str) -> Self:
        """Request the organization called `name`."""
        return cls._request(allspice_client, {"name": name})

    @classmethod
    def parse_response(cls, allspice_client, result) -> "Organization":
        """
        Parse an API response into an Organization, making sure the resulting
        object has a `name` attribute.
        """
        api_object = super().parse_response(allspice_client, result)
        # add "name" field to make this behave similar to users for gitea < 1.18
        # also necessary for repository-owner when org is repo owner
        if not hasattr(api_object, "name"):
            Organization._add_read_property("name", result["username"], api_object)
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "description",
        "full_name",
        "location",
        "visibility",
        "website",
    }

    def commit(self):
        """Pass this organization's name to `_commit`."""
        args = {"name": self.name}
        self._commit(args)

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create an organization Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            f"/orgs/{self.name}/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response without an "id" is treated as a failure; its "message"
        # is logged and raised.
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Return all repositories of this organization (paginated request)."""
        results = self.allspice_client.requests_get_paginated(
            Organization.ORG_REPOS_REQUEST % self.username
        )
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_repository(self, name) -> "Repository":
        """
        Return the repository called `name`.

        Fetches all repositories and searches them by name.

        :raises NotFoundException: If no repository has that name.
        """
        repos = self.get_repositories()
        for repo in repos:
            if repo.name == name:
                return repo
        raise NotFoundException("Repository %s not existent in organization." % name)

    def get_teams(self) -> List["Team"]:
        """Return the teams of this organization, each linked back to it."""
        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
        teams = [Team.parse_response(self.allspice_client, result) for result in results]
        # organisation seems to be missing using this request, so we add org manually
        for t in teams:
            setattr(t, "_organization", self)
        return teams

    def get_team(self, name) -> "Team":
        """
        Return the team called `name`.

        :raises NotFoundException: If no team has that name.
        """
        teams = self.get_teams()
        for team in teams:
            if team.name == name:
                return team
        raise NotFoundException("Team not existent in organization.")

    def create_team(
        self,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map: Optional[dict] = None,
    ) -> "Team":
        """Alias for AllSpice#create_team

        `units_map` defaults to None and is replaced by a fresh empty dict on
        every call, so calls never share one mutable default.
        """
        # TODO: Move AllSpice#create_team to Organization#create_team and
        # deprecate AllSpice#create_team.
        return self.allspice_client.create_team(
            org=self,
            name=name,
            description=description,
            permission=permission,
            can_create_org_repo=can_create_org_repo,
            includes_all_repositories=includes_all_repositories,
            units=units,
            units_map={} if units_map is None else units_map,
        )

    def get_members(self) -> List["User"]:
        """Return the members of this organization."""
        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
        return [User.parse_response(self.allspice_client, result) for result in results]

    def is_member(self, username) -> bool:
        """
        Check whether a user is a member of this organization.

        :param username: A username, or a User whose `username` is used.
        :return: True if the membership request succeeds, False if it raises
            any exception.
        """
        if isinstance(username, User):
            username = username.username
        try:
            # returns 204 if its ok, 404 if its not
            self.allspice_client.requests_get(
                Organization.ORG_IS_MEMBER % (self.username, username)
            )
            return True
        except Exception:
            # Deliberately best-effort: any failure counts as "not a member".
            return False

    def remove_member(self, user: "User"):
        """Remove `user` from this organization."""
        path = f"/orgs/{self.username}/members/{user.username}"
        self.allspice_client.requests_delete(path)

    def delete(self):
        """Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization"""
        for repo in self.get_repositories():
            repo.delete()
        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Return (datetime, contributions) pairs from the heatmap endpoint."""
        # NOTE(review): this uses User.USER_HEATMAP although ORG_HEATMAP is
        # defined on this class — confirm both hold the same path.
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
see https://hub.allspice.io/api/swagger#/organization/orgGetAll
@classmethod
def parse_response(cls, allspice_client, result) -> "Organization":
    """
    Parse an API response into an Organization, making sure the resulting
    object has a `name` attribute.
    """
    organization = super().parse_response(allspice_client, result)
    if hasattr(organization, "name"):
        return organization
    # add "name" field to make this behave similar to users for gitea < 1.18
    # also necessary for repository-owner when org is repo owner
    Organization._add_read_property("name", result["username"], organization)
    return organization
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create an organization Repository

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    result = self.allspice_client.requests_post(f"/orgs/{self.name}/repos", data=payload)

    # A response without an "id" is treated as a failure.
    if "id" not in result:
        self.allspice_client.logger.error(result["message"])
        raise Exception("Repository not created... (gitea: %s)" % result["message"])

    self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
    return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_teams(self) -> List["Team"]:
    """Return the teams of this organization, each linked back to it."""
    teams_path = Organization.ORG_TEAMS_REQUEST % self.username
    raw_teams = self.allspice_client.requests_get(teams_path)
    parsed_teams = [Team.parse_response(self.allspice_client, raw) for raw in raw_teams]
    # organisation seems to be missing using this request, so we add org manually
    for team in parsed_teams:
        team._organization = self
    return parsed_teams
def create_team(
    self,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map: Optional[dict] = None,
) -> "Team":
    """Alias for AllSpice#create_team

    All arguments are forwarded to the client's `create_team`, with this
    organization passed as `org`. `units_map` defaults to None and is
    replaced by a fresh empty dict on every call, so calls never share one
    mutable default.
    """
    # TODO: Move AllSpice#create_team to Organization#create_team and
    # deprecate AllSpice#create_team.
    return self.allspice_client.create_team(
        org=self,
        name=name,
        description=description,
        permission=permission,
        can_create_org_repo=can_create_org_repo,
        includes_all_repositories=includes_all_repositories,
        units=units,
        units_map={} if units_map is None else units_map,
    )
Alias for AllSpice#create_team
def is_member(self, username) -> bool:
    """
    Check whether a user is a member of this organization.

    :param username: A username, or a User whose `username` is used.
    :return: True if the membership request succeeds, False if it raises any
        exception.
    """
    member_name = username.username if isinstance(username, User) else username
    try:
        # returns 204 if its ok, 404 if its not
        self.allspice_client.requests_get(
            Organization.ORG_IS_MEMBER % (self.username, member_name)
        )
    except Exception:
        return False
    return True
def delete(self):
    """Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization"""
    for repository in self.get_repositories():
        repository.delete()
    organization_path = Organization.API_OBJECT.format(name=self.username)
    self.allspice_client.requests_delete(organization_path)
    self.deleted = True
Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization.
class Release(ApiObject):
    """
    A release on a repo.
    """

    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
    author: User
    body: str
    created_at: str
    draft: bool
    html_url: str
    id: int
    name: str
    prerelease: bool
    published_at: str
    repo: Optional["Repository"]
    repository: Optional["Repository"]
    tag_name: str
    tarball_url: str
    target_commitish: str
    upload_url: str
    url: str
    zipball_url: str

    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
    # Note that we don't strictly need the get_assets route, as the release
    # object already contains the assets.

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Release):
            return False
        return self.repo == other.repo and self.id == other.id

    def __hash__(self):
        return hash(self.repo) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {
        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
    }
    _patchable_fields: ClassVar[set[str]] = {
        "body",
        "draft",
        "name",
        "prerelease",
        "tag_name",
        "target_commitish",
    }

    @classmethod
    def parse_response(cls, allspice_client, result, repo) -> Release:
        """
        Parse an API response into a Release attached to `repo`.

        :param allspice_client: The client the release belongs to.
        :param result: The raw release response; its "assets" entry is parsed
            into ReleaseAsset objects.
        :param repo: The repository exposed as `repository` and `repo`.
        :return: The parsed release.
        """
        release = super().parse_response(allspice_client, result)
        Release._add_read_property("repository", repo, release)
        # For legacy reasons
        Release._add_read_property("repo", repo, release)
        setattr(
            release,
            "_assets",
            [
                ReleaseAsset.parse_response(allspice_client, asset, release)
                for asset in result["assets"]
            ],
        )
        return release

    @classmethod
    def request(
        cls,
        allspice_client,
        owner: str,
        repo: str,
        id: Optional[int] = None,
    ) -> Release:
        """
        Request a release together with its full repository object.

        :param allspice_client: The client used for both requests.
        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param id: ID of the release.
        :return: The requested release.
        """
        args = {"owner": owner, "repo": repo, "id": id}
        release_response = cls._get_gitea_api_object(allspice_client, args)
        repository = Repository.request(allspice_client, owner, repo)
        release = cls.parse_response(allspice_client, release_response, repository)
        return release

    def commit(self):
        """
        Pass this release's owner, repo and id to `_commit`.

        :raises ValueError: If this release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot commit a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self._commit(args)

    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
        """
        Create an asset for this release.

        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

        :param file: The file to upload. This should be a file-like object.
        :param name: The name of the file.
        :return: The created asset.
        :raises ValueError: If this release has no repository.
        """

        if self.repo is None:
            raise ValueError("Cannot create an asset for a release without a repository.")

        args: dict[str, Any] = {"files": {"attachment": file}}
        if name is not None:
            args["params"] = {"name": name}

        result = self.allspice_client.requests_post(
            self.RELEASE_CREATE_ASSET.format(
                owner=self.repo.owner.username,
                repo=self.repo.name,
                id=self.id,
            ),
            **args,
        )
        return ReleaseAsset.parse_response(self.allspice_client, result, self)

    def delete(self):
        """
        Delete this release and mark this object as deleted.

        :raises ValueError: If this release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot delete a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
        self.deleted = True
A release on a repo.
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
    """
    Build a Release from an API response and attach its repository.

    :param allspice_client: The client the release is bound to.
    :param result: The response for the release; must contain an "assets" entry.
    :param repo: The repository the release belongs to.
    :return: The parsed release with its assets parsed into ReleaseAsset objects.
    """
    parsed = super().parse_response(allspice_client, result)

    # "repository" is the current name; "repo" is kept for legacy reasons.
    for alias in ("repository", "repo"):
        Release._add_read_property(alias, repo, parsed)

    parsed_assets = []
    for raw_asset in result["assets"]:
        parsed_assets.append(ReleaseAsset.parse_response(allspice_client, raw_asset, parsed))
    setattr(parsed, "_assets", parsed_assets)

    return parsed
@classmethod
def request(
    cls,
    allspice_client,
    owner: str,
    repo: str,
    id: Optional[int] = None,
) -> Release:
    """
    Fetch a release together with the repository it belongs to.

    :param allspice_client: The client to perform the requests with.
    :param owner: Name of the repository's owner.
    :param repo: Name of the repository.
    :param id: The id of the release.
    :return: The release, with its repository attached.
    """
    # The release payload is fetched first, then its repository.
    payload = cls._get_gitea_api_object(
        allspice_client,
        {"owner": owner, "repo": repo, "id": id},
    )
    parent_repository = Repository.request(allspice_client, owner, repo)
    return cls.parse_response(allspice_client, payload, parent_repository)
def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
    """
    Create an asset for this release.

    https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

    :param file: The file to upload. This should be a file-like object.
    :param name: The name of the file.
    :return: The created asset.
    :raises ValueError: If the release has no repository attached.
    """

    if self.repo is None:
        raise ValueError("Cannot create an asset for a release without a repository.")

    # The file is sent as the "attachment" upload; the optional name is
    # passed as a query parameter.
    args: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        args["params"] = {"name": name}

    result = self.allspice_client.requests_post(
        self.RELEASE_CREATE_ASSET.format(
            owner=self.repo.owner.username,
            repo=self.repo.name,
            id=self.id,
        ),
        **args,
    )
    return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
def delete(self):
    """
    Delete this release on the server and mark this object as deleted.

    :raises ValueError: If the release has no repository attached.
    """
    if self.repo is None:
        raise ValueError("Cannot delete a release without a repository.")

    args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
    self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
    self.deleted = True
480class Repository(ApiObject): 481 allow_fast_forward_only_merge: bool 482 allow_manual_merge: Any 483 allow_merge_commits: bool 484 allow_rebase: bool 485 allow_rebase_explicit: bool 486 allow_rebase_update: bool 487 allow_squash_merge: bool 488 archived: bool 489 archived_at: str 490 autodetect_manual_merge: Any 491 avatar_url: str 492 clone_url: str 493 created_at: str 494 default_allow_maintainer_edit: bool 495 default_branch: str 496 default_delete_branch_after_merge: bool 497 default_merge_style: str 498 description: str 499 empty: bool 500 enable_prune: Any 501 external_tracker: Any 502 external_wiki: Any 503 fork: bool 504 forks_count: int 505 full_name: str 506 has_actions: bool 507 has_issues: bool 508 has_packages: bool 509 has_projects: bool 510 has_pull_requests: bool 511 has_releases: bool 512 has_wiki: bool 513 html_url: str 514 id: int 515 ignore_whitespace_conflicts: bool 516 internal: bool 517 internal_tracker: Dict[str, bool] 518 language: str 519 languages_url: str 520 licenses: Any 521 link: str 522 mirror: bool 523 mirror_interval: str 524 mirror_updated: str 525 name: str 526 object_format_name: str 527 open_issues_count: int 528 open_pr_counter: int 529 original_url: str 530 owner: Union["User", "Organization"] 531 parent: Any 532 permissions: Dict[str, bool] 533 private: bool 534 projects_mode: str 535 release_counter: int 536 repo_transfer: Any 537 size: int 538 ssh_url: str 539 stars_count: int 540 template: bool 541 topics: List[Union[Any, str]] 542 updated_at: datetime 543 url: str 544 watchers_count: int 545 website: str 546 547 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 548 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 549 REPO_SEARCH = """/repos/search/""" 550 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 551 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 552 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 553 
REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 554 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 555 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 556 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 557 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 558 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 559 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 560 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 561 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 562 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 563 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 564 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 565 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 566 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 567 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 568 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 569 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 570 REPO_GET_MEDIA = "/repos/{owner}/{repo}/media/{path}" 571 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 572 573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip" 580 581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex" 591 592 def __init__(self, allspice_client): 593 super().__init__(allspice_client) 594 595 def __eq__(self, other): 596 if not isinstance(other, Repository): 597 return False 598 return self.owner == other.owner and self.name == other.name 599 600 def __hash__(self): 601 return hash(self.owner) ^ hash(self.name) 602 603 
_fields_to_parsers: ClassVar[dict] = { 604 # dont know how to tell apart user and org as owner except form email being empty. 605 "owner": lambda allspice_client, r: ( 606 Organization.parse_response(allspice_client, r) 607 if r["email"] == "" 608 else User.parse_response(allspice_client, r) 609 ), 610 "updated_at": lambda _, t: Util.convert_time(t), 611 } 612 613 @classmethod 614 def request( 615 cls, 616 allspice_client, 617 owner: str, 618 name: str, 619 ) -> Repository: 620 return cls._request(allspice_client, {"owner": owner, "name": name}) 621 622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 
648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses] 666 667 _patchable_fields: ClassVar[set[str]] = { 668 "allow_manual_merge", 669 "allow_merge_commits", 670 "allow_rebase", 671 "allow_rebase_explicit", 672 "allow_rebase_update", 673 "allow_squash_merge", 674 "archived", 675 "autodetect_manual_merge", 676 "default_branch", 677 "default_delete_branch_after_merge", 678 "default_merge_style", 679 "description", 680 "enable_prune", 681 "external_tracker", 682 "external_wiki", 683 "has_actions", 684 "has_issues", 685 "has_projects", 686 "has_pull_requests", 687 "has_wiki", 688 "ignore_whitespace_conflicts", 689 "internal_tracker", 690 "mirror_interval", 691 "name", 692 "private", 693 "template", 694 "website", 695 } 696 697 def commit(self): 698 args = {"owner": self.owner.username, "name": self.name} 699 self._commit(args) 700 701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results] 708 709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result) 715 716 def add_branch(self, 
create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result) 730 731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 
758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues 799 800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 
816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 834 835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results] 871 872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 
877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues 895 896 def get_times(self): 897 results = self.allspice_client.requests_get( 898 Repository.REPO_TIMES % (self.owner.username, self.name) 899 ) 900 return results 901 902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time 910 911 def get_full_name(self) -> str: 912 return self.owner.username + "/" + self.name 913 914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue 930 931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = 
None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result) 984 985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result) 998 999 def create_gitea_hook(self, hook_url: str, 
events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data) 1008 1009 def list_hooks(self): 1010 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1011 return self.allspice_client.requests_get(url) 1012 1013 def delete_hook(self, id: str): 1014 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1015 self.allspice_client.requests_delete(url) 1016 1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False 1028 1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs 1043 1044 def remove_collaborator(self, user_name: str): 1045 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1046 self.allspice_client.requests_delete(url) 1047 1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = 
{"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded 1060 1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result 1082 1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 
1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1099 1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ] 1117 1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 
1140 """ 1141 1142 url = self.REPO_GET_MEDIA.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params) 1149 1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_MEDIA.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk) 1187 1188 def get_generated_json( 1189 self, 1190 content: Union[Content, str], 1191 ref: Optional[Ref] = None, 1192 params: Optional[dict] = None, 1193 ) -> dict: 1194 """ 1195 Get the json blob for a cad file if it exists, otherwise enqueue 1196 a new job and return a 503 status. 1197 1198 WARNING: This is still experimental and not recommended for critical 1199 applications. The structure and content of the returned dictionary can 1200 change at any time. 
1201 1202 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1203 """ 1204 1205 if isinstance(content, Content): 1206 content = content.path 1207 1208 url = self.REPO_GET_ALLSPICE_JSON.format( 1209 owner=self.owner.username, 1210 repo=self.name, 1211 content=content, 1212 ) 1213 data = Util.data_params_for_ref(ref) 1214 if params: 1215 data.update(params) 1216 if self.allspice_client.use_new_schdoc_renderer is not None: 1217 data["use_new_schdoc_renderer"] = ( 1218 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1219 ) 1220 return self.allspice_client.requests_get(url, data) 1221 1222 def get_generated_svg( 1223 self, 1224 content: Union[Content, str], 1225 ref: Optional[Ref] = None, 1226 params: Optional[dict] = None, 1227 ) -> bytes: 1228 """ 1229 Get the svg blob for a cad file if it exists, otherwise enqueue 1230 a new job and return a 503 status. 1231 1232 WARNING: This is still experimental and not yet recommended for 1233 critical applications. The content of the returned svg can change 1234 at any time. 
1235 1236 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1237 """ 1238 1239 if isinstance(content, Content): 1240 content = content.path 1241 1242 url = self.REPO_GET_ALLSPICE_SVG.format( 1243 owner=self.owner.username, 1244 repo=self.name, 1245 content=content, 1246 ) 1247 data = Util.data_params_for_ref(ref) 1248 if params: 1249 data.update(params) 1250 if self.allspice_client.use_new_schdoc_renderer is not None: 1251 data["use_new_schdoc_renderer"] = ( 1252 "true" if self.allspice_client.use_new_schdoc_renderer else "false" 1253 ) 1254 return self.allspice_client.requests_get_raw(url, data) 1255 1256 def get_generated_projectdata( 1257 self, 1258 content: Union[Content, str], 1259 ref: Optional[Ref] = None, 1260 params: Optional[dict] = None, 1261 ) -> dict: 1262 """ 1263 Get the json project data based on the cad file provided 1264 1265 WARNING: This is still experimental and not yet recommended for 1266 critical applications. The content of the returned dictionary can change 1267 at any time. 
1268 1269 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1270 """ 1271 if isinstance(content, Content): 1272 content = content.path 1273 1274 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1275 owner=self.owner.username, 1276 repo=self.name, 1277 content=content, 1278 ) 1279 data = Util.data_params_for_ref(ref) 1280 if params: 1281 data.update(params) 1282 return self.allspice_client.requests_get(url, data) 1283 1284 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1285 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1286 if not data: 1287 data = {} 1288 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1289 data.update({"content": content}) 1290 return self.allspice_client.requests_post(url, data) 1291 1292 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1293 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1294 if not data: 1295 data = {} 1296 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1297 data.update({"sha": file_sha, "content": content}) 1298 return self.allspice_client.requests_put(url, data) 1299 1300 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1301 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1302 if not data: 1303 data = {} 1304 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1305 data.update({"sha": file_sha}) 1306 return self.allspice_client.requests_delete(url, data) 1307 1308 def get_archive( 1309 self, 1310 ref: Ref = "main", 1311 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1312 ) -> bytes: 1313 """ 1314 Download all the files in a specific ref of a repository as a zip or tarball 1315 archive. 
1316 1317 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1318 1319 :param ref: branch or commit to get content from, defaults to the "main" branch 1320 :param archive_format: zip or tar, defaults to zip 1321 """ 1322 1323 ref_string = Util.data_params_for_ref(ref)["ref"] 1324 url = self.REPO_GET_ARCHIVE.format( 1325 owner=self.owner.username, 1326 repo=self.name, 1327 ref=ref_string, 1328 format=archive_format.value, 1329 ) 1330 return self.allspice_client.requests_get_raw(url) 1331 1332 def get_topics(self) -> list[str]: 1333 """ 1334 Gets the list of topics on this repository. 1335 1336 See http://localhost:3000/api/swagger#/repository/repoListTopics 1337 """ 1338 1339 url = self.REPO_GET_TOPICS.format( 1340 owner=self.owner.username, 1341 repo=self.name, 1342 ) 1343 return self.allspice_client.requests_get(url)["topics"] 1344 1345 def add_topic(self, topic: str): 1346 """ 1347 Adds a topic to the repository. 1348 1349 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1350 1351 :param topic: The topic to add. Topic names must consist only of 1352 lowercase letters, numnbers and dashes (-), and cannot start with 1353 dashes. Topic names also must be under 35 characters long. 1354 """ 1355 1356 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1357 self.allspice_client.requests_put(url) 1358 1359 def create_release( 1360 self, 1361 tag_name: str, 1362 name: Optional[str] = None, 1363 body: Optional[str] = None, 1364 draft: bool = False, 1365 ): 1366 """ 1367 Create a release for this repository. The release will be created for 1368 the tag with the given name. If there is no tag with this name, create 1369 the tag first. 
1370 1371 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1372 """ 1373 1374 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1375 data = { 1376 "tag_name": tag_name, 1377 "draft": draft, 1378 } 1379 if name is not None: 1380 data["name"] = name 1381 if body is not None: 1382 data["body"] = body 1383 response = self.allspice_client.requests_post(url, data) 1384 return Release.parse_response(self.allspice_client, response, self) 1385 1386 def get_releases( 1387 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1388 ) -> List[Release]: 1389 """ 1390 Get the list of releases for this repository. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1393 """ 1394 1395 data = {} 1396 1397 if draft is not None: 1398 data["draft"] = draft 1399 if pre_release is not None: 1400 data["pre-release"] = pre_release 1401 1402 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1403 responses = self.allspice_client.requests_get_paginated(url, params=data) 1404 1405 return [ 1406 Release.parse_response(self.allspice_client, response, self) for response in responses 1407 ] 1408 1409 def get_latest_release(self) -> Release: 1410 """ 1411 Get the latest release for this repository. 1412 1413 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1414 """ 1415 1416 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1417 response = self.allspice_client.requests_get(url) 1418 release = Release.parse_response(self.allspice_client, response, self) 1419 return release 1420 1421 def get_release_by_tag(self, tag: str) -> Release: 1422 """ 1423 Get a release by its tag. 
1424 1425 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1426 """ 1427 1428 url = self.REPO_GET_RELEASE_BY_TAG.format( 1429 owner=self.owner.username, repo=self.name, tag=tag 1430 ) 1431 response = self.allspice_client.requests_get(url) 1432 release = Release.parse_response(self.allspice_client, response, self) 1433 return release 1434 1435 def get_commit_statuses( 1436 self, 1437 commit: Union[str, Commit], 1438 sort: Optional[CommitStatusSort] = None, 1439 state: Optional[CommitStatusState] = None, 1440 ) -> List[CommitStatus]: 1441 """ 1442 Get a list of statuses for a commit. 1443 1444 This is roughly equivalent to the Commit.get_statuses method, but this 1445 method allows you to sort and filter commits and is more convenient if 1446 you have a commit SHA and don't need to get the commit itself. 1447 1448 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1449 """ 1450 1451 if isinstance(commit, Commit): 1452 commit = commit.sha 1453 1454 params = {} 1455 if sort is not None: 1456 params["sort"] = sort.value 1457 if state is not None: 1458 params["state"] = state.value 1459 1460 url = self.REPO_GET_COMMIT_STATUS.format( 1461 owner=self.owner.username, repo=self.name, sha=commit 1462 ) 1463 response = self.allspice_client.requests_get_paginated(url, params=params) 1464 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1465 1466 def create_commit_status( 1467 self, 1468 commit: Union[str, Commit], 1469 context: Optional[str] = None, 1470 description: Optional[str] = None, 1471 state: Optional[CommitStatusState] = None, 1472 target_url: Optional[str] = None, 1473 ) -> CommitStatus: 1474 """ 1475 Create a status on a commit. 
1476 1477 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1478 """ 1479 1480 if isinstance(commit, Commit): 1481 commit = commit.sha 1482 1483 data = {} 1484 if context is not None: 1485 data["context"] = context 1486 if description is not None: 1487 data["description"] = description 1488 if state is not None: 1489 data["state"] = state.value 1490 if target_url is not None: 1491 data["target_url"] = target_url 1492 1493 url = self.REPO_GET_COMMIT_STATUS.format( 1494 owner=self.owner.username, repo=self.name, sha=commit 1495 ) 1496 response = self.allspice_client.requests_post(url, data=data) 1497 return CommitStatus.parse_response(self.allspice_client, response) 1498 1499 def delete(self): 1500 self.allspice_client.requests_delete( 1501 Repository.REPO_DELETE % (self.owner.username, self.name) 1502 ) 1503 self.deleted = True
@classmethod
def search(
    cls,
    allspice_client,
    query: Optional[str] = None,
    topic: bool = False,
    include_description: bool = False,
    user: Optional[User] = None,
    owner_to_prioritize: Union[User, Organization, None] = None,
) -> list[Repository]:
    """
    Search for repositories.

    See https://hub.allspice.io/api/swagger#/repository/repoSearch

    :param query: The query string to search for.
    :param topic: If true, the query string is only matched against the
        repository's topic.
    :param include_description: If true, the query string is also matched
        against the repository's description.
    :param user: If specified, only repositories that this user owns or
        contributes to are searched.
    :param owner_to_prioritize: If specified, repositories owned by the
        given entity are prioritized in the search.
    :returns: All repositories matching the query. If there are many
        matches, this may take some time.
    """

    search_params = {}
    if query is not None:
        search_params["q"] = query

    # Boolean switches are only sent when they are switched on.
    for flag_name, flag in (("topic", topic), ("include_description", include_description)):
        if flag:
            search_params[flag_name] = flag

    # Entity filters are sent as the entity's numeric id.
    for key, entity in (("user", user), ("owner_to_prioritize", owner_to_prioritize)):
        if entity is not None:
            search_params[key] = entity.id

    raw_repos = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=search_params)

    found = []
    for raw_repo in raw_repos:
        found.append(Repository.parse_response(allspice_client, raw_repo))
    return found
Search for repositories.
See https://hub.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns
All repositories matching the query. If there are many repositories matching this query, this may take some time.
def get_branches(self) -> List["Branch"]:
    """Return every Branch of this Repository, fetched with the client's paginated GET."""

    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    branches = []
    for raw_branch in self.allspice_client.requests_get_paginated(endpoint):
        branches.append(Branch.parse_response(self.allspice_client, raw_branch))
    return branches
Get all the Branches of this Repository.
def get_branch(self, name: str) -> "Branch":
    """
    Return the Branch of this Repository called `name`.

    :param name: Name of the branch to look up.
    """
    endpoint = Repository.REPO_BRANCH.format(
        owner=self.owner.username,
        repo=self.name,
        branch=name,
    )
    raw_branch = self.allspice_client.requests_get(endpoint)
    return Branch.parse_response(self.allspice_client, raw_branch)
Get a specific Branch of this Repository.
def add_branch(self, create_from: Ref, newname: str) -> "Branch":
    """
    Create a new branch in this repository.

    :param create_from: The ref (Branch, Commit or string) the new branch starts from.
    :param newname: Name of the branch to create.
    :raises ValueError: If `create_from` cannot be resolved to a ref.
    :return: The newly created Branch.
    """
    # Note: will only work with gitea 1.13 or higher!

    ref_params = Util.data_params_for_ref(create_from)
    if "ref" in ref_params:
        source_ref = ref_params["ref"]
    else:
        raise ValueError("create_from must be a Branch, Commit or string")

    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    payload = {"new_branch_name": newname, "old_ref_name": source_ref}
    raw_branch = self.allspice_client.requests_post(endpoint, data=payload)
    return Branch.parse_response(self.allspice_client, raw_branch)
Add a branch to the repository
def get_issues(
    self,
    state: Literal["open", "closed", "all"] = "all",
    search_query: Optional[str] = None,
    labels: Optional[List[str]] = None,
    milestones: Optional[List[Union[Milestone, str]]] = None,
    assignee: Optional[Union[User, str]] = None,
    since: Optional[datetime] = None,
    before: Optional[datetime] = None,
) -> List["Issue"]:
    """
    Get the Issues of this Repository, optionally filtered.

    https://hub.allspice.io/api/swagger#/repository/repoListIssues

    All params of this method are optional filters. If you don't specify a
    filter, it will not be applied.

    :param state: Which Issues to get: "open", "closed" or "all". Defaults
        to "all", which returns both open and closed Issues.
    :param search_query: Filter issues by text. This is equivalent to searching for
        `search_query` in the Issues on the web interface.
    :param labels: Filter issues by label names.
    :param milestones: Filter issues by milestones, given as Milestone
        objects or milestone names.
    :param assignee: Filter issues by the assigned user, given as a User
        object or a username.
    :param since: Only return issues updated after this time.
    :param before: Only return issues updated before this time.
    :return: A list of Issues, each linked back to this Repository.
    """

    data = {"state": state}
    if search_query:
        data["q"] = search_query
    if labels:
        data["labels"] = ",".join(labels)
    if milestones:
        # The issue-list endpoint reads the plural `milestones` query
        # parameter; a singular `milestone` key is ignored by the server.
        data["milestones"] = ",".join(
            milestone.name if isinstance(milestone, Milestone) else milestone
            for milestone in milestones
        )
    if assignee:
        # The assignee filter of the issue-list endpoint is `assigned_by`.
        data["assigned_by"] = assignee.username if isinstance(assignee, User) else assignee
    if since:
        data["since"] = Util.format_time(since)
    if before:
        data["before"] = Util.format_time(before)

    results = self.allspice_client.requests_get_paginated(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        params=data,
    )

    issues = []
    for result in results:
        issue = Issue.parse_response(self.allspice_client, result)
        # See Issue.request
        setattr(issue, "_repository", self)
        # This is mostly for compatibility with an older implementation
        Issue._add_read_property("repo", self, issue)
        issues.append(issue)

    return issues
Get all Issues of this Repository (open and closed)
https://hub.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get: "open", "closed" or "all". Defaults to "all", which returns every Issue.
- search_query: Filter issues by text. This is equivalent to searching for `search_query` in the Issues on the web interface.
- labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Only return issues updated after this time.
- before: Only return issues updated before this time.
Returns
A list of Issues.
def get_design_reviews(
    self,
    state: Literal["open", "closed", "all"] = "all",
    milestone: Optional[Union[Milestone, str]] = None,
    labels: Optional[List[str]] = None,
) -> List["DesignReview"]:
    """
    Get the Design Reviews of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

    :param state: Which Design Reviews to get: "open", "closed" or "all"
        (the default).
    :param milestone: Only return Design Reviews of this milestone, given
        as a Milestone object or a string.
    :param labels: A list of label IDs to filter DRs by.
    :return: A list of Design Reviews.
    """

    query = {"state": state}
    if milestone:
        query["milestone"] = milestone.name if isinstance(milestone, Milestone) else milestone
    if labels:
        query["labels"] = ",".join(labels)

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    reviews = []
    for raw_review in self.allspice_client.requests_get_paginated(endpoint, params=query):
        reviews.append(DesignReview.parse_response(self.allspice_client, raw_review))
    return reviews
Get all Design Reviews of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get: "open", "closed" or "all". Defaults to "all", which returns every Design Review.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
def get_commits(
    self,
    sha: Optional[str] = None,
    path: Optional[str] = None,
    stat: bool = True,
) -> List["Commit"]:
    """
    Get the Commits of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

    A ConflictException from the server is logged and treated as an empty
    repository, in which case an empty list is returned.

    :param sha: The SHA of the commit to start listing commits from.
    :param path: filepath of a file/dir.
    :param stat: Include the number of additions and deletions in the
        response. Disable for speedup.
    :return: A list of Commits.
    """

    query = {}
    if sha:
        query["sha"] = sha
    if path:
        query["path"] = path
    if not stat:
        # Only sent when stats are explicitly switched off.
        query["stat"] = False

    try:
        raw_commits = self.allspice_client.requests_get_paginated(
            Repository.REPO_COMMITS % (self.owner.username, self.name),
            params=query,
        )
    except ConflictException as err:
        logging.warning(err)
        logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
        return []

    commits = []
    for raw_commit in raw_commits:
        commits.append(Commit.parse_response(self.allspice_client, raw_commit))
    return commits
Get all the Commits of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
def get_issues_state(self, state) -> List["Issue"]:
    """
    DEPRECATED: Use get_issues() instead.

    Get the issues of this repository that are in the given state.

    :param state: Either Issue.OPENED or Issue.CLOSED.
    :return: A list of Issues, each with `repo` and `owner` attached.
    """

    assert state in [Issue.OPENED, Issue.CLOSED]

    endpoint = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(endpoint, params={"state": state})

    collected = []
    for raw_issue in raw_issues:
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # The issue response does not carry its repository or owner, so
        # attach them here. See Issue.request()
        setattr(parsed, "_repository", self)
        Issue._add_read_property("repo", self, parsed)
        Issue._add_read_property("owner", self.owner, parsed)
        collected.append(parsed)
    return collected
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
def get_user_time(self, username) -> float:
    """
    Sum the `time` of every tracked-time entry a user has on this repository.

    :param username: A User object or a username string.
    :return: The total of the `time` fields returned by the server.
    """
    user_name = username.username if isinstance(username, User) else username
    entries = self.allspice_client.requests_get(
        Repository.REPO_USER_TIME % (self.owner.username, self.name, user_name)
    )
    total = 0
    for entry in entries:
        total += entry["time"]
    return total
def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
    """
    Create a new, open Issue in this repository.

    :param title: Title of the issue.
    :param assignees: Collection of assignees for the issue; presumably
        usernames, as the values are sent to the server unchanged.
    :param description: Body text of the issue.
    :return: The created Issue, linked back to this Repository.
    """
    # Sets (including the frozenset default) are not JSON-serialisable, so
    # send them as a list. Any other value is passed through untouched.
    if isinstance(assignees, (set, frozenset)):
        assignees = list(assignees)

    data = {
        "assignees": assignees,
        "body": description,
        "closed": False,
        "title": title,
    }
    result = self.allspice_client.requests_post(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        data=data,
    )

    issue = Issue.parse_response(self.allspice_client, result)
    # See Issue.request: the response does not carry its repository.
    setattr(issue, "_repository", self)
    Issue._add_read_property("repo", self, issue)
    return issue
def create_design_review(
    self,
    title: str,
    head: Union[Branch, str],
    base: Union[Branch, str],
    assignees: Optional[Set[Union[User, str]]] = None,
    body: Optional[str] = None,
    due_date: Optional[datetime] = None,
    milestone: Optional["Milestone"] = None,
) -> "DesignReview":
    """
    Create a new Design Review.

    See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

    :param title: Title of the Design Review
    :param head: Branch or name of the branch to merge into the base branch
    :param base: Branch or name of the branch to merge into
    :param assignees: Optional. Users to assign this review to, given as
        User objects or usernames.
    :param body: An optional description for the Design Review.
    :param due_date: An optional due date for the Design Review.
    :param milestone: An optional Milestone for the Design Review; its id
        is what gets sent.
    :return: The created Design Review
    """

    payload: dict[str, Any] = {
        "title": title,
        "head": head.name if isinstance(head, Branch) else head,
        "base": base.name if isinstance(base, Branch) else base,
    }

    # Optional fields are left out of the request entirely when falsy.
    if assignees:
        payload["assignees"] = [
            member.username if isinstance(member, User) else member for member in assignees
        ]
    if body:
        payload["body"] = body
    if due_date:
        payload["due_date"] = Util.format_time(due_date)
    if milestone:
        payload["milestone"] = milestone.id

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    raw_review = self.allspice_client.requests_post(endpoint, data=payload)
    return DesignReview.parse_response(self.allspice_client, raw_review)
Create a new Design Review.
See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
def create_milestone(
    self,
    title: str,
    description: str,
    due_date: Optional[str] = None,
    state: str = "open",
) -> "Milestone":
    """
    Create a Milestone in this repository.

    :param title: Title of the milestone.
    :param description: Description of the milestone.
    :param due_date: Optional due date, sent to the server as given.
    :param state: State of the new milestone, "open" by default.
    :return: The created Milestone.
    """
    payload = {"title": title, "description": description, "state": state}
    if due_date:
        payload["due_date"] = due_date

    endpoint = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
    raw_milestone = self.allspice_client.requests_post(endpoint, data=payload)
    return Milestone.parse_response(self.allspice_client, raw_milestone)
def create_gitea_hook(self, hook_url: str, events: List[str]):
    """
    Register an active, JSON-delivering webhook of type "gitea" on this repository.

    :param hook_url: URL the hook delivers to.
    :param events: Names of the events that trigger the hook.
    :return: Whatever the client's POST returns for the created hook.
    """
    hook_config = {"content_type": "json", "url": hook_url}
    payload = {
        "type": "gitea",
        "config": hook_config,
        "events": events,
        "active": True,
    }
    endpoint = f"/repos/{self.owner.username}/{self.name}/hooks"
    return self.allspice_client.requests_post(endpoint, data=payload)
def is_collaborator(self, username) -> bool:
    """
    Tell whether a user is a collaborator on this repository.

    :param username: A User object or a username string.
    :return: True if the lookup request succeeds, False if it raises for
        any reason.
    """
    user_name = username.username if isinstance(username, User) else username
    try:
        # returns 204 if its ok, 404 if its not
        endpoint = Repository.REPO_IS_COLLABORATOR % (
            self.owner.username,
            self.name,
            user_name,
        )
        self.allspice_client.requests_get(endpoint)
    except Exception:
        return False
    else:
        return True
def get_users_with_access(self) -> Sequence[User]:
    """
    List the users that can access this repository.

    For a user-owned repository this is the collaborators plus the owner.
    For an organization-owned repository it is the collaborators plus the
    members of every team of the organization whose repositories include
    one with this repository's name. The result is not de-duplicated.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/collaborators"
    collaborators = [
        User.parse_response(self.allspice_client, raw_user)
        for raw_user in self.allspice_client.requests_get(endpoint)
    ]

    if isinstance(self.owner, User):
        return collaborators + [self.owner]

    # owner must be org
    for team in self.owner.get_teams():
        team_repo_names = [team_repo.name for team_repo in team.get_repos()]
        if self.name in team_repo_names:
            collaborators.extend(team.get_members())
    return collaborators
def transfer_ownership(
    self,
    new_owner: Union[User, Organization],
    new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
):
    """
    Transfer this repository to another user or organization.

    :param new_owner: The User or Organization that receives the repository.
    :param new_teams: Teams to give access to the repository. Only used
        when `new_owner` is an Organization, and only teams that belong to
        that organization are sent.
    """
    url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
    data: dict[str, Any] = {"new_owner": new_owner.username}
    if isinstance(new_owner, Organization):
        # Fetch the organization's teams once rather than once per candidate
        # team, and not at all when there is nothing to check.
        owner_teams = new_owner.get_teams() if new_teams else []
        data["team_ids"] = [team.id for team in new_teams if team in owner_teams]
    self.allspice_client.requests_post(url, data=data)
    # TODO: make sure this instance is either updated or discarded
def get_git_content(
    self,
    ref: Optional["Ref"] = None,
    commit: "Optional[Commit]" = None,
) -> List[Content]:
    """
    Get the metadata for all files in the root directory.

    https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

    :param ref: branch or commit to get content from
    :param commit: commit to get content from (deprecated); only used when
        `ref` is not given
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents"
    ref_params = Util.data_params_for_ref(ref or commit)

    entries = []
    for raw_entry in self.allspice_client.requests_get(endpoint, ref_params):
        entries.append(Content.parse_response(self.allspice_client, raw_entry))
    return entries
Get the metadata for all files in the root directory.
https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
    """
    Get the repository's tree on a given ref.

    Only the top-level entries are requested by default; set `recursive`
    to True to request the entire tree.

    :param ref: The ref to get the tree from. If not provided, the default branch is used.
    :param recursive: Whether to get the entire tree or just the top-level entries.
    """

    ref_params = Util.data_params_for_ref(ref)
    ref_name = ref_params.get("ref", self.default_branch)
    endpoint = self.REPO_GET_TREE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_name,
    )
    raw_entries = self.allspice_client.requests_get_paginated(
        endpoint, params={"recursive": recursive}
    )

    tree = []
    for raw_entry in raw_entries:
        tree.append(GitEntry.parse_response(self.allspice_client, raw_entry))
    return tree
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content(
    self,
    content: Content,
    ref: Optional[Ref] = None,
    commit: Optional[Commit] = None,
) -> Union[str, List["Content"]]:
    """
    Get what a Content entry holds.

    https://hub.allspice.io/api/swagger#/repository/repoGetContents

    :param content: The entry to read.
    :param ref: branch or commit to read from
    :param commit: commit to read from; only used when `ref` is not given
    :return: The response's "content" field when the entry is a file,
        otherwise the list of Content entries found at that path.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
    ref_params = Util.data_params_for_ref(ref or commit)
    is_file = content.type == Content.FILE

    response = self.allspice_client.requests_get(endpoint, ref_params)
    if is_file:
        return response["content"]
    return [Content.parse_response(self.allspice_client, entry) for entry in response]
https://hub.allspice.io/api/swagger#/repository/repoGetContents
def get_raw_file(
    self,
    file_path: str,
    ref: Optional[Ref] = None,
) -> bytes:
    """
    Get the raw, binary data of a single file.

    Note 1: for a text file you will usually want to call .decode() on
    the result to get a string. For example:

        content = repo.get_raw_file("file.txt").decode("utf-8")

    Note 2: the entire file is held in memory. For a large file, consider
    `download_to_file` instead.

    See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS

    :param file_path: The path to the file to get.
    :param ref: The branch or commit to get the file from. If not provided,
        the default branch is used.
    """

    ref_params = Util.data_params_for_ref(ref)
    endpoint = self.REPO_GET_MEDIA.format(
        owner=self.owner.username, repo=self.name, path=file_path
    )
    return self.allspice_client.requests_get_raw(endpoint, params=ref_params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file(
    self,
    file_path: str,
    io: IO,
    ref: Optional[Ref] = None,
) -> None:
    """
    Download the binary data of a file to a file-like object.

    The file is streamed in 4096-byte chunks, so it is never held in
    memory as a whole.

    Example:

        with open("schematic.DSN", "wb") as f:
            repo.download_to_file("Schematics/my_schematic.DSN", f)

    :param file_path: The path to the file in the repository from the root
        of the repository.
    :param io: The file-like object to write the data to.
    :param ref: The branch or commit to get the file from.
    :raises requests.HTTPError: If the server answers with an error status.
        NOTE(review): assumes `allspice_client.requests` is a `requests`
        session, as the `stream=True` / `iter_content` usage suggests.
    """

    url = self.allspice_client._AllSpice__get_url(
        self.REPO_GET_MEDIA.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
    )
    params = Util.data_params_for_ref(ref)

    # The context manager closes the streamed response even if writing fails.
    with self.allspice_client.requests.get(
        url,
        params=params,
        headers=self.allspice_client.headers,
        stream=True,
    ) as response:
        # Fail loudly rather than write an HTTP error body into the caller's file.
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
    repo.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
def get_generated_json(
    self,
    content: Union[Content, str],
    ref: Optional[Ref] = None,
    params: Optional[dict] = None,
) -> dict:
    """
    Get the json blob for a cad file if it exists, otherwise enqueue
    a new job and return a 503 status.

    WARNING: This is still experimental and not recommended for critical
    applications. The structure and content of the returned dictionary can
    change at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

    :param content: The cad file, as a Content object or a path string.
    :param ref: The branch or commit to read the file from.
    :param params: Extra query parameters, merged over the ref parameters.
    """

    content_path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_JSON.format(
        owner=self.owner.username, repo=self.name, content=content_path
    )

    query = Util.data_params_for_ref(ref)
    if params:
        query.update(params)

    # The client-wide renderer choice wins over `params`; None means unset.
    renderer_flag = self.allspice_client.use_new_schdoc_renderer
    if renderer_flag is not None:
        query["use_new_schdoc_renderer"] = "true" if renderer_flag else "false"

    return self.allspice_client.requests_get(endpoint, query)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
def get_generated_svg(
    self,
    content: Union[Content, str],
    ref: Optional[Ref] = None,
    params: Optional[dict] = None,
) -> bytes:
    """
    Get the svg blob for a cad file if it exists, otherwise enqueue
    a new job and return a 503 status.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: The cad file, as a Content object or a path string.
    :param ref: The branch or commit to read the file from.
    :param params: Extra query parameters, merged over the ref parameters.
    """

    content_path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username, repo=self.name, content=content_path
    )

    query = Util.data_params_for_ref(ref)
    if params:
        query.update(params)

    # The client-wide renderer choice wins over `params`; None means unset.
    renderer_flag = self.allspice_client.use_new_schdoc_renderer
    if renderer_flag is not None:
        query["use_new_schdoc_renderer"] = "true" if renderer_flag else "false"

    return self.allspice_client.requests_get_raw(endpoint, query)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
def get_generated_projectdata(
    self,
    content: Union[Content, str],
    ref: Optional[Ref] = None,
    params: Optional[dict] = None,
) -> dict:
    """
    Get the json project data based on the cad file provided

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned dictionary can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject

    :param content: The cad file, as a Content object or a path string.
    :param ref: The branch or commit to read the file from.
    :param params: Extra query parameters, merged over the ref parameters.
    """
    content_path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_PROJECT.format(
        owner=self.owner.username, repo=self.name, content=content_path
    )

    query = Util.data_params_for_ref(ref)
    if params:
        query.update(params)
    return self.allspice_client.requests_get(endpoint, query)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the new file, relative to the repository root.
    :param content: The file content, sent as the "content" field.
        NOTE(review): presumably base64-encoded, as the contents API
        expects; confirm against the API docs.
    :param data: Optional extra request fields. The caller's dict is not
        modified.
    :return: Whatever the client's POST returns.
    """
    # Work on a copy so the caller's dict is left untouched.
    payload = dict(data) if data else {}
    payload["content"] = content
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_post(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoCreateFile
def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Replace the content of an existing file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file, relative to the repository root.
    :param file_sha: SHA of the file being replaced, sent as "sha".
    :param content: The new file content, sent as the "content" field.
        NOTE(review): presumably base64-encoded, as the contents API
        expects; confirm against the API docs.
    :param data: Optional extra request fields. The caller's dict is not
        modified.
    :return: Whatever the client's PUT returns.
    """
    # Work on a copy so the caller's dict is left untouched.
    payload = dict(data) if data else {}
    payload.update({"sha": file_sha, "content": content})
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_put(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoUpdateFile
def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from this repository.

    https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file, relative to the repository root.
    :param file_sha: SHA of the file being deleted, sent as "sha".
    :param data: Optional extra request fields. The caller's dict is not
        modified.
    :return: Whatever the client's DELETE returns.
    """
    # Work on a copy so the caller's dict is left untouched.
    payload = dict(data) if data else {}
    payload["sha"] = file_sha
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_delete(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoDeleteFile
def get_archive(
    self,
    ref: Ref = "main",
    archive_format: ArchiveFormat = ArchiveFormat.ZIP,
) -> bytes:
    """
    Download every file at a given ref of this repository as a single zip or
    tarball archive.

    https://hub.allspice.io/api/swagger#/repository/repoGetArchive

    :param ref: branch or commit to get content from, defaults to the "main" branch
    :param archive_format: zip or tar, defaults to zip
    :return: The result of the client's ``requests_get_raw`` call for the
        archive URL.
    """
    # The helper yields request parameters; only its "ref" entry is needed,
    # because the ref is part of the URL here.
    ref_params = Util.data_params_for_ref(ref)
    archive_url = self.REPO_GET_ARCHIVE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_params["ref"],
        format=archive_format.value,
    )
    return self.allspice_client.requests_get_raw(archive_url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
https://hub.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
    """
    Return the topics set on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics

    :return: The "topics" entry of the response.
    """
    endpoint = self.REPO_GET_TOPICS.format(owner=self.owner.username, repo=self.name)
    response = self.allspice_client.requests_get(endpoint)
    return response["topics"]
Gets the list of topics on this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListTopics
def add_topic(self, topic: str):
    """
    Add a single topic to this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """
    endpoint = self.REPO_ADD_TOPIC.format(
        owner=self.owner.username,
        repo=self.name,
        topic=topic,
    )
    self.allspice_client.requests_put(endpoint)
Adds a topic to the repository.
See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release(
    self,
    tag_name: str,
    name: Optional[str] = None,
    body: Optional[str] = None,
    draft: bool = False,
):
    """
    Create a release for this repository. The release will be created for
    the tag with the given name. If there is no tag with this name, create
    the tag first.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

    :param tag_name: Sent as the "tag_name" field.
    :param name: Sent as the "name" field; omitted from the request when None.
    :param body: Sent as the "body" field; omitted from the request when None.
    :param draft: Sent as the "draft" field.
    :return: The response parsed with ``Release.parse_response``.
    """
    payload = {"tag_name": tag_name, "draft": draft}
    # Optional fields are only sent when the caller supplied them.
    optional_fields = {"name": name, "body": body}
    payload.update(
        {key: value for key, value in optional_fields.items() if value is not None}
    )

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, payload)
    return Release.parse_response(self.allspice_client, created, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    List the releases of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: When not None, sent as the "draft" request parameter.
    :param pre_release: When not None, sent as the "pre-release" request
        parameter.
    :return: One Release per entry returned by the paginated request.
    """
    filters = {"draft": draft, "pre-release": pre_release}
    query = {key: value for key, value in filters.items() if value is not None}

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)

    releases = []
    for entry in entries:
        releases.append(Release.parse_response(self.allspice_client, entry, self))
    return releases
Get the list of releases for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListReleases
def get_latest_release(self) -> Release:
    """
    Fetch the latest release of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

    :return: The response parsed with ``Release.parse_response``.
    """
    endpoint = self.REPO_GET_LATEST_RELEASE.format(
        owner=self.owner.username,
        repo=self.name,
    )
    return Release.parse_response(
        self.allspice_client,
        self.allspice_client.requests_get(endpoint),
        self,
    )
Get the latest release for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
def get_release_by_tag(self, tag: str) -> Release:
    """
    Fetch the release that belongs to the given tag.

    See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

    :param tag: Tag name; inserted into the request URL.
    :return: The response parsed with ``Release.parse_response``.
    """
    endpoint = self.REPO_GET_RELEASE_BY_TAG.format(
        owner=self.owner.username,
        repo=self.name,
        tag=tag,
    )
    return Release.parse_response(
        self.allspice_client,
        self.allspice_client.requests_get(endpoint),
        self,
    )
Get a release by its tag.
See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    List the statuses attached to a commit.

    This is roughly equivalent to the Commit.get_statuses method, but this
    method lets you sort the statuses and filter them by state, and is more
    convenient if you have a commit SHA and don't need to get the commit
    itself.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object (its ``sha`` is used) or a SHA string.
    :param sort: When not None, its value is sent as the "sort" parameter.
    :param state: When not None, its value is sent as the "state" parameter.
    :return: One CommitStatus per entry returned by the paginated request.
    """
    sha = commit.sha if isinstance(commit, Commit) else commit

    selected = {"sort": sort, "state": state}
    query = {key: option.value for key, option in selected.items() if option is not None}

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username,
        repo=self.name,
        sha=sha,
    )
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)

    statuses = []
    for entry in entries:
        statuses.append(CommitStatus.parse_response(self.allspice_client, entry))
    return statuses
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter statuses and is more convenient if you have a commit SHA and don't need to get the commit itself.
See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
def create_commit_status(
    self,
    commit: Union[str, Commit],
    context: Optional[str] = None,
    description: Optional[str] = None,
    state: Optional[CommitStatusState] = None,
    target_url: Optional[str] = None,
) -> CommitStatus:
    """
    Attach a new status to a commit.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

    :param commit: A Commit object (its ``sha`` is used) or a SHA string.
    :param context: Sent as "context" when not None.
    :param description: Sent as "description" when not None.
    :param state: Its value is sent as "state" when not None.
    :param target_url: Sent as "target_url" when not None.
    :return: The response parsed with ``CommitStatus.parse_response``.
    """
    sha = commit.sha if isinstance(commit, Commit) else commit

    optional_fields = (
        ("context", context),
        ("description", description),
        ("state", state),
        ("target_url", target_url),
    )
    payload = {}
    for key, value in optional_fields:
        if value is None:
            continue
        # The state is an enum; the API receives its underlying value.
        payload[key] = value.value if key == "state" else value

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username,
        repo=self.name,
        sha=sha,
    )
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return CommitStatus.parse_response(self.allspice_client, created)
Create a status on a commit.
See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
class ArchiveFormat(Enum):
    """
    The archive formats that can be requested from Repository.get_archive.

    Each member's value is the string that get_archive places into the
    archive URL.
    """

    # Gzip-compressed tarball.
    TAR = "tar.gz"
    # Zip archive.
    ZIP = "zip"
Archive formats for Repository.get_archive
class CommitStatusSort(Enum):
    """
    Sort orders accepted by Repository.get_commit_statuses.

    The value of the chosen member is sent as the "sort" request parameter.
    """

    OLDEST = "oldest"
    RECENT_UPDATE = "recentupdate"
    LEAST_UPDATE = "leastupdate"
    LEAST_INDEX = "leastindex"
    HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_statuses
class Team(ApiObject):
    """
    A team, accessed through the ``/teams/{id}`` endpoints.

    Two teams compare equal when both their organization and their id match.
    """

    can_create_org_repo: bool
    description: str
    id: int
    includes_all_repositories: bool
    name: str
    organization: Optional["Organization"]
    permission: str
    units: List[str]
    units_map: Dict[str, str]

    API_OBJECT = """/teams/{id}"""  # <id>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Team):
            return False
        return self.organization == other.organization and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.organization) ^ hash(self.id)

    # The "organization" field of a response is parsed into an Organization.
    _fields_to_parsers: ClassVar[dict] = {
        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
    }

    _patchable_fields: ClassVar[set[str]] = {
        "can_create_org_repo",
        "description",
        "includes_all_repositories",
        "name",
        "permission",
        "units",
        "units_map",
    }

    @classmethod
    def request(cls, allspice_client, id: int):
        """Request the team with the given id."""
        return cls._request(allspice_client, {"id": id})

    def commit(self):
        """Commit this team via ``_commit``, addressed by its id."""
        args = {"id": self.id}
        self._commit(args)

    def add_user(self, user: User):
        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
        url = f"/teams/{self.id}/members/{user.login}"
        self.allspice_client.requests_put(url)

    def add_repo(self, org: Organization, repo: Union[Repository, str]):
        """
        Add a repository to this team.

        :param org: Organization whose username fills the owner part of the URL.
        :param repo: A Repository object (its name is used) or a repository name.
        """
        if isinstance(repo, Repository):
            repo_name = repo.name
        else:
            repo_name = repo
        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))

    def get_members(self):
        """Get all users assigned to the team."""
        results = self.allspice_client.requests_get_paginated(
            Team.GET_MEMBERS % self.id,
        )
        return [User.parse_response(self.allspice_client, result) for result in results]

    def get_repos(self):
        """Get all repos of this Team."""
        # Use the paginated request, as get_members does; a plain GET only
        # yields the first page, so larger teams would be truncated.
        results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def delete(self):
        """Delete this team and mark this object as deleted."""
        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
        self.deleted = True

    def remove_team_member(self, user_name: str):
        """Remove the user with the given name from this team."""
        url = f"/teams/{self.id}/members/{user_name}"
        self.allspice_client.requests_delete(url)
def add_user(self, user: User):
    """
    Add a user to this team.

    See https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember

    :param user: The user to add; their ``login`` is placed into the URL.
    """
    member_endpoint = f"/teams/{self.id}/members/{user.login}"
    self.allspice_client.requests_put(member_endpoint)
https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
def get_members(self):
    """
    Get all users assigned to the team.

    :return: One User per entry returned by the paginated members request.
    """
    endpoint = Team.GET_MEMBERS % self.id
    members = []
    for entry in self.allspice_client.requests_get_paginated(endpoint):
        members.append(User.parse_response(self.allspice_client, entry))
    return members
Get all users assigned to the team.
def get_repos(self):
    """
    Get all repos of this Team.

    :return: One Repository per entry returned by the paginated request.
    """
    # Use the paginated request, as get_members does; a plain GET only yields
    # the first page, so larger teams would be truncated.
    results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
class User(ApiObject):
    """
    A user account, accessed through the ``/users/{name}`` endpoints.

    Two users compare equal when they share the same client and id.
    """

    active: bool
    admin: Any
    allow_create_organization: Any
    allow_git_hook: Any
    allow_import_local: Any
    avatar_url: str
    created: str
    description: str
    email: str
    emails: List[Any]
    followers_count: int
    following_count: int
    full_name: str
    html_url: str
    id: int
    is_admin: bool
    language: str
    last_login: str
    location: str
    login: str
    login_name: str
    max_repo_creation: Any
    must_change_password: Any
    password: Any
    prohibit_login: bool
    restricted: bool
    source_id: int
    starred_repos_count: int
    username: str
    visibility: str
    website: str

    API_OBJECT = """/users/{name}"""  # <name>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)
        self._emails = []

    def __eq__(self, other):
        if not isinstance(other, User):
            return False
        return self.allspice_client == other.allspice_client and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.allspice_client) ^ hash(self.id)

    @property
    def emails(self):
        """The user's email addresses; fetched from the server on every access."""
        self.__request_emails()
        return self._emails

    @classmethod
    def request(cls, allspice_client, name: str) -> "User":
        """Request the user with the given name."""
        api_object = cls._request(allspice_client, {"name": name})
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "active",
        "admin",
        "allow_create_organization",
        "allow_git_hook",
        "allow_import_local",
        "email",
        "full_name",
        "location",
        "login_name",
        "max_repo_creation",
        "must_change_password",
        "password",
        "prohibit_login",
        "website",
    }

    def commit(self, login_name: str, source_id: int = 0):
        """
        Unfortunately it is necessary to require the login name
        as well as the login source (that is not supplied when getting a user) for
        changing a user.
        Usually source_id is 0 and the login_name is equal to the username.
        """
        values = self.get_dirty_fields()
        values.update(
            # api-doc says that the "source_id" is necessary; works without though
            {"login_name": login_name, "source_id": source_id}
        )
        args = {"username": self.username}
        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
        self._dirty_fields = {}

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a user Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
                (Not raised in this method; presumably raised by the client
                request - confirm.)
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            "/user/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response without an "id" is treated as a failure report.
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories owned by this User."""
        url = f"/users/{self.username}/repos"
        results = self.allspice_client.requests_get_paginated(url)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_orgs(self) -> List[Organization]:
        """Get all Organizations this user is a member of."""
        url = f"/users/{self.username}/orgs"
        results = self.allspice_client.requests_get_paginated(url)
        return [Organization.parse_response(self.allspice_client, result) for result in results]

    def get_teams(self) -> List["Team"]:
        """Get the teams returned by ``/user/teams``, requested with sudo set to this user."""
        url = "/user/teams"
        results = self.allspice_client.requests_get_paginated(url, sudo=self)
        return [Team.parse_response(self.allspice_client, result) for result in results]

    def get_accessible_repos(self) -> List["Repository"]:
        """Get all Repositories accessible by the logged in User."""
        # Paginated like the other listings in this class; a plain GET only
        # yields the first page of repositories.
        results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def __request_emails(self):
        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
        # Rebuild the list on every refresh. Appending to the previous list
        # made each access of `emails` add every address again.
        self._emails = [mail["email"] for mail in result]
        for mail in result:
            if mail["primary"]:
                self._email = mail["email"]

    def delete(self):
        """Deletes this User. Also deletes all Repositories they own."""
        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """
        Get this user's heatmap as (datetime, contributions) pairs.

        Timestamps are converted with ``datetime.fromtimestamp``, which
        yields naive local-time datetimes.
        """
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
def commit(self, login_name: str, source_id: int = 0):
    """
    Send the changed fields of this user to the server.

    The API requires the login name and the login source for changing a
    user, and neither is supplied when a user is fetched, so both have to be
    passed in here. Usually source_id is 0 and the login_name is equal to
    the username.

    :param login_name: Sent as the "login_name" field.
    :param source_id: Sent as the "source_id" field.
    """
    changes = self.get_dirty_fields()
    changes["login_name"] = login_name
    # api-doc says that the "source_id" is necessary; works without though
    changes["source_id"] = source_id

    endpoint = User.ADMIN_EDIT_USER.format(username=self.username)
    self.allspice_client.requests_patch(endpoint, data=changes)
    # Everything has been sent, so nothing is dirty any more.
    self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository via the ``/user/repos`` endpoint.

    Throws:
        AlreadyExistsException: If the Repository exists already.
            (Not raised in this method; presumably raised by the client
            request - confirm.)
        Exception: If the response carries no "id".
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    response = self.allspice_client.requests_post("/user/repos", data=payload)

    log = self.allspice_client.logger
    # A response without an "id" is treated as a failure report.
    if "id" not in response:
        log.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])

    log.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self.allspice_client, response)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_repositories(self) -> List["Repository"]:
    """
    Get all Repositories owned by this User.

    :return: One Repository per entry returned by the paginated request.
    """
    entries = self.allspice_client.requests_get_paginated(f"/users/{self.username}/repos")
    repositories = []
    for entry in entries:
        repositories.append(Repository.parse_response(self.allspice_client, entry))
    return repositories
Get all Repositories owned by this User.
def get_orgs(self) -> List[Organization]:
    """
    Get all Organizations this user is a member of.

    :return: One Organization per entry returned by the paginated request.
    """
    entries = self.allspice_client.requests_get_paginated(f"/users/{self.username}/orgs")
    organizations = []
    for entry in entries:
        organizations.append(Organization.parse_response(self.allspice_client, entry))
    return organizations
Get all Organizations this user is a member of.
def get_accessible_repos(self) -> List["Repository"]:
    """
    Get all Repositories accessible by the logged in User.

    The request goes to ``/user/repos`` with sudo set to this user.

    :return: One Repository per entry returned by the paginated request.
    """
    # Paginated like get_repositories and get_orgs; a plain GET only yields
    # the first page of repositories.
    results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
def delete(self):
    """
    Delete this User through the admin endpoint and mark this object as
    deleted. Also deletes all Repositories they own.
    """
    endpoint = User.ADMIN_DELETE_USER % self.username
    self.allspice_client.requests_delete(endpoint)
    self.deleted = True
Deletes this User. Also deletes all Repositories he owns.