allspice
A very simple API client for AllSpice Hub
Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice version or authentication user can be requested directly from the allspice_client object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during divelopment. Refer to the AllSpice API documentation for the fields names.
Fields that can not be altered via allspice-api, are read only. After altering a field, the .commit method of the according object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects do have methods to execute some of the requests possible though the AllSpice api:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
1""" 2.. include:: ../README.md 3 :start-line: 1 4 :end-before: Installation 5""" 6 7from .allspice import ( 8 AllSpice, 9) 10from .apiobject import ( 11 Branch, 12 Comment, 13 Commit, 14 Content, 15 DesignReview, 16 DesignReviewReview, 17 Issue, 18 Milestone, 19 Organization, 20 Release, 21 Repository, 22 Team, 23 User, 24) 25from .exceptions import AlreadyExistsException, NotFoundException 26 27__version__ = "3.12.1" 28 29__all__ = [ 30 "AllSpice", 31 "AlreadyExistsException", 32 "Branch", 33 "Comment", 34 "Commit", 35 "Content", 36 "DesignReview", 37 "DesignReviewReview", 38 "Issue", 39 "Milestone", 40 "NotFoundException", 41 "Organization", 42 "Release", 43 "Repository", 44 "Team", 45 "User", 46]
21class AllSpice: 22 """Object to establish a session with AllSpice Hub.""" 23 24 ADMIN_CREATE_USER = """/admin/users""" 25 GET_USERS_ADMIN = """/admin/users""" 26 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 27 ALLSPICE_HUB_VERSION = """/version""" 28 GET_USER = """/user""" 29 GET_REPOSITORY = """/repos/{owner}/{name}""" 30 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 31 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 32 33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 """ 62 63 self.logger = logging.getLogger(__name__) 64 handler = logging.StreamHandler(sys.stderr) 65 handler.setFormatter( 66 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 67 ) 68 self.logger.addHandler(handler) 69 self.logger.setLevel(log_level) 70 self.headers = { 71 "Content-type": "application/json", 72 } 73 self.url = allspice_hub_url 74 75 if ratelimiting is None: 76 self.requests = requests.Session() 77 else: 78 (max_calls, period) = ratelimiting 79 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 80 81 # Manage authentification 82 if not token_text and not auth: 83 raise ValueError("Please provide auth or token_text, but not both") 84 if token_text: 85 self.headers["Authorization"] = "token " + token_text 86 if auth: 87 self.logger.warning( 88 "Using basic auth is not recommended. Prefer using a token instead." 89 ) 90 self.requests.auth = auth 91 92 # Manage SSL certification verification 93 self.requests.verify = verify 94 if not verify: 95 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 96 97 def __get_url(self, endpoint): 98 url = self.url + "/api/v1" + endpoint 99 self.logger.debug("Url: %s" % url) 100 return url 101 102 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 103 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 104 if request.status_code not in [200, 201]: 105 message = f"Received status code: {request.status_code} ({request.url})" 106 if request.status_code in [404]: 107 raise NotFoundException(message) 108 if request.status_code in [403]: 109 raise Exception( 110 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 111 ) 112 if request.status_code in [409]: 113 raise ConflictException(message) 114 if request.status_code in [503]: 115 raise NotYetGeneratedException(message) 116 raise Exception(message) 117 return request 118 119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {} 125 126 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 127 combined_params = {} 128 combined_params.update(params) 129 if sudo: 130 combined_params["sudo"] = sudo.username 131 return self.parse_result(self.__get(endpoint, combined_params)) 132 133 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 134 combined_params = {} 135 combined_params.update(params) 136 if sudo: 137 combined_params["sudo"] = sudo.username 138 return self.__get(endpoint, combined_params).content 139 140 def requests_get_paginated( 141 self, 142 endpoint: str, 143 params=frozendict(), 144 sudo=None, 145 page_key: str = "page", 146 first_page: int = 1, 147 ): 148 page = first_page 149 combined_params = {} 150 combined_params.update(params) 151 aggregated_result = [] 152 while True: 153 combined_params[page_key] = page 154 result = self.requests_get(endpoint, combined_params, sudo) 155 156 if not result: 157 return aggregated_result 158 159 if isinstance(result, dict): 160 if "data" in result: 161 data = result["data"] 162 if len(data) == 0: 163 return aggregated_result 164 aggregated_result.extend(data) 165 elif "tree" in result: 166 data = result["tree"] 167 if data is None or len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 else: 171 raise NotImplementedError( 172 "requests_get_paginated does not know how to handle responses of this type." 173 ) 174 else: 175 aggregated_result.extend(result) 176 177 page += 1 178 179 def requests_put(self, endpoint: str, data: Optional[dict] = None): 180 if not data: 181 data = {} 182 request = self.requests.put( 183 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 184 ) 185 if request.status_code not in [200, 204]: 186 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 187 self.logger.error(message) 188 raise Exception(message) 189 190 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 191 request = self.requests.delete( 192 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 193 ) 194 if request.status_code not in [200, 204]: 195 message = f"Received status code: {request.status_code} ({request.url})" 196 self.logger.error(message) 197 raise Exception(message) 198 199 def requests_post( 200 self, 201 endpoint: str, 202 data: Optional[dict] = None, 203 params: Optional[dict] = None, 204 files: Optional[dict] = None, 205 ): 206 """ 207 Make a POST call to the endpoint. 208 209 :param endpoint: The path to the endpoint 210 :param data: A dictionary for JSON data 211 :param params: A dictionary of query params 212 :param files: A dictionary of files, see requests.post. Using both files and data 213 can lead to unexpected results! 214 :return: The JSON response parsed as a dict 215 """ 216 217 # This should ideally be a TypedDict of the type of arguments taken by 218 # `requests.post`. 219 args: dict[str, Any] = { 220 "headers": self.headers.copy(), 221 } 222 if data is not None: 223 args["data"] = json.dumps(data) 224 if params is not None: 225 args["params"] = params 226 if files is not None: 227 args["headers"].pop("Content-type") 228 args["files"] = files 229 230 request = self.requests.post(self.__get_url(endpoint), **args) 231 232 if request.status_code not in [200, 201, 202]: 233 if "already exists" in request.text or "e-mail already in use" in request.text: 234 self.logger.warning(request.text) 235 raise AlreadyExistsException() 236 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 237 self.logger.error(f"With info: {data} ({self.headers})") 238 self.logger.error(f"Answer: {request.text}") 239 raise Exception( 240 f"Received status code: {request.status_code} ({request.url}), {request.text}" 241 ) 242 return self.parse_result(request) 243 244 def requests_patch(self, endpoint: str, data: dict): 245 request = self.requests.patch( 246 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 247 ) 248 if request.status_code not in [200, 201]: 249 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 250 self.logger.error(error_message) 251 raise Exception(error_message) 252 return self.parse_result(request) 253 254 def get_orgs_public_members_all(self, orgname): 255 path = "/orgs/" + orgname + "/public_members" 256 return self.requests_get(path) 257 258 def get_orgs(self): 259 path = "/admin/orgs" 260 results = self.requests_get(path) 261 return [Organization.parse_response(self, result) for result in results] 262 263 def get_user(self): 264 result = self.requests_get(AllSpice.GET_USER) 265 return User.parse_response(self, result) 266 267 def get_version(self) -> str: 268 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 269 return result["version"] 270 271 def get_users(self) -> List[User]: 272 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 273 return [User.parse_response(self, result) for result in results] 274 275 def get_user_by_email(self, email: str) -> Optional[User]: 276 users = self.get_users() 277 for user in users: 278 if user.email == email or email in user.emails: 279 return user 280 return None 281 282 def get_user_by_name(self, username: str) -> Optional[User]: 283 users = self.get_users() 284 for user in users: 285 if user.username == username: 286 return user 287 return None 288 289 def get_repository(self, owner: str, name: str) -> Repository: 290 path = self.GET_REPOSITORY.format(owner=owner, name=name) 291 result = self.requests_get(path) 292 return Repository.parse_response(self, result) 293 294 def create_user( 295 self, 296 user_name: str, 297 email: str, 298 password: str, 299 full_name: Optional[str] = None, 300 login_name: Optional[str] = None, 301 change_pw=True, 302 send_notify=True, 303 source_id=0, 304 ): 305 """Create User. 306 Throws: 307 AlreadyExistsException, if the User exists already 308 Exception, if something else went wrong. 309 """ 310 if not login_name: 311 login_name = user_name 312 if not full_name: 313 full_name = user_name 314 request_data = { 315 "source_id": source_id, 316 "login_name": login_name, 317 "full_name": full_name, 318 "username": user_name, 319 "email": email, 320 "password": password, 321 "send_notify": send_notify, 322 "must_change_password": change_pw, 323 } 324 325 self.logger.debug("Gitea post payload: %s", request_data) 326 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 327 if "id" in result: 328 self.logger.info( 329 "Successfully created User %s <%s> (id %s)", 330 result["login"], 331 result["email"], 332 result["id"], 333 ) 334 self.logger.debug("Gitea response: %s", result) 335 else: 336 self.logger.error(result["message"]) 337 raise Exception("User not created... (gitea: %s)" % result["message"]) 338 user = User.parse_response(self, result) 339 return user 340 341 def create_repo( 342 self, 343 repoOwner: Union[User, Organization], 344 repoName: str, 345 description: str = "", 346 private: bool = False, 347 autoInit=True, 348 gitignores: Optional[str] = None, 349 license: Optional[str] = None, 350 readme: str = "Default", 351 issue_labels: Optional[str] = None, 352 default_branch="master", 353 ): 354 """Create a Repository as the administrator 355 356 Throws: 357 AlreadyExistsException: If the Repository exists already. 358 Exception: If something else went wrong. 359 360 Note: 361 Non-admin users can not use this method. Please use instead 362 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 363 """ 364 # although this only says user in the api, this also works for 365 # organizations 366 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 367 result = self.requests_post( 368 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 369 data={ 370 "name": repoName, 371 "description": description, 372 "private": private, 373 "auto_init": autoInit, 374 "gitignores": gitignores, 375 "license": license, 376 "issue_labels": issue_labels, 377 "readme": readme, 378 "default_branch": default_branch, 379 }, 380 ) 381 if "id" in result: 382 self.logger.info("Successfully created Repository %s " % result["name"]) 383 else: 384 self.logger.error(result["message"]) 385 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 386 return Repository.parse_response(self, result) 387 388 def create_org( 389 self, 390 owner: User, 391 orgName: str, 392 description: str, 393 location="", 394 website="", 395 full_name="", 396 ): 397 assert isinstance(owner, User) 398 result = self.requests_post( 399 AllSpice.CREATE_ORG % owner.username, 400 data={ 401 "username": orgName, 402 "description": description, 403 "location": location, 404 "website": website, 405 "full_name": full_name, 406 }, 407 ) 408 if "id" in result: 409 self.logger.info("Successfully created Organization %s" % result["username"]) 410 else: 411 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 412 self.logger.error(result["message"]) 413 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 414 return Organization.parse_response(self, result) 415 416 def create_team( 417 self, 418 org: Organization, 419 name: str, 420 description: str = "", 421 permission: str = "read", 422 can_create_org_repo: bool = False, 423 includes_all_repositories: bool = False, 424 units=( 425 "repo.code", 426 "repo.issues", 427 "repo.ext_issues", 428 "repo.wiki", 429 "repo.pulls", 430 "repo.releases", 431 "repo.ext_wiki", 432 ), 433 units_map={}, 434 ): 435 """Creates a Team. 436 437 Args: 438 org (Organization): Organization the Team will be part of. 439 name (str): The Name of the Team to be created. 440 description (str): Optional, None, short description of the new Team. 441 permission (str): Optional, 'read', What permissions the members 442 units_map (dict): Optional, {}, a mapping of units to their 443 permissions. If None or empty, the `permission` permission will 444 be applied to all units. Note: When both `units` and `units_map` 445 are given, `units_map` will be preferred. 446 """ 447 448 result = self.requests_post( 449 AllSpice.CREATE_TEAM % org.username, 450 data={ 451 "name": name, 452 "description": description, 453 "permission": permission, 454 "can_create_org_repo": can_create_org_repo, 455 "includes_all_repositories": includes_all_repositories, 456 "units": units, 457 "units_map": units_map, 458 }, 459 ) 460 461 if "id" in result: 462 self.logger.info("Successfully created Team %s" % result["name"]) 463 else: 464 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 465 self.logger.error(result["message"]) 466 raise Exception("Team not created... (gitea: %s)" % result["message"]) 467 api_object = Team.parse_response(self, result) 468 setattr( 469 api_object, "_organization", org 470 ) # fixes strange behaviour of gitea not returning a valid organization here. 471 return api_object
Object to establish a session with AllSpice Hub.
33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 """ 62 63 self.logger = logging.getLogger(__name__) 64 handler = logging.StreamHandler(sys.stderr) 65 handler.setFormatter( 66 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 67 ) 68 self.logger.addHandler(handler) 69 self.logger.setLevel(log_level) 70 self.headers = { 71 "Content-type": "application/json", 72 } 73 self.url = allspice_hub_url 74 75 if ratelimiting is None: 76 self.requests = requests.Session() 77 else: 78 (max_calls, period) = ratelimiting 79 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 80 81 # Manage authentification 82 if not token_text and not auth: 83 raise ValueError("Please provide auth or token_text, but not both") 84 if token_text: 85 self.headers["Authorization"] = "token " + token_text 86 if auth: 87 self.logger.warning( 88 "Using basic auth is not recommended. Prefer using a token instead." 89 ) 90 self.requests.auth = auth 91 92 # Manage SSL certification verification 93 self.requests.verify = verify 94 if not verify: 95 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, allow insecure server connections
when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {}
Parses the result-JSON to a dict.
140 def requests_get_paginated( 141 self, 142 endpoint: str, 143 params=frozendict(), 144 sudo=None, 145 page_key: str = "page", 146 first_page: int = 1, 147 ): 148 page = first_page 149 combined_params = {} 150 combined_params.update(params) 151 aggregated_result = [] 152 while True: 153 combined_params[page_key] = page 154 result = self.requests_get(endpoint, combined_params, sudo) 155 156 if not result: 157 return aggregated_result 158 159 if isinstance(result, dict): 160 if "data" in result: 161 data = result["data"] 162 if len(data) == 0: 163 return aggregated_result 164 aggregated_result.extend(data) 165 elif "tree" in result: 166 data = result["tree"] 167 if data is None or len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 else: 171 raise NotImplementedError( 172 "requests_get_paginated does not know how to handle responses of this type." 173 ) 174 else: 175 aggregated_result.extend(result) 176 177 page += 1
179 def requests_put(self, endpoint: str, data: Optional[dict] = None): 180 if not data: 181 data = {} 182 request = self.requests.put( 183 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 184 ) 185 if request.status_code not in [200, 204]: 186 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 187 self.logger.error(message) 188 raise Exception(message)
190 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 191 request = self.requests.delete( 192 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 193 ) 194 if request.status_code not in [200, 204]: 195 message = f"Received status code: {request.status_code} ({request.url})" 196 self.logger.error(message) 197 raise Exception(message)
199 def requests_post( 200 self, 201 endpoint: str, 202 data: Optional[dict] = None, 203 params: Optional[dict] = None, 204 files: Optional[dict] = None, 205 ): 206 """ 207 Make a POST call to the endpoint. 208 209 :param endpoint: The path to the endpoint 210 :param data: A dictionary for JSON data 211 :param params: A dictionary of query params 212 :param files: A dictionary of files, see requests.post. Using both files and data 213 can lead to unexpected results! 214 :return: The JSON response parsed as a dict 215 """ 216 217 # This should ideally be a TypedDict of the type of arguments taken by 218 # `requests.post`. 219 args: dict[str, Any] = { 220 "headers": self.headers.copy(), 221 } 222 if data is not None: 223 args["data"] = json.dumps(data) 224 if params is not None: 225 args["params"] = params 226 if files is not None: 227 args["headers"].pop("Content-type") 228 args["files"] = files 229 230 request = self.requests.post(self.__get_url(endpoint), **args) 231 232 if request.status_code not in [200, 201, 202]: 233 if "already exists" in request.text or "e-mail already in use" in request.text: 234 self.logger.warning(request.text) 235 raise AlreadyExistsException() 236 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 237 self.logger.error(f"With info: {data} ({self.headers})") 238 self.logger.error(f"Answer: {request.text}") 239 raise Exception( 240 f"Received status code: {request.status_code} ({request.url}), {request.text}" 241 ) 242 return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
244 def requests_patch(self, endpoint: str, data: dict): 245 request = self.requests.patch( 246 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 247 ) 248 if request.status_code not in [200, 201]: 249 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 250 self.logger.error(error_message) 251 raise Exception(error_message) 252 return self.parse_result(request)
294 def create_user( 295 self, 296 user_name: str, 297 email: str, 298 password: str, 299 full_name: Optional[str] = None, 300 login_name: Optional[str] = None, 301 change_pw=True, 302 send_notify=True, 303 source_id=0, 304 ): 305 """Create User. 306 Throws: 307 AlreadyExistsException, if the User exists already 308 Exception, if something else went wrong. 309 """ 310 if not login_name: 311 login_name = user_name 312 if not full_name: 313 full_name = user_name 314 request_data = { 315 "source_id": source_id, 316 "login_name": login_name, 317 "full_name": full_name, 318 "username": user_name, 319 "email": email, 320 "password": password, 321 "send_notify": send_notify, 322 "must_change_password": change_pw, 323 } 324 325 self.logger.debug("Gitea post payload: %s", request_data) 326 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 327 if "id" in result: 328 self.logger.info( 329 "Successfully created User %s <%s> (id %s)", 330 result["login"], 331 result["email"], 332 result["id"], 333 ) 334 self.logger.debug("Gitea response: %s", result) 335 else: 336 self.logger.error(result["message"]) 337 raise Exception("User not created... (gitea: %s)" % result["message"]) 338 user = User.parse_response(self, result) 339 return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
341 def create_repo( 342 self, 343 repoOwner: Union[User, Organization], 344 repoName: str, 345 description: str = "", 346 private: bool = False, 347 autoInit=True, 348 gitignores: Optional[str] = None, 349 license: Optional[str] = None, 350 readme: str = "Default", 351 issue_labels: Optional[str] = None, 352 default_branch="master", 353 ): 354 """Create a Repository as the administrator 355 356 Throws: 357 AlreadyExistsException: If the Repository exists already. 358 Exception: If something else went wrong. 359 360 Note: 361 Non-admin users can not use this method. Please use instead 362 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 363 """ 364 # although this only says user in the api, this also works for 365 # organizations 366 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 367 result = self.requests_post( 368 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 369 data={ 370 "name": repoName, 371 "description": description, 372 "private": private, 373 "auto_init": autoInit, 374 "gitignores": gitignores, 375 "license": license, 376 "issue_labels": issue_labels, 377 "readme": readme, 378 "default_branch": default_branch, 379 }, 380 ) 381 if "id" in result: 382 self.logger.info("Successfully created Repository %s " % result["name"]) 383 else: 384 self.logger.error(result["message"]) 385 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 386 return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo or allspice.Organization.create_repo.
388 def create_org( 389 self, 390 owner: User, 391 orgName: str, 392 description: str, 393 location="", 394 website="", 395 full_name="", 396 ): 397 assert isinstance(owner, User) 398 result = self.requests_post( 399 AllSpice.CREATE_ORG % owner.username, 400 data={ 401 "username": orgName, 402 "description": description, 403 "location": location, 404 "website": website, 405 "full_name": full_name, 406 }, 407 ) 408 if "id" in result: 409 self.logger.info("Successfully created Organization %s" % result["username"]) 410 else: 411 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 412 self.logger.error(result["message"]) 413 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 414 return Organization.parse_response(self, result)
416 def create_team( 417 self, 418 org: Organization, 419 name: str, 420 description: str = "", 421 permission: str = "read", 422 can_create_org_repo: bool = False, 423 includes_all_repositories: bool = False, 424 units=( 425 "repo.code", 426 "repo.issues", 427 "repo.ext_issues", 428 "repo.wiki", 429 "repo.pulls", 430 "repo.releases", 431 "repo.ext_wiki", 432 ), 433 units_map={}, 434 ): 435 """Creates a Team. 436 437 Args: 438 org (Organization): Organization the Team will be part of. 439 name (str): The Name of the Team to be created. 440 description (str): Optional, None, short description of the new Team. 441 permission (str): Optional, 'read', What permissions the members 442 units_map (dict): Optional, {}, a mapping of units to their 443 permissions. If None or empty, the `permission` permission will 444 be applied to all units. Note: When both `units` and `units_map` 445 are given, `units_map` will be preferred. 446 """ 447 448 result = self.requests_post( 449 AllSpice.CREATE_TEAM % org.username, 450 data={ 451 "name": name, 452 "description": description, 453 "permission": permission, 454 "can_create_org_repo": can_create_org_repo, 455 "includes_all_repositories": includes_all_repositories, 456 "units": units, 457 "units_map": units_map, 458 }, 459 ) 460 461 if "id" in result: 462 self.logger.info("Successfully created Team %s" % result["name"]) 463 else: 464 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 465 self.logger.error(result["message"]) 466 raise Exception("Team not created... (gitea: %s)" % result["message"]) 467 api_object = Team.parse_response(self, result) 468 setattr( 469 api_object, "_organization", org 470 ) # fixes strange behaviour of gitea not returning a valid organization here. 471 return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission permission will
be applied to all units. Note: When both units and units_map
are given, units_map will be preferred.
Common base class for all non-exit exceptions.
422class Branch(ReadonlyApiObject): 423 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 424 effective_branch_protection_name: str 425 enable_status_check: bool 426 name: str 427 protected: bool 428 required_approvals: int 429 status_check_contexts: List[Any] 430 user_can_merge: bool 431 user_can_push: bool 432 433 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 434 435 def __init__(self, allspice_client): 436 super().__init__(allspice_client) 437 438 def __eq__(self, other): 439 if not isinstance(other, Branch): 440 return False 441 return self.commit == other.commit and self.name == other.name 442 443 def __hash__(self): 444 return hash(self.commit["id"]) ^ hash(self.name) 445 446 _fields_to_parsers: ClassVar[dict] = { 447 # This is not a commit object 448 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 449 } 450 451 @classmethod 452 def request(cls, allspice_client, owner: str, repo: str, branch: str): 453 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
1595class Comment(ApiObject): 1596 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1597 body: str 1598 created_at: datetime 1599 html_url: str 1600 id: int 1601 issue_url: str 1602 original_author: str 1603 original_author_id: int 1604 pull_request_url: str 1605 updated_at: datetime 1606 user: User 1607 1608 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1609 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1610 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1611 1612 def __init__(self, allspice_client): 1613 super().__init__(allspice_client) 1614 1615 def __eq__(self, other): 1616 if not isinstance(other, Comment): 1617 return False 1618 return self.repository == other.repository and self.id == other.id 1619 1620 def __hash__(self): 1621 return hash(self.repository) ^ hash(self.id) 1622 1623 @classmethod 1624 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1625 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1626 1627 _fields_to_parsers: ClassVar[dict] = { 1628 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1629 "created_at": lambda _, t: Util.convert_time(t), 1630 "updated_at": lambda _, t: Util.convert_time(t), 1631 } 1632 1633 _patchable_fields: ClassVar[set[str]] = {"body"} 1634 1635 @property 1636 def parent_url(self) -> str: 1637 """URL of the parent of this comment (the issue or the pull request)""" 1638 1639 if self.issue_url is not None and self.issue_url != "": 1640 return self.issue_url 1641 else: 1642 return self.pull_request_url 1643 1644 @cached_property 1645 def repository(self) -> Repository: 1646 """The repository this comment was posted on.""" 1647 1648 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1649 return Repository.request(self.allspice_client, owner_name, repo_name) 1650 1651 def __fields_for_path(self): 1652 return { 1653 "owner": self.repository.owner.username, 1654 "repo": self.repository.name, 1655 "id": self.id, 1656 } 1657 1658 def commit(self): 1659 self._commit(self.__fields_for_path()) 1660 1661 def delete(self): 1662 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1663 self.deleted = True 1664 1665 def get_attachments(self) -> List[Attachment]: 1666 """ 1667 Get all attachments on this comment. This returns Attachment objects, which 1668 contain a link to download the attachment. 1669 1670 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1671 """ 1672 1673 results = self.allspice_client.requests_get( 1674 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1675 ) 1676 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1677 1678 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1679 """ 1680 Create an attachment on this comment. 1681 1682 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1683 1684 :param file: The file to attach. This should be a file-like object. 1685 :param name: The name of the file. If not provided, the name of the file will be 1686 used. 1687 :return: The created attachment. 1688 """ 1689 1690 args: dict[str, Any] = { 1691 "files": {"attachment": file}, 1692 } 1693 if name is not None: 1694 args["params"] = {"name": name} 1695 1696 result = self.allspice_client.requests_post( 1697 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1698 **args, 1699 ) 1700 return Attachment.parse_response(self.allspice_client, result) 1701 1702 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1703 """ 1704 Edit an attachment. 1705 1706 The list of params that can be edited is available at 1707 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1708 1709 :param attachment: The attachment to be edited 1710 :param data: The data parameter should be a dictionary of the fields to edit. 1711 :return: The edited attachment 1712 """ 1713 1714 args = { 1715 **self.__fields_for_path(), 1716 "attachment_id": attachment.id, 1717 } 1718 result = self.allspice_client.requests_patch( 1719 self.ATTACHMENT_PATH.format(**args), 1720 data=data, 1721 ) 1722 return Attachment.parse_response(self.allspice_client, result) 1723 1724 def delete_attachment(self, attachment: Attachment): 1725 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1726 1727 args = { 1728 **self.__fields_for_path(), 1729 "attachment_id": attachment.id, 1730 } 1731 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1732 attachment.deleted = True
1635 @property 1636 def parent_url(self) -> str: 1637 """URL of the parent of this comment (the issue or the pull request)""" 1638 1639 if self.issue_url is not None and self.issue_url != "": 1640 return self.issue_url 1641 else: 1642 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1644 @cached_property 1645 def repository(self) -> Repository: 1646 """The repository this comment was posted on.""" 1647 1648 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1649 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1665 def get_attachments(self) -> List[Attachment]: 1666 """ 1667 Get all attachments on this comment. This returns Attachment objects, which 1668 contain a link to download the attachment. 1669 1670 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1671 """ 1672 1673 results = self.allspice_client.requests_get( 1674 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1675 ) 1676 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1678 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1679 """ 1680 Create an attachment on this comment. 1681 1682 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1683 1684 :param file: The file to attach. This should be a file-like object. 1685 :param name: The name of the file. If not provided, the name of the file will be 1686 used. 1687 :return: The created attachment. 1688 """ 1689 1690 args: dict[str, Any] = { 1691 "files": {"attachment": file}, 1692 } 1693 if name is not None: 1694 args["params"] = {"name": name} 1695 1696 result = self.allspice_client.requests_post( 1697 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1698 **args, 1699 ) 1700 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1702 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1703 """ 1704 Edit an attachment. 1705 1706 The list of params that can be edited is available at 1707 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1708 1709 :param attachment: The attachment to be edited 1710 :param data: The data parameter should be a dictionary of the fields to edit. 1711 :return: The edited attachment 1712 """ 1713 1714 args = { 1715 **self.__fields_for_path(), 1716 "attachment_id": attachment.id, 1717 } 1718 result = self.allspice_client.requests_patch( 1719 self.ATTACHMENT_PATH.format(**args), 1720 data=data, 1721 ) 1722 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1724 def delete_attachment(self, attachment: Attachment): 1725 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1726 1727 args = { 1728 **self.__fields_for_path(), 1729 "attachment_id": attachment.id, 1730 } 1731 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1732 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1735class Commit(ReadonlyApiObject): 1736 author: User 1737 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1738 committer: Dict[str, Union[int, str, bool]] 1739 created: str 1740 files: List[Dict[str, str]] 1741 html_url: str 1742 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1743 parents: List[Union[Dict[str, str], Any]] 1744 sha: str 1745 stats: Dict[str, int] 1746 url: str 1747 1748 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1749 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1750 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1751 1752 # Regex to extract owner and repo names from the url property 1753 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1754 1755 def __init__(self, allspice_client): 1756 super().__init__(allspice_client) 1757 1758 _fields_to_parsers: ClassVar[dict] = { 1759 # NOTE: api may return None for commiters that are no allspice users 1760 "author": lambda allspice_client, u: User.parse_response(allspice_client, u) if u else None 1761 } 1762 1763 def __eq__(self, other): 1764 if not isinstance(other, Commit): 1765 return False 1766 return self.sha == other.sha 1767 1768 def __hash__(self): 1769 return hash(self.sha) 1770 1771 @classmethod 1772 def parse_response(cls, allspice_client, result) -> "Commit": 1773 commit_cache = result["commit"] 1774 api_object = cls(allspice_client) 1775 cls._initialize(allspice_client, api_object, result) 1776 # inner_commit for legacy reasons 1777 Commit._add_read_property("inner_commit", commit_cache, api_object) 1778 return api_object 1779 1780 def get_status(self) -> CommitCombinedStatus: 1781 """ 1782 Get a combined status consisting of all statues on this commit. 1783 1784 Note that the returned object is a CommitCombinedStatus object, which 1785 also contains a list of all statuses on the commit. 1786 1787 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1788 """ 1789 1790 result = self.allspice_client.requests_get( 1791 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1792 ) 1793 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1794 1795 def get_statuses(self) -> List[CommitStatus]: 1796 """ 1797 Get a list of all statuses on this commit. 1798 1799 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1800 """ 1801 1802 results = self.allspice_client.requests_get( 1803 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1804 ) 1805 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1806 1807 @cached_property 1808 def _fields_for_path(self) -> dict[str, str]: 1809 matches = self.URL_REGEXP.search(self.url) 1810 if not matches: 1811 raise ValueError(f"Invalid commit URL: {self.url}") 1812 1813 return { 1814 "owner": matches.group(1), 1815 "repo": matches.group(2), 1816 "sha": self.sha, 1817 }
1771 @classmethod 1772 def parse_response(cls, allspice_client, result) -> "Commit": 1773 commit_cache = result["commit"] 1774 api_object = cls(allspice_client) 1775 cls._initialize(allspice_client, api_object, result) 1776 # inner_commit for legacy reasons 1777 Commit._add_read_property("inner_commit", commit_cache, api_object) 1778 return api_object
1780 def get_status(self) -> CommitCombinedStatus: 1781 """ 1782 Get a combined status consisting of all statues on this commit. 1783 1784 Note that the returned object is a CommitCombinedStatus object, which 1785 also contains a list of all statuses on the commit. 1786 1787 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1788 """ 1789 1790 result = self.allspice_client.requests_get( 1791 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1792 ) 1793 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1795 def get_statuses(self) -> List[CommitStatus]: 1796 """ 1797 Get a list of all statuses on this commit. 1798 1799 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1800 """ 1801 1802 results = self.allspice_client.requests_get( 1803 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1804 ) 1805 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
2904class Content(ReadonlyApiObject): 2905 content: Any 2906 download_url: str 2907 encoding: Any 2908 git_url: str 2909 html_url: str 2910 last_commit_sha: str 2911 name: str 2912 path: str 2913 sha: str 2914 size: int 2915 submodule_git_url: Any 2916 target: Any 2917 type: str 2918 url: str 2919 2920 FILE = "file" 2921 2922 def __init__(self, allspice_client): 2923 super().__init__(allspice_client) 2924 2925 def __eq__(self, other): 2926 if not isinstance(other, Content): 2927 return False 2928 2929 return self.sha == other.sha and self.name == other.name 2930 2931 def __hash__(self): 2932 return hash(self.sha) ^ hash(self.name)
Inherited Members
2278class DesignReview(ApiObject): 2279 """ 2280 A Design Review. See 2281 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2282 2283 Note: The base and head fields are not `Branch` objects - they are plain strings 2284 referring to the branch names. This is because DRs can exist for branches that have 2285 been deleted, which don't have an associated `Branch` object from the API. You can use 2286 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2287 it exists. 2288 """ 2289 2290 additions: Optional[int] 2291 allow_maintainer_edit: bool 2292 allow_maintainer_edits: Any 2293 assignee: User 2294 assignees: List["User"] 2295 base: str 2296 body: str 2297 changed_files: Optional[int] 2298 closed_at: Optional[str] 2299 comments: int 2300 created_at: str 2301 deletions: Optional[int] 2302 diff_url: str 2303 draft: bool 2304 due_date: Optional[str] 2305 head: str 2306 html_url: str 2307 id: int 2308 is_locked: bool 2309 labels: List[Any] 2310 merge_base: str 2311 merge_commit_sha: Optional[str] 2312 mergeable: bool 2313 merged: bool 2314 merged_at: Optional[str] 2315 merged_by: Any 2316 milestone: Any 2317 number: int 2318 patch_url: str 2319 pin_order: int 2320 repository: Optional["Repository"] 2321 requested_reviewers: Any 2322 requested_reviewers_teams: Any 2323 review_comments: int 2324 state: str 2325 title: str 2326 updated_at: str 2327 url: str 2328 user: User 2329 2330 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2331 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2332 GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews" 2333 GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}" 2334 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2335 2336 OPEN = "open" 2337 CLOSED = "closed" 2338 2339 class MergeType(Enum): 2340 MERGE = "merge" 2341 REBASE = "rebase" 2342 REBASE_MERGE = "rebase-merge" 2343 SQUASH = "squash" 2344 MANUALLY_MERGED = "manually-merged" 2345 2346 def __init__(self, allspice_client): 2347 super().__init__(allspice_client) 2348 2349 def __eq__(self, other): 2350 if not isinstance(other, DesignReview): 2351 return False 2352 return self.repository == other.repository and self.id == other.id 2353 2354 def __hash__(self): 2355 return hash(self.repository) ^ hash(self.id) 2356 2357 @classmethod 2358 def parse_response(cls, allspice_client, result) -> "DesignReview": 2359 api_object = super().parse_response(allspice_client, result) 2360 cls._add_read_property( 2361 "repository", 2362 Repository.parse_response(allspice_client, result["base"]["repo"]), 2363 api_object, 2364 ) 2365 2366 return api_object 2367 2368 @classmethod 2369 def request(cls, allspice_client, owner: str, repo: str, number: str): 2370 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2371 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2372 2373 _fields_to_parsers: ClassVar[dict] = { 2374 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2375 "assignees": lambda allspice_client, us: [ 2376 User.parse_response(allspice_client, u) for u in us 2377 ], 2378 "base": lambda _, b: b["ref"], 2379 "head": lambda _, h: h["ref"], 2380 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2381 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2382 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2383 } 2384 2385 _patchable_fields: ClassVar[set[str]] = { 2386 "allow_maintainer_edits", 2387 "assignee", 2388 "assignees", 2389 "base", 2390 "body", 2391 "due_date", 2392 "milestone", 2393 "state", 2394 "title", 2395 } 2396 2397 _parsers_to_fields: ClassVar[dict] = { 2398 "assignee": lambda u: u.username, 2399 "assignees": lambda us: [u.username for u in us], 2400 "base": lambda b: b.name if isinstance(b, Branch) else b, 2401 "milestone": lambda m: m.id, 2402 } 2403 2404 def commit(self): 2405 data = self.get_dirty_fields() 2406 if "due_date" in data and data["due_date"] is None: 2407 data["unset_due_date"] = True 2408 2409 args = { 2410 "owner": self.repository.owner.username, 2411 "repo": self.repository.name, 2412 "index": self.number, 2413 } 2414 self._commit(args, data) 2415 2416 def merge(self, merge_type: MergeType): 2417 """ 2418 Merge the pull request. See 2419 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2420 2421 :param merge_type: The type of merge to perform. See the MergeType enum. 2422 """ 2423 2424 self.allspice_client.requests_post( 2425 self.MERGE_DESIGN_REVIEW.format( 2426 owner=self.repository.owner.username, 2427 repo=self.repository.name, 2428 index=self.number, 2429 ), 2430 data={"Do": merge_type.value}, 2431 ) 2432 2433 def get_comments(self) -> List[Comment]: 2434 """ 2435 Get the comments on this pull request, but not specifically on a review. 2436 2437 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2438 2439 :return: A list of comments on this pull request. 2440 """ 2441 2442 results = self.allspice_client.requests_get( 2443 self.GET_COMMENTS.format( 2444 owner=self.repository.owner.username, 2445 repo=self.repository.name, 2446 index=self.number, 2447 ) 2448 ) 2449 return [Comment.parse_response(self.allspice_client, result) for result in results] 2450 2451 def create_comment(self, body: str): 2452 """ 2453 Create a comment on this pull request. This uses the same endpoint as the 2454 comments on issues, and will not be associated with any reviews. 2455 2456 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2457 2458 :param body: The body of the comment. 2459 :return: The comment that was created. 2460 """ 2461 2462 result = self.allspice_client.requests_post( 2463 self.GET_COMMENTS.format( 2464 owner=self.repository.owner.username, 2465 repo=self.repository.name, 2466 index=self.number, 2467 ), 2468 data={"body": body}, 2469 ) 2470 return Comment.parse_response(self.allspice_client, result) 2471 2472 def create_review( 2473 self, 2474 *, 2475 body: Optional[str] = None, 2476 event: Optional[DesignReviewReview.ReviewEvent] = None, 2477 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2478 commit_id: Optional[str] = None, 2479 ) -> DesignReviewReview: 2480 """ 2481 Create a review on this design review. 2482 2483 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2484 2485 Note: in most cases, you should not set the body or event when creating 2486 a review. The event is automatically set to "PENDING" when the review 2487 is created. You should then use `submit_review` to submit the review 2488 with the desired event and body. 2489 2490 :param body: The body of the review. This is the top-level comment on 2491 the review. If not provided, the review will be created with no body. 2492 :param event: The event of the review. This is the overall status of the 2493 review. See the ReviewEvent enum. If not provided, the API will 2494 default to "PENDING". 2495 :param comments: A list of comments on the review. Each comment should 2496 be a ReviewComment object. If not provided, only the base comment 2497 will be created. 2498 :param commit_id: The commit SHA to associate with the review. This is 2499 optional. 2500 """ 2501 2502 data: dict[str, Any] = {} 2503 2504 if body is not None: 2505 data["body"] = body 2506 if event is not None: 2507 data["event"] = event.value 2508 if commit_id is not None: 2509 data["commit_id"] = commit_id 2510 if comments is not None: 2511 data["comments"] = [asdict(comment) for comment in comments] 2512 2513 result = self.allspice_client.requests_post( 2514 self.GET_REVIEWS.format( 2515 owner=self.repository.owner.username, 2516 repo=self.repository.name, 2517 index=self.number, 2518 ), 2519 data=data, 2520 ) 2521 2522 return DesignReviewReview.parse_response(self.allspice_client, result) 2523 2524 def get_reviews(self) -> List[DesignReviewReview]: 2525 """ 2526 Get all reviews on this design review. 2527 2528 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2529 """ 2530 2531 results = self.allspice_client.requests_get( 2532 self.GET_REVIEWS.format( 2533 owner=self.repository.owner.username, 2534 repo=self.repository.name, 2535 index=self.number, 2536 ) 2537 ) 2538 2539 return [ 2540 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2541 ] 2542 2543 def submit_review( 2544 self, 2545 review_id: int, 2546 event: DesignReviewReview.ReviewEvent, 2547 *, 2548 body: Optional[str] = None, 2549 ): 2550 """ 2551 Submit a review on this design review. 2552 2553 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2554 2555 :param review_id: The ID of the review to submit. 2556 :param event: The event to submit the review with. See the ReviewEvent 2557 enum for the possible values. 2558 :param body: Optional body text for the review submission. 2559 """ 2560 2561 data = { 2562 "event": event.value, 2563 } 2564 if body is not None: 2565 data["body"] = body 2566 2567 result = self.allspice_client.requests_post( 2568 self.GET_REVIEW.format( 2569 owner=self.repository.owner.username, 2570 repo=self.repository.name, 2571 index=self.number, 2572 review_id=review_id, 2573 ), 2574 data=data, 2575 ) 2576 2577 return result
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch object from the API. You can use
the Repository.get_branch method to get a Branch object for a branch if you know
it exists.
2357 @classmethod 2358 def parse_response(cls, allspice_client, result) -> "DesignReview": 2359 api_object = super().parse_response(allspice_client, result) 2360 cls._add_read_property( 2361 "repository", 2362 Repository.parse_response(allspice_client, result["base"]["repo"]), 2363 api_object, 2364 ) 2365 2366 return api_object
2368 @classmethod 2369 def request(cls, allspice_client, owner: str, repo: str, number: str): 2370 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2371 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2404 def commit(self): 2405 data = self.get_dirty_fields() 2406 if "due_date" in data and data["due_date"] is None: 2407 data["unset_due_date"] = True 2408 2409 args = { 2410 "owner": self.repository.owner.username, 2411 "repo": self.repository.name, 2412 "index": self.number, 2413 } 2414 self._commit(args, data)
2416 def merge(self, merge_type: MergeType): 2417 """ 2418 Merge the pull request. See 2419 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2420 2421 :param merge_type: The type of merge to perform. See the MergeType enum. 2422 """ 2423 2424 self.allspice_client.requests_post( 2425 self.MERGE_DESIGN_REVIEW.format( 2426 owner=self.repository.owner.username, 2427 repo=self.repository.name, 2428 index=self.number, 2429 ), 2430 data={"Do": merge_type.value}, 2431 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2433 def get_comments(self) -> List[Comment]: 2434 """ 2435 Get the comments on this pull request, but not specifically on a review. 2436 2437 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2438 2439 :return: A list of comments on this pull request. 2440 """ 2441 2442 results = self.allspice_client.requests_get( 2443 self.GET_COMMENTS.format( 2444 owner=self.repository.owner.username, 2445 repo=self.repository.name, 2446 index=self.number, 2447 ) 2448 ) 2449 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2451 def create_comment(self, body: str): 2452 """ 2453 Create a comment on this pull request. This uses the same endpoint as the 2454 comments on issues, and will not be associated with any reviews. 2455 2456 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2457 2458 :param body: The body of the comment. 2459 :return: The comment that was created. 2460 """ 2461 2462 result = self.allspice_client.requests_post( 2463 self.GET_COMMENTS.format( 2464 owner=self.repository.owner.username, 2465 repo=self.repository.name, 2466 index=self.number, 2467 ), 2468 data={"body": body}, 2469 ) 2470 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2472 def create_review( 2473 self, 2474 *, 2475 body: Optional[str] = None, 2476 event: Optional[DesignReviewReview.ReviewEvent] = None, 2477 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2478 commit_id: Optional[str] = None, 2479 ) -> DesignReviewReview: 2480 """ 2481 Create a review on this design review. 2482 2483 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2484 2485 Note: in most cases, you should not set the body or event when creating 2486 a review. The event is automatically set to "PENDING" when the review 2487 is created. You should then use `submit_review` to submit the review 2488 with the desired event and body. 2489 2490 :param body: The body of the review. This is the top-level comment on 2491 the review. If not provided, the review will be created with no body. 2492 :param event: The event of the review. This is the overall status of the 2493 review. See the ReviewEvent enum. If not provided, the API will 2494 default to "PENDING". 2495 :param comments: A list of comments on the review. Each comment should 2496 be a ReviewComment object. If not provided, only the base comment 2497 will be created. 2498 :param commit_id: The commit SHA to associate with the review. This is 2499 optional. 2500 """ 2501 2502 data: dict[str, Any] = {} 2503 2504 if body is not None: 2505 data["body"] = body 2506 if event is not None: 2507 data["event"] = event.value 2508 if commit_id is not None: 2509 data["commit_id"] = commit_id 2510 if comments is not None: 2511 data["comments"] = [asdict(comment) for comment in comments] 2512 2513 result = self.allspice_client.requests_post( 2514 self.GET_REVIEWS.format( 2515 owner=self.repository.owner.username, 2516 repo=self.repository.name, 2517 index=self.number, 2518 ), 2519 data=data, 2520 ) 2521 2522 return DesignReviewReview.parse_response(self.allspice_client, result)
Create a review on this design review.
allspice.allspice.io/api/swagger#/repository/repoCreatePullRequestReview">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequestReview
Note: in most cases, you should not set the body or event when creating
a review. The event is automatically set to "PENDING" when the review
is created. You should then use submit_review to submit the review
with the desired event and body.
Parameters
- body: The body of the review. This is the top-level comment on the review. If not provided, the review will be created with no body.
- event: The event of the review. This is the overall status of the review. See the ReviewEvent enum. If not provided, the API will default to "PENDING".
- comments: A list of comments on the review. Each comment should be a ReviewComment object. If not provided, only the base comment will be created.
- commit_id: The commit SHA to associate with the review. This is optional.
2524 def get_reviews(self) -> List[DesignReviewReview]: 2525 """ 2526 Get all reviews on this design review. 2527 2528 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2529 """ 2530 2531 results = self.allspice_client.requests_get( 2532 self.GET_REVIEWS.format( 2533 owner=self.repository.owner.username, 2534 repo=self.repository.name, 2535 index=self.number, 2536 ) 2537 ) 2538 2539 return [ 2540 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2541 ]
Get all reviews on this design review.
allspice.allspice.io/api/swagger#/repository/repoListPullReviews">https://huballspice.allspice.io/api/swagger#/repository/repoListPullReviews
2543 def submit_review( 2544 self, 2545 review_id: int, 2546 event: DesignReviewReview.ReviewEvent, 2547 *, 2548 body: Optional[str] = None, 2549 ): 2550 """ 2551 Submit a review on this design review. 2552 2553 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2554 2555 :param review_id: The ID of the review to submit. 2556 :param event: The event to submit the review with. See the ReviewEvent 2557 enum for the possible values. 2558 :param body: Optional body text for the review submission. 2559 """ 2560 2561 data = { 2562 "event": event.value, 2563 } 2564 if body is not None: 2565 data["body"] = body 2566 2567 result = self.allspice_client.requests_post( 2568 self.GET_REVIEW.format( 2569 owner=self.repository.owner.username, 2570 repo=self.repository.name, 2571 index=self.number, 2572 review_id=review_id, 2573 ), 2574 data=data, 2575 ) 2576 2577 return result
Submit a review on this design review.
allspice.allspice.io/api/swagger#/repository/repoSubmitPullReview">https://huballspice.allspice.io/api/swagger#/repository/repoSubmitPullReview
Parameters
- review_id: The ID of the review to submit.
- event: The event to submit the review with. See the ReviewEvent enum for the possible values.
- body: Optional body text for the review submission.
2142class DesignReviewReview(ReadonlyApiObject): 2143 """ 2144 A review on a Design Review. 2145 """ 2146 2147 body: str 2148 comments_count: int 2149 commit_id: str 2150 dismissed: bool 2151 html_url: str 2152 id: int 2153 official: bool 2154 pull_request_url: str 2155 stale: bool 2156 state: ReviewEvent 2157 submitted_at: str 2158 team: Any 2159 updated_at: str 2160 user: User 2161 2162 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}" 2163 GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments" 2164 2165 class ReviewEvent(Enum): 2166 APPROVED = "APPROVED" 2167 PENDING = "PENDING" 2168 COMMENT = "COMMENT" 2169 REQUEST_CHANGES = "REQUEST_CHANGES" 2170 REQUEST_REVIEW = "REQUEST_REVIEW" 2171 UNKNOWN = "" 2172 2173 @dataclass 2174 class ReviewComment: 2175 """ 2176 Data required to create a review comment on a design review. 2177 2178 :param body: The body of the comment. 2179 :param path: The path of the file to comment on. If you have a 2180 `Content` object, get the path using the `path` property. 2181 :param sub_path: The sub-path of the file to comment on. This is 2182 usually the page ID of the page in the multi-page document. 2183 :param new_position: The line number of the source code file after the 2184 change to add this comment on. Optional, leave unset if this is an ECAD 2185 file or the comment must be on the entire file. 2186 :param old_position: The line number of the source code file before the 2187 change to add this comment on. Optional, leave unset if this is an ECAD 2188 file or the comment must be on the entire file. 2189 """ 2190 2191 body: str 2192 path: str 2193 sub_path: Optional[str] = None 2194 new_position: Optional[int] = None 2195 old_position: Optional[int] = None 2196 2197 def __init__(self, allspice_client): 2198 super().__init__(allspice_client) 2199 2200 _fields_to_parsers: ClassVar[dict] = { 2201 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2202 "state": lambda _, s: DesignReviewReview.ReviewEvent(s), 2203 } 2204 2205 def _get_dr_properties(self) -> dict[str, str]: 2206 """ 2207 Get the owner, repo name and design review number from the URL of this 2208 review's DR. 2209 """ 2210 2211 parts = self.pull_request_url.strip("/").split("/") 2212 2213 try: 2214 index = parts[-1] 2215 assert parts[-2] == "pulls" or parts[-2] == "pull", ( 2216 "Expected the second last part of the URL to be 'pulls' or 'pull', " 2217 ) 2218 repo = parts[-3] 2219 owner = parts[-4] 2220 2221 return { 2222 "owner": owner, 2223 "repo": repo, 2224 "index": index, 2225 } 2226 except IndexError: 2227 raise ValueError("Malformed design review URL: {}".format(self.pull_request_url)) 2228 2229 @cached_property 2230 def owner_name(self) -> str: 2231 """ 2232 The owner of the repository this review is on. 2233 """ 2234 2235 return self._get_dr_properties()["owner"] 2236 2237 @cached_property 2238 def repository_name(self) -> str: 2239 """ 2240 The name of the repository this review is on. 2241 """ 2242 2243 return self._get_dr_properties()["repo"] 2244 2245 @cached_property 2246 def index(self) -> str: 2247 """ 2248 The index of the design review this review is on. 2249 """ 2250 2251 return self._get_dr_properties()["index"] 2252 2253 def delete(self): 2254 """ 2255 Delete this review. 2256 """ 2257 2258 self.allspice_client.requests_delete( 2259 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2260 ) 2261 self.deleted = True 2262 2263 def get_comments(self) -> List[DesignReviewReviewComment]: 2264 """ 2265 Get the comments on this review. 2266 """ 2267 2268 result = self.allspice_client.requests_get( 2269 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2270 ) 2271 2272 return [ 2273 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2274 for comment in result 2275 ]
A review on a Design Review.
2229 @cached_property 2230 def owner_name(self) -> str: 2231 """ 2232 The owner of the repository this review is on. 2233 """ 2234 2235 return self._get_dr_properties()["owner"]
The owner of the repository this review is on.
2237 @cached_property 2238 def repository_name(self) -> str: 2239 """ 2240 The name of the repository this review is on. 2241 """ 2242 2243 return self._get_dr_properties()["repo"]
The name of the repository this review is on.
2245 @cached_property 2246 def index(self) -> str: 2247 """ 2248 The index of the design review this review is on. 2249 """ 2250 2251 return self._get_dr_properties()["index"]
The index of the design review this review is on.
2253 def delete(self): 2254 """ 2255 Delete this review. 2256 """ 2257 2258 self.allspice_client.requests_delete( 2259 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2260 ) 2261 self.deleted = True
Delete this review.
2263 def get_comments(self) -> List[DesignReviewReviewComment]: 2264 """ 2265 Get the comments on this review. 2266 """ 2267 2268 result = self.allspice_client.requests_get( 2269 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2270 ) 2271 2272 return [ 2273 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2274 for comment in result 2275 ]
Get the comments on this review.
Inherited Members
2173 @dataclass 2174 class ReviewComment: 2175 """ 2176 Data required to create a review comment on a design review. 2177 2178 :param body: The body of the comment. 2179 :param path: The path of the file to comment on. If you have a 2180 `Content` object, get the path using the `path` property. 2181 :param sub_path: The sub-path of the file to comment on. This is 2182 usually the page ID of the page in the multi-page document. 2183 :param new_position: The line number of the source code file after the 2184 change to add this comment on. Optional, leave unset if this is an ECAD 2185 file or the comment must be on the entire file. 2186 :param old_position: The line number of the source code file before the 2187 change to add this comment on. Optional, leave unset if this is an ECAD 2188 file or the comment must be on the entire file. 2189 """ 2190 2191 body: str 2192 path: str 2193 sub_path: Optional[str] = None 2194 new_position: Optional[int] = None 2195 old_position: Optional[int] = None
Data required to create a review comment on a design review.
Parameters
- body: The body of the comment.
- path: The path of the file to comment on. If you have a
Contentobject, get the path using thepathproperty. - sub_path: The sub-path of the file to comment on. This is usually the page ID of the page in the multi-page document.
- new_position: The line number of the source code file after the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
- old_position: The line number of the source code file before the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
1908class Issue(ApiObject): 1909 """ 1910 An issue on a repository. 1911 1912 Note: `Issue.assets` may not have any entries even if the issue has 1913 attachments. This happens when an issue is fetched via a bulk method like 1914 `Repository.get_issues`. In most cases, prefer using 1915 `Issue.get_attachments` to get the attachments on an issue. 1916 """ 1917 1918 assets: List[Union[Any, "Attachment"]] 1919 assignee: Any 1920 assignees: Any 1921 body: str 1922 closed_at: Any 1923 comments: int 1924 created_at: str 1925 due_date: Any 1926 html_url: str 1927 id: int 1928 is_locked: bool 1929 labels: List[Any] 1930 milestone: Optional["Milestone"] 1931 number: int 1932 original_author: str 1933 original_author_id: int 1934 pin_order: int 1935 pull_request: Any 1936 ref: str 1937 repository: Dict[str, Union[int, str]] 1938 state: str 1939 title: str 1940 updated_at: str 1941 url: str 1942 user: User 1943 1944 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1945 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1946 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1947 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1948 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1949 1950 OPENED = "open" 1951 CLOSED = "closed" 1952 1953 def __init__(self, allspice_client): 1954 super().__init__(allspice_client) 1955 1956 def __eq__(self, other): 1957 if not isinstance(other, Issue): 1958 return False 1959 return self.repository == other.repository and self.id == other.id 1960 1961 def __hash__(self): 1962 return hash(self.repository) ^ hash(self.id) 1963 1964 _fields_to_parsers: ClassVar[dict] = { 1965 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1966 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1967 "assets": lambda allspice_client, assets: [ 1968 Attachment.parse_response(allspice_client, a) for a in assets 1969 ], 1970 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1971 "assignees": lambda allspice_client, us: [ 1972 User.parse_response(allspice_client, u) for u in us 1973 ], 1974 "state": lambda _, s: Issue.CLOSED if s == "closed" else Issue.OPENED, 1975 } 1976 1977 _parsers_to_fields: ClassVar[dict] = { 1978 "milestone": lambda m: m.id, 1979 } 1980 1981 _patchable_fields: ClassVar[set[str]] = { 1982 "assignee", 1983 "assignees", 1984 "body", 1985 "due_date", 1986 "milestone", 1987 "state", 1988 "title", 1989 } 1990 1991 def commit(self): 1992 args = { 1993 "owner": self.repository.owner.username, 1994 "repo": self.repository.name, 1995 "index": self.number, 1996 } 1997 self._commit(args) 1998 1999 @classmethod 2000 def request(cls, allspice_client, owner: str, repo: str, number: str): 2001 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2002 # The repository in the response is a RepositoryMeta object, so request 2003 # the full repository object and add it to the issue object. 2004 repository = Repository.request(allspice_client, owner, repo) 2005 setattr(api_object, "_repository", repository) 2006 # For legacy reasons 2007 cls._add_read_property("repo", repository, api_object) 2008 return api_object 2009 2010 @classmethod 2011 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2012 args = {"owner": repo.owner.username, "repo": repo.name} 2013 data = {"title": title, "body": body} 2014 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2015 issue = Issue.parse_response(allspice_client, result) 2016 setattr(issue, "_repository", repo) 2017 cls._add_read_property("repo", repo, issue) 2018 return issue 2019 2020 @property 2021 def owner(self) -> Organization | User: 2022 return self.repository.owner 2023 2024 def get_time_sum(self, user: User) -> int: 2025 results = self.allspice_client.requests_get( 2026 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2027 ) 2028 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2029 2030 def get_times(self) -> Optional[Dict]: 2031 return self.allspice_client.requests_get( 2032 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2033 ) 2034 2035 def delete_time(self, time_id: str): 2036 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2037 self.allspice_client.requests_delete(path) 2038 2039 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2040 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2041 self.allspice_client.requests_post( 2042 path, data={"created": created, "time": int(time), "user_name": user_name} 2043 ) 2044 2045 def get_comments(self) -> List[Comment]: 2046 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2047 2048 results = self.allspice_client.requests_get( 2049 self.GET_COMMENTS.format( 2050 owner=self.owner.username, repo=self.repository.name, index=self.number 2051 ) 2052 ) 2053 2054 return [Comment.parse_response(self.allspice_client, result) for result in results] 2055 2056 def create_comment(self, body: str) -> Comment: 2057 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2058 2059 path = self.GET_COMMENTS.format( 2060 owner=self.owner.username, repo=self.repository.name, index=self.number 2061 ) 2062 2063 response = self.allspice_client.requests_post(path, data={"body": body}) 2064 return Comment.parse_response(self.allspice_client, response) 2065 2066 def get_attachments(self) -> List[Attachment]: 2067 """ 2068 Fetch all attachments on this issue. 2069 2070 Unlike the assets field, this will always fetch all attachments from the 2071 API. 2072 2073 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2074 """ 2075 2076 path = self.GET_ATTACHMENTS.format( 2077 owner=self.owner.username, repo=self.repository.name, index=self.number 2078 ) 2079 response = self.allspice_client.requests_get(path) 2080 2081 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2082 2083 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2084 """ 2085 Create an attachment on this issue. 2086 2087 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2088 2089 :param file: The file to attach. This should be a file-like object. 2090 :param name: The name of the file. If not provided, the name of the file will be 2091 used. 2092 :return: The created attachment. 2093 """ 2094 2095 args: dict[str, Any] = { 2096 "files": {"attachment": file}, 2097 } 2098 if name is not None: 2099 args["params"] = {"name": name} 2100 2101 result = self.allspice_client.requests_post( 2102 self.GET_ATTACHMENTS.format( 2103 owner=self.owner.username, repo=self.repository.name, index=self.number 2104 ), 2105 **args, 2106 ) 2107 2108 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: Issue.assets may not have any entries even if the issue has
attachments. This happens when an issue is fetched via a bulk method like
Repository.get_issues. In most cases, prefer using
Issue.get_attachments to get the attachments on an issue.
1999 @classmethod 2000 def request(cls, allspice_client, owner: str, repo: str, number: str): 2001 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2002 # The repository in the response is a RepositoryMeta object, so request 2003 # the full repository object and add it to the issue object. 2004 repository = Repository.request(allspice_client, owner, repo) 2005 setattr(api_object, "_repository", repository) 2006 # For legacy reasons 2007 cls._add_read_property("repo", repository, api_object) 2008 return api_object
2010 @classmethod 2011 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2012 args = {"owner": repo.owner.username, "repo": repo.name} 2013 data = {"title": title, "body": body} 2014 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2015 issue = Issue.parse_response(allspice_client, result) 2016 setattr(issue, "_repository", repo) 2017 cls._add_read_property("repo", repo, issue) 2018 return issue
2039 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2040 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2041 self.allspice_client.requests_post( 2042 path, data={"created": created, "time": int(time), "user_name": user_name} 2043 )
2045 def get_comments(self) -> List[Comment]: 2046 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2047 2048 results = self.allspice_client.requests_get( 2049 self.GET_COMMENTS.format( 2050 owner=self.owner.username, repo=self.repository.name, index=self.number 2051 ) 2052 ) 2053 2054 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2056 def create_comment(self, body: str) -> Comment: 2057 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2058 2059 path = self.GET_COMMENTS.format( 2060 owner=self.owner.username, repo=self.repository.name, index=self.number 2061 ) 2062 2063 response = self.allspice_client.requests_post(path, data={"body": body}) 2064 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2066 def get_attachments(self) -> List[Attachment]: 2067 """ 2068 Fetch all attachments on this issue. 2069 2070 Unlike the assets field, this will always fetch all attachments from the 2071 API. 2072 2073 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2074 """ 2075 2076 path = self.GET_ATTACHMENTS.format( 2077 owner=self.owner.username, repo=self.repository.name, index=self.number 2078 ) 2079 response = self.allspice_client.requests_get(path) 2080 2081 return [Attachment.parse_response(self.allspice_client, result) for result in response]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments
2083 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2084 """ 2085 Create an attachment on this issue. 2086 2087 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2088 2089 :param file: The file to attach. This should be a file-like object. 2090 :param name: The name of the file. If not provided, the name of the file will be 2091 used. 2092 :return: The created attachment. 2093 """ 2094 2095 args: dict[str, Any] = { 2096 "files": {"attachment": file}, 2097 } 2098 if name is not None: 2099 args["params"] = {"name": name} 2100 2101 result = self.allspice_client.requests_post( 2102 self.GET_ATTACHMENTS.format( 2103 owner=self.owner.username, repo=self.repository.name, index=self.number 2104 ), 2105 **args, 2106 ) 2107 2108 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this issue.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1479class Milestone(ApiObject): 1480 allow_merge_commits: Any 1481 allow_rebase: Any 1482 allow_rebase_explicit: Any 1483 allow_squash_merge: Any 1484 archived: Any 1485 closed_at: Any 1486 closed_issues: int 1487 created_at: str 1488 default_branch: Any 1489 description: str 1490 due_on: Any 1491 has_issues: Any 1492 has_pull_requests: Any 1493 has_wiki: Any 1494 id: int 1495 ignore_whitespace_conflicts: Any 1496 name: Any 1497 open_issues: int 1498 private: Any 1499 state: str 1500 title: str 1501 updated_at: str 1502 website: Any 1503 1504 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1505 1506 def __init__(self, allspice_client): 1507 super().__init__(allspice_client) 1508 1509 def __eq__(self, other): 1510 if not isinstance(other, Milestone): 1511 return False 1512 return self.allspice_client == other.allspice_client and self.id == other.id 1513 1514 def __hash__(self): 1515 return hash(self.allspice_client) ^ hash(self.id) 1516 1517 _fields_to_parsers: ClassVar[dict] = { 1518 "closed_at": lambda _, t: Util.convert_time(t), 1519 "due_on": lambda _, t: Util.convert_time(t), 1520 } 1521 1522 _patchable_fields: ClassVar[set[str]] = { 1523 "allow_merge_commits", 1524 "allow_rebase", 1525 "allow_rebase_explicit", 1526 "allow_squash_merge", 1527 "archived", 1528 "default_branch", 1529 "description", 1530 "has_issues", 1531 "has_pull_requests", 1532 "has_wiki", 1533 "ignore_whitespace_conflicts", 1534 "name", 1535 "private", 1536 "website", 1537 } 1538 1539 @classmethod 1540 def request(cls, allspice_client, owner: str, repo: str, number: str): 1541 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Common base class for all non-exit exceptions.
36class Organization(ApiObject): 37 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 38 39 active: Optional[bool] 40 avatar_url: str 41 created: Optional[str] 42 description: str 43 email: str 44 followers_count: Optional[int] 45 following_count: Optional[int] 46 full_name: str 47 html_url: Optional[str] 48 id: int 49 is_admin: Optional[bool] 50 language: Optional[str] 51 last_login: Optional[str] 52 location: str 53 login: Optional[str] 54 login_name: Optional[str] 55 name: Optional[str] 56 prohibit_login: Optional[bool] 57 repo_admin_change_team_access: Optional[bool] 58 restricted: Optional[bool] 59 source_id: Optional[int] 60 starred_repos_count: Optional[int] 61 username: str 62 visibility: str 63 website: str 64 65 API_OBJECT = """/orgs/{name}""" # <org> 66 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 67 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 68 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 69 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 70 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 71 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 72 73 def __init__(self, allspice_client): 74 super().__init__(allspice_client) 75 76 def __eq__(self, other): 77 if not isinstance(other, Organization): 78 return False 79 return self.allspice_client == other.allspice_client and self.name == other.name 80 81 def __hash__(self): 82 return hash(self.allspice_client) ^ hash(self.name) 83 84 @classmethod 85 def request(cls, allspice_client, name: str) -> Self: 86 return cls._request(allspice_client, {"name": name}) 87 88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object 96 97 _patchable_fields: ClassVar[set[str]] = { 98 "description", 99 "full_name", 100 "location", 101 "visibility", 102 "website", 103 } 104 105 def commit(self): 106 args = {"name": self.name} 107 self._commit(args) 108 109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result) 147 148 def get_repositories(self) -> List["Repository"]: 149 results = self.allspice_client.requests_get_paginated( 150 Organization.ORG_REPOS_REQUEST % self.username 151 ) 152 return [Repository.parse_response(self.allspice_client, result) for result in results] 153 154 def get_repository(self, name) -> "Repository": 155 repos = self.get_repositories() 156 for repo in repos: 157 if repo.name == name: 158 return repo 159 raise NotFoundException("Repository %s not existent in organization." % name) 160 161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams 168 169 def get_team(self, name) -> "Team": 170 teams = self.get_teams() 171 for team in teams: 172 if team.name == name: 173 return team 174 raise NotFoundException("Team not existent in organization.") 175 176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 ) 207 208 def get_members(self) -> List["User"]: 209 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 210 return [User.parse_response(self.allspice_client, result) for result in results] 211 212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False 223 224 def remove_member(self, user: "User"): 225 path = f"/orgs/{self.username}/members/{user.username}" 226 self.allspice_client.requests_delete(path) 227 228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True 234 235 def get_heatmap(self) -> List[Tuple[datetime, int]]: 236 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 237 results = [ 238 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 239 for result in results 240 ] 241 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object
109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams
176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 )
Alias for AllSpice#create_team
212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False
228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
2663class Release(ApiObject): 2664 """ 2665 A release on a repo. 2666 """ 2667 2668 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2669 author: User 2670 body: str 2671 created_at: str 2672 draft: bool 2673 html_url: str 2674 id: int 2675 name: str 2676 prerelease: bool 2677 published_at: str 2678 repo: Optional["Repository"] 2679 repository: Optional["Repository"] 2680 tag_name: str 2681 tarball_url: str 2682 target_commitish: str 2683 upload_url: str 2684 url: str 2685 zipball_url: str 2686 2687 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2688 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2689 # Note that we don't strictly need the get_assets route, as the release 2690 # object already contains the assets. 2691 2692 def __init__(self, allspice_client): 2693 super().__init__(allspice_client) 2694 2695 def __eq__(self, other): 2696 if not isinstance(other, Release): 2697 return False 2698 return self.repo == other.repo and self.id == other.id 2699 2700 def __hash__(self): 2701 return hash(self.repo) ^ hash(self.id) 2702 2703 _fields_to_parsers: ClassVar[dict] = { 2704 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2705 } 2706 _patchable_fields: ClassVar[set[str]] = { 2707 "body", 2708 "draft", 2709 "name", 2710 "prerelease", 2711 "tag_name", 2712 "target_commitish", 2713 } 2714 2715 @classmethod 2716 def parse_response(cls, allspice_client, result, repo) -> Release: 2717 release = super().parse_response(allspice_client, result) 2718 Release._add_read_property("repository", repo, release) 2719 # For legacy reasons 2720 Release._add_read_property("repo", repo, release) 2721 setattr( 2722 release, 2723 "_assets", 2724 [ 2725 ReleaseAsset.parse_response(allspice_client, asset, release) 2726 for asset in result["assets"] 2727 ], 2728 ) 2729 return release 2730 2731 @classmethod 2732 def request( 2733 cls, 2734 allspice_client, 2735 owner: str, 2736 repo: str, 2737 id: Optional[int] = None, 2738 ) -> Release: 2739 args = {"owner": owner, "repo": repo, "id": id} 2740 release_response = cls._get_gitea_api_object(allspice_client, args) 2741 repository = Repository.request(allspice_client, owner, repo) 2742 release = cls.parse_response(allspice_client, release_response, repository) 2743 return release 2744 2745 def commit(self): 2746 if self.repo is None: 2747 raise ValueError("Cannot commit a release without a repository.") 2748 2749 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2750 self._commit(args) 2751 2752 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2753 """ 2754 Create an asset for this release. 2755 2756 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2757 2758 :param file: The file to upload. This should be a file-like object. 2759 :param name: The name of the file. 2760 :return: The created asset. 2761 """ 2762 2763 if self.repo is None: 2764 raise ValueError("Cannot commit a release without a repository.") 2765 2766 args: dict[str, Any] = {"files": {"attachment": file}} 2767 if name is not None: 2768 args["params"] = {"name": name} 2769 2770 result = self.allspice_client.requests_post( 2771 self.RELEASE_CREATE_ASSET.format( 2772 owner=self.repo.owner.username, 2773 repo=self.repo.name, 2774 id=self.id, 2775 ), 2776 **args, 2777 ) 2778 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2779 2780 def delete(self): 2781 if self.repo is None: 2782 raise ValueError("Cannot commit a release without a repository.") 2783 2784 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2785 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2786 self.deleted = True
A release on a repo.
2715 @classmethod 2716 def parse_response(cls, allspice_client, result, repo) -> Release: 2717 release = super().parse_response(allspice_client, result) 2718 Release._add_read_property("repository", repo, release) 2719 # For legacy reasons 2720 Release._add_read_property("repo", repo, release) 2721 setattr( 2722 release, 2723 "_assets", 2724 [ 2725 ReleaseAsset.parse_response(allspice_client, asset, release) 2726 for asset in result["assets"] 2727 ], 2728 ) 2729 return release
2731 @classmethod 2732 def request( 2733 cls, 2734 allspice_client, 2735 owner: str, 2736 repo: str, 2737 id: Optional[int] = None, 2738 ) -> Release: 2739 args = {"owner": owner, "repo": repo, "id": id} 2740 release_response = cls._get_gitea_api_object(allspice_client, args) 2741 repository = Repository.request(allspice_client, owner, repo) 2742 release = cls.parse_response(allspice_client, release_response, repository) 2743 return release
2752 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2753 """ 2754 Create an asset for this release. 2755 2756 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2757 2758 :param file: The file to upload. This should be a file-like object. 2759 :param name: The name of the file. 2760 :return: The created asset. 2761 """ 2762 2763 if self.repo is None: 2764 raise ValueError("Cannot commit a release without a repository.") 2765 2766 args: dict[str, Any] = {"files": {"attachment": file}} 2767 if name is not None: 2768 args["params"] = {"name": name} 2769 2770 result = self.allspice_client.requests_post( 2771 self.RELEASE_CREATE_ASSET.format( 2772 owner=self.repo.owner.username, 2773 repo=self.repo.name, 2774 id=self.id, 2775 ), 2776 **args, 2777 ) 2778 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2780 def delete(self): 2781 if self.repo is None: 2782 raise ValueError("Cannot commit a release without a repository.") 2783 2784 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2785 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2786 self.deleted = True
480class Repository(ApiObject): 481 allow_fast_forward_only_merge: bool 482 allow_manual_merge: Any 483 allow_merge_commits: bool 484 allow_rebase: bool 485 allow_rebase_explicit: bool 486 allow_rebase_update: bool 487 allow_squash_merge: bool 488 archived: bool 489 archived_at: str 490 autodetect_manual_merge: Any 491 avatar_url: str 492 clone_url: str 493 created_at: str 494 default_allow_maintainer_edit: bool 495 default_branch: str 496 default_delete_branch_after_merge: bool 497 default_merge_style: str 498 description: str 499 empty: bool 500 enable_prune: Any 501 external_tracker: Any 502 external_wiki: Any 503 fork: bool 504 forks_count: int 505 full_name: str 506 has_actions: bool 507 has_issues: bool 508 has_packages: bool 509 has_projects: bool 510 has_pull_requests: bool 511 has_releases: bool 512 has_wiki: bool 513 html_url: str 514 id: int 515 ignore_whitespace_conflicts: bool 516 internal: bool 517 internal_tracker: Dict[str, bool] 518 language: str 519 languages_url: str 520 licenses: Any 521 link: str 522 mirror: bool 523 mirror_interval: str 524 mirror_updated: str 525 name: str 526 object_format_name: str 527 open_issues_count: int 528 open_pr_counter: int 529 original_url: str 530 owner: Union["User", "Organization"] 531 parent: Any 532 permissions: Dict[str, bool] 533 private: bool 534 projects_mode: str 535 release_counter: int 536 repo_transfer: Any 537 size: int 538 ssh_url: str 539 stars_count: int 540 template: bool 541 topics: List[Union[Any, str]] 542 updated_at: datetime 543 url: str 544 watchers_count: int 545 website: str 546 547 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 548 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 549 REPO_SEARCH = """/repos/search/""" 550 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 551 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 552 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 553 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 554 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 555 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 556 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 557 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 558 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 559 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 560 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 561 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 562 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 563 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 564 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 565 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 566 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 567 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 568 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 569 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 570 REPO_GET_MEDIA = "/repos/{owner}/{repo}/media/{path}" 571 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 572 573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip" 580 581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex" 591 592 def __init__(self, allspice_client): 593 super().__init__(allspice_client) 594 595 def __eq__(self, other): 596 if not isinstance(other, Repository): 597 return False 598 return self.owner == other.owner and self.name == other.name 599 600 def __hash__(self): 601 return hash(self.owner) ^ hash(self.name) 602 603 _fields_to_parsers: ClassVar[dict] = { 604 # dont know how to tell apart user and org as owner except form email being empty. 605 "owner": lambda allspice_client, r: ( 606 Organization.parse_response(allspice_client, r) 607 if r["email"] == "" 608 else User.parse_response(allspice_client, r) 609 ), 610 "updated_at": lambda _, t: Util.convert_time(t), 611 } 612 613 @classmethod 614 def request( 615 cls, 616 allspice_client, 617 owner: str, 618 name: str, 619 ) -> Repository: 620 return cls._request(allspice_client, {"owner": owner, "name": name}) 621 622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses] 666 667 _patchable_fields: ClassVar[set[str]] = { 668 "allow_manual_merge", 669 "allow_merge_commits", 670 "allow_rebase", 671 "allow_rebase_explicit", 672 "allow_rebase_update", 673 "allow_squash_merge", 674 "archived", 675 "autodetect_manual_merge", 676 "default_branch", 677 "default_delete_branch_after_merge", 678 "default_merge_style", 679 "description", 680 "enable_prune", 681 "external_tracker", 682 "external_wiki", 683 "has_actions", 684 "has_issues", 685 "has_projects", 686 "has_pull_requests", 687 "has_wiki", 688 "ignore_whitespace_conflicts", 689 "internal_tracker", 690 "mirror_interval", 691 "name", 692 "private", 693 "template", 694 "website", 695 } 696 697 def commit(self): 698 args = {"owner": self.owner.username, "name": self.name} 699 self._commit(args) 700 701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results] 708 709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result) 715 716 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result) 730 731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues 799 800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 834 835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results] 871 872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues 895 896 def get_times(self): 897 results = self.allspice_client.requests_get( 898 Repository.REPO_TIMES % (self.owner.username, self.name) 899 ) 900 return results 901 902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time 910 911 def get_full_name(self) -> str: 912 return self.owner.username + "/" + self.name 913 914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue 930 931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result) 984 985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result) 998 999 def create_gitea_hook(self, hook_url: str, events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data) 1008 1009 def list_hooks(self): 1010 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1011 return self.allspice_client.requests_get(url) 1012 1013 def delete_hook(self, id: str): 1014 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1015 self.allspice_client.requests_delete(url) 1016 1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False 1028 1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs 1043 1044 def remove_collaborator(self, user_name: str): 1045 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1046 self.allspice_client.requests_delete(url) 1047 1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = {"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded 1060 1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result 1082 1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1099 1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ] 1117 1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 1140 """ 1141 1142 url = self.REPO_GET_MEDIA.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params) 1149 1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_MEDIA.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk) 1187 1188 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1189 """ 1190 Get the json blob for a cad file if it exists, otherwise enqueue 1191 a new job and return a 503 status. 1192 1193 WARNING: This is still experimental and not recommended for critical 1194 applications. The structure and content of the returned dictionary can 1195 change at any time. 1196 1197 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1198 """ 1199 1200 if isinstance(content, Content): 1201 content = content.path 1202 1203 url = self.REPO_GET_ALLSPICE_JSON.format( 1204 owner=self.owner.username, 1205 repo=self.name, 1206 content=content, 1207 ) 1208 data = Util.data_params_for_ref(ref) 1209 return self.allspice_client.requests_get(url, data) 1210 1211 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1212 """ 1213 Get the svg blob for a cad file if it exists, otherwise enqueue 1214 a new job and return a 503 status. 1215 1216 WARNING: This is still experimental and not yet recommended for 1217 critical applications. The content of the returned svg can change 1218 at any time. 1219 1220 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1221 """ 1222 1223 if isinstance(content, Content): 1224 content = content.path 1225 1226 url = self.REPO_GET_ALLSPICE_SVG.format( 1227 owner=self.owner.username, 1228 repo=self.name, 1229 content=content, 1230 ) 1231 data = Util.data_params_for_ref(ref) 1232 return self.allspice_client.requests_get_raw(url, data) 1233 1234 def get_generated_projectdata( 1235 self, content: Union[Content, str], ref: Optional[Ref] = None 1236 ) -> dict: 1237 """ 1238 Get the json project data based on the cad file provided 1239 1240 WARNING: This is still experimental and not yet recommended for 1241 critical applications. The content of the returned dictionary can change 1242 at any time. 1243 1244 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1245 """ 1246 if isinstance(content, Content): 1247 content = content.path 1248 1249 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1250 owner=self.owner.username, 1251 repo=self.name, 1252 content=content, 1253 ) 1254 data = Util.data_params_for_ref(ref) 1255 return self.allspice_client.requests_get(url, data) 1256 1257 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1258 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1259 if not data: 1260 data = {} 1261 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1262 data.update({"content": content}) 1263 return self.allspice_client.requests_post(url, data) 1264 1265 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1266 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1267 if not data: 1268 data = {} 1269 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1270 data.update({"sha": file_sha, "content": content}) 1271 return self.allspice_client.requests_put(url, data) 1272 1273 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1274 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1275 if not data: 1276 data = {} 1277 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1278 data.update({"sha": file_sha}) 1279 return self.allspice_client.requests_delete(url, data) 1280 1281 def get_archive( 1282 self, 1283 ref: Ref = "main", 1284 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1285 ) -> bytes: 1286 """ 1287 Download all the files in a specific ref of a repository as a zip or tarball 1288 archive. 1289 1290 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1291 1292 :param ref: branch or commit to get content from, defaults to the "main" branch 1293 :param archive_format: zip or tar, defaults to zip 1294 """ 1295 1296 ref_string = Util.data_params_for_ref(ref)["ref"] 1297 url = self.REPO_GET_ARCHIVE.format( 1298 owner=self.owner.username, 1299 repo=self.name, 1300 ref=ref_string, 1301 format=archive_format.value, 1302 ) 1303 return self.allspice_client.requests_get_raw(url) 1304 1305 def get_topics(self) -> list[str]: 1306 """ 1307 Gets the list of topics on this repository. 1308 1309 See http://localhost:3000/api/swagger#/repository/repoListTopics 1310 """ 1311 1312 url = self.REPO_GET_TOPICS.format( 1313 owner=self.owner.username, 1314 repo=self.name, 1315 ) 1316 return self.allspice_client.requests_get(url)["topics"] 1317 1318 def add_topic(self, topic: str): 1319 """ 1320 Adds a topic to the repository. 1321 1322 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1323 1324 :param topic: The topic to add. Topic names must consist only of 1325 lowercase letters, numnbers and dashes (-), and cannot start with 1326 dashes. Topic names also must be under 35 characters long. 1327 """ 1328 1329 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1330 self.allspice_client.requests_put(url) 1331 1332 def create_release( 1333 self, 1334 tag_name: str, 1335 name: Optional[str] = None, 1336 body: Optional[str] = None, 1337 draft: bool = False, 1338 ): 1339 """ 1340 Create a release for this repository. The release will be created for 1341 the tag with the given name. If there is no tag with this name, create 1342 the tag first. 1343 1344 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1345 """ 1346 1347 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1348 data = { 1349 "tag_name": tag_name, 1350 "draft": draft, 1351 } 1352 if name is not None: 1353 data["name"] = name 1354 if body is not None: 1355 data["body"] = body 1356 response = self.allspice_client.requests_post(url, data) 1357 return Release.parse_response(self.allspice_client, response, self) 1358 1359 def get_releases( 1360 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1361 ) -> List[Release]: 1362 """ 1363 Get the list of releases for this repository. 1364 1365 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1366 """ 1367 1368 data = {} 1369 1370 if draft is not None: 1371 data["draft"] = draft 1372 if pre_release is not None: 1373 data["pre-release"] = pre_release 1374 1375 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1376 responses = self.allspice_client.requests_get_paginated(url, params=data) 1377 1378 return [ 1379 Release.parse_response(self.allspice_client, response, self) for response in responses 1380 ] 1381 1382 def get_latest_release(self) -> Release: 1383 """ 1384 Get the latest release for this repository. 1385 1386 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1387 """ 1388 1389 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1390 response = self.allspice_client.requests_get(url) 1391 release = Release.parse_response(self.allspice_client, response, self) 1392 return release 1393 1394 def get_release_by_tag(self, tag: str) -> Release: 1395 """ 1396 Get a release by its tag. 1397 1398 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1399 """ 1400 1401 url = self.REPO_GET_RELEASE_BY_TAG.format( 1402 owner=self.owner.username, repo=self.name, tag=tag 1403 ) 1404 response = self.allspice_client.requests_get(url) 1405 release = Release.parse_response(self.allspice_client, response, self) 1406 return release 1407 1408 def get_commit_statuses( 1409 self, 1410 commit: Union[str, Commit], 1411 sort: Optional[CommitStatusSort] = None, 1412 state: Optional[CommitStatusState] = None, 1413 ) -> List[CommitStatus]: 1414 """ 1415 Get a list of statuses for a commit. 1416 1417 This is roughly equivalent to the Commit.get_statuses method, but this 1418 method allows you to sort and filter commits and is more convenient if 1419 you have a commit SHA and don't need to get the commit itself. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 params = {} 1428 if sort is not None: 1429 params["sort"] = sort.value 1430 if state is not None: 1431 params["state"] = state.value 1432 1433 url = self.REPO_GET_COMMIT_STATUS.format( 1434 owner=self.owner.username, repo=self.name, sha=commit 1435 ) 1436 response = self.allspice_client.requests_get_paginated(url, params=params) 1437 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1438 1439 def create_commit_status( 1440 self, 1441 commit: Union[str, Commit], 1442 context: Optional[str] = None, 1443 description: Optional[str] = None, 1444 state: Optional[CommitStatusState] = None, 1445 target_url: Optional[str] = None, 1446 ) -> CommitStatus: 1447 """ 1448 Create a status on a commit. 1449 1450 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1451 """ 1452 1453 if isinstance(commit, Commit): 1454 commit = commit.sha 1455 1456 data = {} 1457 if context is not None: 1458 data["context"] = context 1459 if description is not None: 1460 data["description"] = description 1461 if state is not None: 1462 data["state"] = state.value 1463 if target_url is not None: 1464 data["target_url"] = target_url 1465 1466 url = self.REPO_GET_COMMIT_STATUS.format( 1467 owner=self.owner.username, repo=self.name, sha=commit 1468 ) 1469 response = self.allspice_client.requests_post(url, data=data) 1470 return CommitStatus.parse_response(self.allspice_client, response) 1471 1472 def delete(self): 1473 self.allspice_client.requests_delete( 1474 Repository.REPO_DELETE % (self.owner.username, self.name) 1475 ) 1476 self.deleted = True
622 @classmethod 623 def search( 624 cls, 625 allspice_client, 626 query: Optional[str] = None, 627 topic: bool = False, 628 include_description: bool = False, 629 user: Optional[User] = None, 630 owner_to_prioritize: Union[User, Organization, None] = None, 631 ) -> list[Repository]: 632 """ 633 Search for repositories. 634 635 See https://hub.allspice.io/api/swagger#/repository/repoSearch 636 637 :param query: The query string to search for 638 :param topic: If true, the query string will only be matched against the 639 repository's topic. 640 :param include_description: If true, the query string will be matched 641 against the repository's description as well. 642 :param user: If specified, only repositories that this user owns or 643 contributes to will be searched. 644 :param owner_to_prioritize: If specified, repositories owned by the 645 given entity will be prioritized in the search. 646 :returns: All repositories matching the query. If there are many 647 repositories matching this query, this may take some time. 648 """ 649 650 params = {} 651 652 if query is not None: 653 params["q"] = query 654 if topic: 655 params["topic"] = topic 656 if include_description: 657 params["include_description"] = include_description 658 if user is not None: 659 params["user"] = user.id 660 if owner_to_prioritize is not None: 661 params["owner_to_prioritize"] = owner_to_prioritize.id 662 663 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 664 665 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
701 def get_branches(self) -> List["Branch"]: 702 """Get all the Branches of this Repository.""" 703 704 results = self.allspice_client.requests_get_paginated( 705 Repository.REPO_BRANCHES % (self.owner.username, self.name) 706 ) 707 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
709 def get_branch(self, name: str) -> "Branch": 710 """Get a specific Branch of this Repository.""" 711 result = self.allspice_client.requests_get( 712 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 713 ) 714 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
716 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 717 """Add a branch to the repository""" 718 # Note: will only work with gitea 1.13 or higher! 719 720 ref_name = Util.data_params_for_ref(create_from) 721 if "ref" not in ref_name: 722 raise ValueError("create_from must be a Branch, Commit or string") 723 ref_name = ref_name["ref"] 724 725 data = {"new_branch_name": newname, "old_ref_name": ref_name} 726 result = self.allspice_client.requests_post( 727 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 728 ) 729 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
731 def get_issues( 732 self, 733 state: Literal["open", "closed", "all"] = "all", 734 search_query: Optional[str] = None, 735 labels: Optional[List[str]] = None, 736 milestones: Optional[List[Union[Milestone, str]]] = None, 737 assignee: Optional[Union[User, str]] = None, 738 since: Optional[datetime] = None, 739 before: Optional[datetime] = None, 740 ) -> List["Issue"]: 741 """ 742 Get all Issues of this Repository (open and closed) 743 744 https://hub.allspice.io/api/swagger#/repository/repoListIssues 745 746 All params of this method are optional filters. If you don't specify a filter, it 747 will not be applied. 748 749 :param state: The state of the Issues to get. If None, all Issues are returned. 750 :param search_query: Filter issues by text. This is equivalent to searching for 751 `search_query` in the Issues on the web interface. 752 :param labels: Filter issues by labels. 753 :param milestones: Filter issues by milestones. 754 :param assignee: Filter issues by the assigned user. 755 :param since: Filter issues by the date they were created. 756 :param before: Filter issues by the date they were created. 757 :return: A list of Issues. 758 """ 759 760 data = { 761 "state": state, 762 } 763 if search_query: 764 data["q"] = search_query 765 if labels: 766 data["labels"] = ",".join(labels) 767 if milestones: 768 data["milestone"] = ",".join( 769 [ 770 milestone.name if isinstance(milestone, Milestone) else milestone 771 for milestone in milestones 772 ] 773 ) 774 if assignee: 775 if isinstance(assignee, User): 776 data["assignee"] = assignee.username 777 else: 778 data["assignee"] = assignee 779 if since: 780 data["since"] = Util.format_time(since) 781 if before: 782 data["before"] = Util.format_time(before) 783 784 results = self.allspice_client.requests_get_paginated( 785 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 786 params=data, 787 ) 788 789 issues = [] 790 for result in results: 791 issue = Issue.parse_response(self.allspice_client, result) 792 # See Issue.request 793 setattr(issue, "_repository", self) 794 # This is mostly for compatibility with an older implementation 795 Issue._add_read_property("repo", self, issue) 796 issues.append(issue) 797 798 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_queryin the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
800 def get_design_reviews( 801 self, 802 state: Literal["open", "closed", "all"] = "all", 803 milestone: Optional[Union[Milestone, str]] = None, 804 labels: Optional[List[str]] = None, 805 ) -> List["DesignReview"]: 806 """ 807 Get all Design Reviews of this Repository. 808 809 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 810 811 :param state: The state of the Design Reviews to get. If None, all Design Reviews 812 are returned. 813 :param milestone: The milestone of the Design Reviews to get. 814 :param labels: A list of label IDs to filter DRs by. 815 :return: A list of Design Reviews. 816 """ 817 818 params = { 819 "state": state, 820 } 821 if milestone: 822 if isinstance(milestone, Milestone): 823 params["milestone"] = milestone.name 824 else: 825 params["milestone"] = milestone 826 if labels: 827 params["labels"] = ",".join(labels) 828 829 results = self.allspice_client.requests_get_paginated( 830 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 831 params=params, 832 ) 833 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
835 def get_commits( 836 self, 837 sha: Optional[str] = None, 838 path: Optional[str] = None, 839 stat: bool = True, 840 ) -> List["Commit"]: 841 """ 842 Get all the Commits of this Repository. 843 844 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 845 846 :param sha: The SHA of the commit to start listing commits from. 847 :param path: filepath of a file/dir. 848 :param stat: Include the number of additions and deletions in the response. 849 Disable for speedup. 850 :return: A list of Commits. 851 """ 852 853 data = {} 854 if sha: 855 data["sha"] = sha 856 if path: 857 data["path"] = path 858 if not stat: 859 data["stat"] = False 860 861 try: 862 results = self.allspice_client.requests_get_paginated( 863 Repository.REPO_COMMITS % (self.owner.username, self.name), 864 params=data, 865 ) 866 except ConflictException as err: 867 logging.warning(err) 868 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 869 results = [] 870 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
872 def get_issues_state(self, state) -> List["Issue"]: 873 """ 874 DEPRECATED: Use get_issues() instead. 875 876 Get issues of state Issue.open or Issue.closed of a repository. 877 """ 878 879 assert state in [Issue.OPENED, Issue.CLOSED] 880 issues = [] 881 data = {"state": state} 882 results = self.allspice_client.requests_get_paginated( 883 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 884 params=data, 885 ) 886 for result in results: 887 issue = Issue.parse_response(self.allspice_client, result) 888 # adding data not contained in the issue response 889 # See Issue.request() 890 setattr(issue, "_repository", self) 891 Issue._add_read_property("repo", self, issue) 892 Issue._add_read_property("owner", self.owner, issue) 893 issues.append(issue) 894 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
902 def get_user_time(self, username) -> float: 903 if isinstance(username, User): 904 username = username.username 905 results = self.allspice_client.requests_get( 906 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 907 ) 908 time = sum(r["time"] for r in results) 909 return time
914 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 915 data = { 916 "assignees": assignees, 917 "body": description, 918 "closed": False, 919 "title": title, 920 } 921 result = self.allspice_client.requests_post( 922 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 923 data=data, 924 ) 925 926 issue = Issue.parse_response(self.allspice_client, result) 927 setattr(issue, "_repository", self) 928 Issue._add_read_property("repo", self, issue) 929 return issue
931 def create_design_review( 932 self, 933 title: str, 934 head: Union[Branch, str], 935 base: Union[Branch, str], 936 assignees: Optional[Set[Union[User, str]]] = None, 937 body: Optional[str] = None, 938 due_date: Optional[datetime] = None, 939 milestone: Optional["Milestone"] = None, 940 ) -> "DesignReview": 941 """ 942 Create a new Design Review. 943 944 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 945 946 :param title: Title of the Design Review 947 :param head: Branch or name of the branch to merge into the base branch 948 :param base: Branch or name of the branch to merge into 949 :param assignees: Optional. A list of users to assign this review. List can be of 950 User objects or of usernames. 951 :param body: An Optional Description for the Design Review. 952 :param due_date: An Optional Due date for the Design Review. 953 :param milestone: An Optional Milestone for the Design Review 954 :return: The created Design Review 955 """ 956 957 data: dict[str, Any] = { 958 "title": title, 959 } 960 961 if isinstance(head, Branch): 962 data["head"] = head.name 963 else: 964 data["head"] = head 965 if isinstance(base, Branch): 966 data["base"] = base.name 967 else: 968 data["base"] = base 969 if assignees: 970 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 971 if body: 972 data["body"] = body 973 if due_date: 974 data["due_date"] = Util.format_time(due_date) 975 if milestone: 976 data["milestone"] = milestone.id 977 978 result = self.allspice_client.requests_post( 979 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 980 data=data, 981 ) 982 983 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
985 def create_milestone( 986 self, 987 title: str, 988 description: str, 989 due_date: Optional[str] = None, 990 state: str = "open", 991 ) -> "Milestone": 992 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 993 data = {"title": title, "description": description, "state": state} 994 if due_date: 995 data["due_date"] = due_date 996 result = self.allspice_client.requests_post(url, data=data) 997 return Milestone.parse_response(self.allspice_client, result)
999 def create_gitea_hook(self, hook_url: str, events: List[str]): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1001 data = { 1002 "type": "gitea", 1003 "config": {"content_type": "json", "url": hook_url}, 1004 "events": events, 1005 "active": True, 1006 } 1007 return self.allspice_client.requests_post(url, data=data)
1017 def is_collaborator(self, username) -> bool: 1018 if isinstance(username, User): 1019 username = username.username 1020 try: 1021 # returns 204 if its ok, 404 if its not 1022 self.allspice_client.requests_get( 1023 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1024 ) 1025 return True 1026 except Exception: 1027 return False
1029 def get_users_with_access(self) -> Sequence[User]: 1030 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1031 response = self.allspice_client.requests_get(url) 1032 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1033 if isinstance(self.owner, User): 1034 return [*collabs, self.owner] 1035 else: 1036 # owner must be org 1037 teams = self.owner.get_teams() 1038 for team in teams: 1039 team_repos = team.get_repos() 1040 if self.name in [n.name for n in team_repos]: 1041 collabs += team.get_members() 1042 return collabs
1048 def transfer_ownership( 1049 self, 1050 new_owner: Union[User, Organization], 1051 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1052 ): 1053 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1054 data: dict[str, Any] = {"new_owner": new_owner.username} 1055 if isinstance(new_owner, Organization): 1056 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1057 data["team_ids"] = new_team_ids 1058 self.allspice_client.requests_post(url, data=data) 1059 # TODO: make sure this instance is either updated or discarded
1061 def get_git_content( 1062 self, 1063 ref: Optional["Ref"] = None, 1064 commit: "Optional[Commit]" = None, 1065 ) -> List[Content]: 1066 """ 1067 Get the metadata for all files in the root directory. 1068 1069 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1070 1071 :param ref: branch or commit to get content from 1072 :param commit: commit to get content from (deprecated) 1073 """ 1074 url = f"/repos/{self.owner.username}/{self.name}/contents" 1075 data = Util.data_params_for_ref(ref or commit) 1076 1077 result = [ 1078 Content.parse_response(self.allspice_client, f) 1079 for f in self.allspice_client.requests_get(url, data) 1080 ] 1081 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1083 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1084 """ 1085 Get the repository's tree on a given ref. 1086 1087 By default, this will only return the top-level entries in the tree. If you want 1088 to get the entire tree, set `recursive` to True. 1089 1090 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1091 :param recursive: Whether to get the entire tree or just the top-level entries. 1092 """ 1093 1094 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1095 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1096 params = {"recursive": recursive} 1097 results = self.allspice_client.requests_get_paginated(url, params=params) 1098 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1100 def get_file_content( 1101 self, 1102 content: Content, 1103 ref: Optional[Ref] = None, 1104 commit: Optional[Commit] = None, 1105 ) -> Union[str, List["Content"]]: 1106 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1107 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1108 data = Util.data_params_for_ref(ref or commit) 1109 1110 if content.type == Content.FILE: 1111 return self.allspice_client.requests_get(url, data)["content"] 1112 else: 1113 return [ 1114 Content.parse_response(self.allspice_client, f) 1115 for f in self.allspice_client.requests_get(url, data) 1116 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1118 def get_raw_file( 1119 self, 1120 file_path: str, 1121 ref: Optional[Ref] = None, 1122 ) -> bytes: 1123 """ 1124 Get the raw, binary data of a single file. 1125 1126 Note 1: if the file you are requesting is a text file, you might want to 1127 use .decode() on the result to get a string. For example: 1128 1129 content = repo.get_raw_file("file.txt").decode("utf-8") 1130 1131 Note 2: this method will store the entire file in memory. If you want 1132 to download a large file, you might want to use `download_to_file` 1133 instead. 1134 1135 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS 1136 1137 :param file_path: The path to the file to get. 1138 :param ref: The branch or commit to get the file from. If not provided, 1139 the default branch is used. 1140 """ 1141 1142 url = self.REPO_GET_MEDIA.format( 1143 owner=self.owner.username, 1144 repo=self.name, 1145 path=file_path, 1146 ) 1147 params = Util.data_params_for_ref(ref) 1148 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFileOrLFS
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1150 def download_to_file( 1151 self, 1152 file_path: str, 1153 io: IO, 1154 ref: Optional[Ref] = None, 1155 ) -> None: 1156 """ 1157 Download the binary data of a file to a file-like object. 1158 1159 Example: 1160 1161 with open("schematic.DSN", "wb") as f: 1162 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1163 1164 :param file_path: The path to the file in the repository from the root 1165 of the repository. 1166 :param io: The file-like object to write the data to. 1167 """ 1168 1169 url = self.allspice_client._AllSpice__get_url( 1170 self.REPO_GET_MEDIA.format( 1171 owner=self.owner.username, 1172 repo=self.name, 1173 path=file_path, 1174 ) 1175 ) 1176 params = Util.data_params_for_ref(ref) 1177 response = self.allspice_client.requests.get( 1178 url, 1179 params=params, 1180 headers=self.allspice_client.headers, 1181 stream=True, 1182 ) 1183 1184 for chunk in response.iter_content(chunk_size=4096): 1185 if chunk: 1186 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1188 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1189 """ 1190 Get the json blob for a cad file if it exists, otherwise enqueue 1191 a new job and return a 503 status. 1192 1193 WARNING: This is still experimental and not recommended for critical 1194 applications. The structure and content of the returned dictionary can 1195 change at any time. 1196 1197 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1198 """ 1199 1200 if isinstance(content, Content): 1201 content = content.path 1202 1203 url = self.REPO_GET_ALLSPICE_JSON.format( 1204 owner=self.owner.username, 1205 repo=self.name, 1206 content=content, 1207 ) 1208 data = Util.data_params_for_ref(ref) 1209 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1211 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1212 """ 1213 Get the svg blob for a cad file if it exists, otherwise enqueue 1214 a new job and return a 503 status. 1215 1216 WARNING: This is still experimental and not yet recommended for 1217 critical applications. The content of the returned svg can change 1218 at any time. 1219 1220 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1221 """ 1222 1223 if isinstance(content, Content): 1224 content = content.path 1225 1226 url = self.REPO_GET_ALLSPICE_SVG.format( 1227 owner=self.owner.username, 1228 repo=self.name, 1229 content=content, 1230 ) 1231 data = Util.data_params_for_ref(ref) 1232 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1234 def get_generated_projectdata( 1235 self, content: Union[Content, str], ref: Optional[Ref] = None 1236 ) -> dict: 1237 """ 1238 Get the json project data based on the cad file provided 1239 1240 WARNING: This is still experimental and not yet recommended for 1241 critical applications. The content of the returned dictionary can change 1242 at any time. 1243 1244 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1245 """ 1246 if isinstance(content, Content): 1247 content = content.path 1248 1249 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1250 owner=self.owner.username, 1251 repo=self.name, 1252 content=content, 1253 ) 1254 data = Util.data_params_for_ref(ref) 1255 return self.allspice_client.requests_get(url, data)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
1257 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1258 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1259 if not data: 1260 data = {} 1261 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1262 data.update({"content": content}) 1263 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1265 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1266 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1267 if not data: 1268 data = {} 1269 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1270 data.update({"sha": file_sha, "content": content}) 1271 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1273 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1274 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1275 if not data: 1276 data = {} 1277 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1278 data.update({"sha": file_sha}) 1279 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1281 def get_archive( 1282 self, 1283 ref: Ref = "main", 1284 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1285 ) -> bytes: 1286 """ 1287 Download all the files in a specific ref of a repository as a zip or tarball 1288 archive. 1289 1290 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1291 1292 :param ref: branch or commit to get content from, defaults to the "main" branch 1293 :param archive_format: zip or tar, defaults to zip 1294 """ 1295 1296 ref_string = Util.data_params_for_ref(ref)["ref"] 1297 url = self.REPO_GET_ARCHIVE.format( 1298 owner=self.owner.username, 1299 repo=self.name, 1300 ref=ref_string, 1301 format=archive_format.value, 1302 ) 1303 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1305 def get_topics(self) -> list[str]: 1306 """ 1307 Gets the list of topics on this repository. 1308 1309 See http://localhost:3000/api/swagger#/repository/repoListTopics 1310 """ 1311 1312 url = self.REPO_GET_TOPICS.format( 1313 owner=self.owner.username, 1314 repo=self.name, 1315 ) 1316 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1318 def add_topic(self, topic: str): 1319 """ 1320 Adds a topic to the repository. 1321 1322 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1323 1324 :param topic: The topic to add. Topic names must consist only of 1325 lowercase letters, numnbers and dashes (-), and cannot start with 1326 dashes. Topic names also must be under 35 characters long. 1327 """ 1328 1329 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1330 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1332 def create_release( 1333 self, 1334 tag_name: str, 1335 name: Optional[str] = None, 1336 body: Optional[str] = None, 1337 draft: bool = False, 1338 ): 1339 """ 1340 Create a release for this repository. The release will be created for 1341 the tag with the given name. If there is no tag with this name, create 1342 the tag first. 1343 1344 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1345 """ 1346 1347 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1348 data = { 1349 "tag_name": tag_name, 1350 "draft": draft, 1351 } 1352 if name is not None: 1353 data["name"] = name 1354 if body is not None: 1355 data["body"] = body 1356 response = self.allspice_client.requests_post(url, data) 1357 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1359 def get_releases( 1360 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1361 ) -> List[Release]: 1362 """ 1363 Get the list of releases for this repository. 1364 1365 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1366 """ 1367 1368 data = {} 1369 1370 if draft is not None: 1371 data["draft"] = draft 1372 if pre_release is not None: 1373 data["pre-release"] = pre_release 1374 1375 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1376 responses = self.allspice_client.requests_get_paginated(url, params=data) 1377 1378 return [ 1379 Release.parse_response(self.allspice_client, response, self) for response in responses 1380 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1382 def get_latest_release(self) -> Release: 1383 """ 1384 Get the latest release for this repository. 1385 1386 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1387 """ 1388 1389 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1390 response = self.allspice_client.requests_get(url) 1391 release = Release.parse_response(self.allspice_client, response, self) 1392 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1394 def get_release_by_tag(self, tag: str) -> Release: 1395 """ 1396 Get a release by its tag. 1397 1398 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1399 """ 1400 1401 url = self.REPO_GET_RELEASE_BY_TAG.format( 1402 owner=self.owner.username, repo=self.name, tag=tag 1403 ) 1404 response = self.allspice_client.requests_get(url) 1405 release = Release.parse_response(self.allspice_client, response, self) 1406 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1408 def get_commit_statuses( 1409 self, 1410 commit: Union[str, Commit], 1411 sort: Optional[CommitStatusSort] = None, 1412 state: Optional[CommitStatusState] = None, 1413 ) -> List[CommitStatus]: 1414 """ 1415 Get a list of statuses for a commit. 1416 1417 This is roughly equivalent to the Commit.get_statuses method, but this 1418 method allows you to sort and filter commits and is more convenient if 1419 you have a commit SHA and don't need to get the commit itself. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 params = {} 1428 if sort is not None: 1429 params["sort"] = sort.value 1430 if state is not None: 1431 params["state"] = state.value 1432 1433 url = self.REPO_GET_COMMIT_STATUS.format( 1434 owner=self.owner.username, repo=self.name, sha=commit 1435 ) 1436 response = self.allspice_client.requests_get_paginated(url, params=params) 1437 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1439 def create_commit_status( 1440 self, 1441 commit: Union[str, Commit], 1442 context: Optional[str] = None, 1443 description: Optional[str] = None, 1444 state: Optional[CommitStatusState] = None, 1445 target_url: Optional[str] = None, 1446 ) -> CommitStatus: 1447 """ 1448 Create a status on a commit. 1449 1450 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1451 """ 1452 1453 if isinstance(commit, Commit): 1454 commit = commit.sha 1455 1456 data = {} 1457 if context is not None: 1458 data["context"] = context 1459 if description is not None: 1460 data["description"] = description 1461 if state is not None: 1462 data["state"] = state.value 1463 if target_url is not None: 1464 data["target_url"] = target_url 1465 1466 url = self.REPO_GET_COMMIT_STATUS.format( 1467 owner=self.owner.username, repo=self.name, sha=commit 1468 ) 1469 response = self.allspice_client.requests_post(url, data=data) 1470 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
573 class ArchiveFormat(Enum): 574 """ 575 Archive formats for Repository.get_archive 576 """ 577 578 TAR = "tar.gz" 579 ZIP = "zip"
Archive formats for Repository.get_archive
581 class CommitStatusSort(Enum): 582 """ 583 Sort order for Repository.get_commit_status 584 """ 585 586 OLDEST = "oldest" 587 RECENT_UPDATE = "recentupdate" 588 LEAST_UPDATE = "leastupdate" 589 LEAST_INDEX = "leastindex" 590 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
2580class Team(ApiObject): 2581 can_create_org_repo: bool 2582 description: str 2583 id: int 2584 includes_all_repositories: bool 2585 name: str 2586 organization: Optional["Organization"] 2587 permission: str 2588 units: List[str] 2589 units_map: Dict[str, str] 2590 2591 API_OBJECT = """/teams/{id}""" # <id> 2592 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2593 TEAM_DELETE = """/teams/%s""" # <id> 2594 GET_MEMBERS = """/teams/%s/members""" # <id> 2595 GET_REPOS = """/teams/%s/repos""" # <id> 2596 2597 def __init__(self, allspice_client): 2598 super().__init__(allspice_client) 2599 2600 def __eq__(self, other): 2601 if not isinstance(other, Team): 2602 return False 2603 return self.organization == other.organization and self.id == other.id 2604 2605 def __hash__(self): 2606 return hash(self.organization) ^ hash(self.id) 2607 2608 _fields_to_parsers: ClassVar[dict] = { 2609 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2610 } 2611 2612 _patchable_fields: ClassVar[set[str]] = { 2613 "can_create_org_repo", 2614 "description", 2615 "includes_all_repositories", 2616 "name", 2617 "permission", 2618 "units", 2619 "units_map", 2620 } 2621 2622 @classmethod 2623 def request(cls, allspice_client, id: int): 2624 return cls._request(allspice_client, {"id": id}) 2625 2626 def commit(self): 2627 args = {"id": self.id} 2628 self._commit(args) 2629 2630 def add_user(self, user: User): 2631 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2632 url = f"/teams/{self.id}/members/{user.login}" 2633 self.allspice_client.requests_put(url) 2634 2635 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2636 if isinstance(repo, Repository): 2637 repo_name = repo.name 2638 else: 2639 repo_name = repo 2640 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2641 2642 def get_members(self): 2643 """Get all users assigned to the team.""" 2644 results = self.allspice_client.requests_get_paginated( 2645 Team.GET_MEMBERS % self.id, 2646 ) 2647 return [User.parse_response(self.allspice_client, result) for result in results] 2648 2649 def get_repos(self): 2650 """Get all repos of this Team.""" 2651 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2652 return [Repository.parse_response(self.allspice_client, result) for result in results] 2653 2654 def delete(self): 2655 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2656 self.deleted = True 2657 2658 def remove_team_member(self, user_name: str): 2659 url = f"/teams/{self.id}/members/{user_name}" 2660 self.allspice_client.requests_delete(url)
2630 def add_user(self, user: User): 2631 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2632 url = f"/teams/{self.id}/members/{user.login}" 2633 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2642 def get_members(self): 2643 """Get all users assigned to the team.""" 2644 results = self.allspice_client.requests_get_paginated( 2645 Team.GET_MEMBERS % self.id, 2646 ) 2647 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2649 def get_repos(self): 2650 """Get all repos of this Team.""" 2651 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2652 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
244class User(ApiObject): 245 active: bool 246 admin: Any 247 allow_create_organization: Any 248 allow_git_hook: Any 249 allow_import_local: Any 250 avatar_url: str 251 created: str 252 description: str 253 email: str 254 emails: List[Any] 255 followers_count: int 256 following_count: int 257 full_name: str 258 html_url: str 259 id: int 260 is_admin: bool 261 language: str 262 last_login: str 263 location: str 264 login: str 265 login_name: str 266 max_repo_creation: Any 267 must_change_password: Any 268 password: Any 269 prohibit_login: bool 270 restricted: bool 271 source_id: int 272 starred_repos_count: int 273 username: str 274 visibility: str 275 website: str 276 277 API_OBJECT = """/users/{name}""" # <org> 278 USER_MAIL = """/user/emails?sudo=%s""" # <name> 279 USER_PATCH = """/admin/users/%s""" # <username> 280 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 281 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 282 USER_HEATMAP = """/users/%s/heatmap""" # <username> 283 284 def __init__(self, allspice_client): 285 super().__init__(allspice_client) 286 self._emails = [] 287 288 def __eq__(self, other): 289 if not isinstance(other, User): 290 return False 291 return self.allspice_client == other.allspice_client and self.id == other.id 292 293 def __hash__(self): 294 return hash(self.allspice_client) ^ hash(self.id) 295 296 @property 297 def emails(self): 298 self.__request_emails() 299 return self._emails 300 301 @classmethod 302 def request(cls, allspice_client, name: str) -> "User": 303 api_object = cls._request(allspice_client, {"name": name}) 304 return api_object 305 306 _patchable_fields: ClassVar[set[str]] = { 307 "active", 308 "admin", 309 "allow_create_organization", 310 "allow_git_hook", 311 "allow_import_local", 312 "email", 313 "full_name", 314 "location", 315 "login_name", 316 "max_repo_creation", 317 "must_change_password", 318 "password", 319 "prohibit_login", 320 "website", 321 } 322 323 def commit(self, login_name: str, source_id: int = 0): 324 """ 325 Unfortunately it is necessary to require the login name 326 as well as the login source (that is not supplied when getting a user) for 327 changing a user. 328 Usually source_id is 0 and the login_name is equal to the username. 329 """ 330 values = self.get_dirty_fields() 331 values.update( 332 # api-doc says that the "source_id" is necessary; works without though 333 {"login_name": login_name, "source_id": source_id} 334 ) 335 args = {"username": self.username} 336 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 337 self._dirty_fields = {} 338 339 def create_repo( 340 self, 341 repoName: str, 342 description: str = "", 343 private: bool = False, 344 autoInit=True, 345 gitignores: Optional[str] = None, 346 license: Optional[str] = None, 347 readme: str = "Default", 348 issue_labels: Optional[str] = None, 349 default_branch="master", 350 ): 351 """Create a user Repository 352 353 Throws: 354 AlreadyExistsException: If the Repository exists already. 355 Exception: If something else went wrong. 356 """ 357 result = self.allspice_client.requests_post( 358 "/user/repos", 359 data={ 360 "name": repoName, 361 "description": description, 362 "private": private, 363 "auto_init": autoInit, 364 "gitignores": gitignores, 365 "license": license, 366 "issue_labels": issue_labels, 367 "readme": readme, 368 "default_branch": default_branch, 369 }, 370 ) 371 if "id" in result: 372 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 373 else: 374 self.allspice_client.logger.error(result["message"]) 375 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 376 return Repository.parse_response(self.allspice_client, result) 377 378 def get_repositories(self) -> List["Repository"]: 379 """Get all Repositories owned by this User.""" 380 url = f"/users/{self.username}/repos" 381 results = self.allspice_client.requests_get_paginated(url) 382 return [Repository.parse_response(self.allspice_client, result) for result in results] 383 384 def get_orgs(self) -> List[Organization]: 385 """Get all Organizations this user is a member of.""" 386 url = f"/users/{self.username}/orgs" 387 results = self.allspice_client.requests_get_paginated(url) 388 return [Organization.parse_response(self.allspice_client, result) for result in results] 389 390 def get_teams(self) -> List["Team"]: 391 url = "/user/teams" 392 results = self.allspice_client.requests_get_paginated(url, sudo=self) 393 return [Team.parse_response(self.allspice_client, result) for result in results] 394 395 def get_accessible_repos(self) -> List["Repository"]: 396 """Get all Repositories accessible by the logged in User.""" 397 results = self.allspice_client.requests_get("/user/repos", sudo=self) 398 return [Repository.parse_response(self.allspice_client, result) for result in results] 399 400 def __request_emails(self): 401 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 402 # report if the adress changed by this 403 for mail in result: 404 self._emails.append(mail["email"]) 405 if mail["primary"]: 406 self._email = mail["email"] 407 408 def delete(self): 409 """Deletes this User. Also deletes all Repositories he owns.""" 410 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 411 self.deleted = True 412 413 def get_heatmap(self) -> List[Tuple[datetime, int]]: 414 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 415 results = [ 416 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 417 for result in results 418 ] 419 return results
323 def commit(self, login_name: str, source_id: int = 0): 324 """ 325 Unfortunately it is necessary to require the login name 326 as well as the login source (that is not supplied when getting a user) for 327 changing a user. 328 Usually source_id is 0 and the login_name is equal to the username. 329 """ 330 values = self.get_dirty_fields() 331 values.update( 332 # api-doc says that the "source_id" is necessary; works without though 333 {"login_name": login_name, "source_id": source_id} 334 ) 335 args = {"username": self.username} 336 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 337 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
339 def create_repo( 340 self, 341 repoName: str, 342 description: str = "", 343 private: bool = False, 344 autoInit=True, 345 gitignores: Optional[str] = None, 346 license: Optional[str] = None, 347 readme: str = "Default", 348 issue_labels: Optional[str] = None, 349 default_branch="master", 350 ): 351 """Create a user Repository 352 353 Throws: 354 AlreadyExistsException: If the Repository exists already. 355 Exception: If something else went wrong. 356 """ 357 result = self.allspice_client.requests_post( 358 "/user/repos", 359 data={ 360 "name": repoName, 361 "description": description, 362 "private": private, 363 "auto_init": autoInit, 364 "gitignores": gitignores, 365 "license": license, 366 "issue_labels": issue_labels, 367 "readme": readme, 368 "default_branch": default_branch, 369 }, 370 ) 371 if "id" in result: 372 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 373 else: 374 self.allspice_client.logger.error(result["message"]) 375 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 376 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
378 def get_repositories(self) -> List["Repository"]: 379 """Get all Repositories owned by this User.""" 380 url = f"/users/{self.username}/repos" 381 results = self.allspice_client.requests_get_paginated(url) 382 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
384 def get_orgs(self) -> List[Organization]: 385 """Get all Organizations this user is a member of.""" 386 url = f"/users/{self.username}/orgs" 387 results = self.allspice_client.requests_get_paginated(url) 388 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
395 def get_accessible_repos(self) -> List["Repository"]: 396 """Get all Repositories accessible by the logged in User.""" 397 results = self.allspice_client.requests_get("/user/repos", sudo=self) 398 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
408 def delete(self): 409 """Deletes this User. Also deletes all Repositories he owns.""" 410 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 411 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.