allspice
A very simple API client for AllSpice Hub
Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client
object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice version or the authenticated user can be performed directly on the allspice_client
object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request
method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during development. Refer to the AllSpice API documentation for the field names.
Fields that cannot be altered via the AllSpice API are read-only. After altering a field, the .commit
method of the according object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects have methods to execute some of the requests possible through the AllSpice API:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
"""
.. include:: ../README.md
   :start-line: 1
   :end-before: Installation
"""

from .allspice import (
    AllSpice,
)
from .apiobject import (
    Branch,
    Comment,
    Commit,
    Content,
    DesignReview,
    Issue,
    Milestone,
    Organization,
    Release,
    Repository,
    Team,
    User,
)
from .exceptions import AlreadyExistsException, NotFoundException

# Version string of the package.
__version__ = "3.6.0"

# Names exported by `from allspice import *`: the client, the API object
# wrappers and the two exceptions imported above.
__all__ = [
    "AllSpice",
    "User",
    "Organization",
    "Team",
    "Repository",
    "Branch",
    "NotFoundException",
    "AlreadyExistsException",
    "Issue",
    "Milestone",
    "Commit",
    "Comment",
    "Content",
    "DesignReview",
    "Release",
]
class AllSpice:
    """Object to establish a session with AllSpice Hub."""

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
    ):
        """Initialize an instance of the AllSpice Hub client.

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): Assigned to the session's `verify` attribute.
                If True (the default), the server's TLS certificate is
                verified. If False, urllib3's `InsecureRequestWarning` is
                additionally silenced.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

        Raises:
            ValueError: If neither `token_text` nor `auth` is provided.
        """
        # Validate the credentials first so that a misconfigured client fails
        # before any session object is created.
        if not token_text and not auth:
            raise ValueError("Please provide auth or token_text")

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        self.headers = {
            "Content-type": "application/json",
        }
        self.url = allspice_hub_url

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication. If both a token and basic-auth credentials
        # are given, both are attached to the session.
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certification verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def __get_url(self, endpoint):
        """Return the full URL for `endpoint` under the `/api/v1` prefix."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s" % url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """Issue a GET request and translate error status codes to exceptions.

        Raises:
            NotFoundException: On status 404.
            ConflictException: On status 409.
            NotYetGeneratedException: On status 503.
            Exception: On 403 and on any other status that is not 200 or 201.
        """
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code in [404]:
                raise NotFoundException(message)
            if request.status_code in [403]:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
                )
            if request.status_code in [409]:
                raise ConflictException(message)
            if request.status_code in [503]:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parses the result-JSON to a dict.

        Bodies of three characters or fewer (or no body at all) yield `{}`.
        """
        if result.text and len(result.text) > 3:
            return json.loads(result.text)
        return {}

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body.

        When `sudo` is given, its `username` is sent as the `sudo` query
        parameter.
        """
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.parse_result(self.__get(endpoint, combined_params))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the raw response content."""
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.__get(endpoint, combined_params).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """GET every page of `endpoint` and return all items in one list.

        Pages are requested with `page_key` counting up from `first_page`
        until an empty page is returned. A page may be a JSON list, or a JSON
        object carrying its items under `"data"` or `"tree"`.

        Raises:
            NotImplementedError: If a page is an object with neither a
                `"data"` nor a `"tree"` key.
        """
        page = first_page
        combined_params = {}
        combined_params.update(params)
        aggregated_result = []
        while True:
            combined_params[page_key] = page
            result = self.requests_get(endpoint, combined_params, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    items = result["data"]
                elif "tree" in result:
                    items = result["tree"]
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
                # A null or empty container marks the end for both shapes.
                if not items:
                    return aggregated_result
            else:
                items = result

            aggregated_result.extend(items)
            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` (or `{}`) as JSON; raise unless the status is 200 or 204."""
        if not data:
            data = {}
        request = self.requests.put(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
            self.logger.error(message)
            raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE with `data` as JSON; raise unless the status is 200 or 204."""
        request = self.requests.delete(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url})"
            self.logger.error(message)
            raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
            can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If a failed response mentions that the
            entity or e-mail already exists
        :raises Exception: On any other status that is not 200, 201 or 202
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # The JSON content type is dropped for file uploads; presumably so
            # that the HTTP library can set the multipart one itself.
            args["headers"].pop("Content-type", None)
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            # Never write the access token to the log.
            safe_headers = {
                key: ("<redacted>" if key.lower() == "authorization" else value)
                for key, value in self.headers.items()
            }
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            self.logger.error(f"With info: {data} ({safe_headers})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` as JSON and return the parsed response.

        Raises:
            Exception: If the status is neither 200 nor 201.
        """
        request = self.requests.patch(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
            self.logger.error(error_message)
            raise Exception(error_message)
        return self.parse_result(request)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed response of `/orgs/<orgname>/public_members`."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return the Organizations listed by `/admin/orgs`."""
        path = "/admin/orgs"
        results = self.requests_get(path)
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the User the `/user` endpoint reports for this session."""
        result = self.requests_get(AllSpice.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the `version` field of the `/version` endpoint."""
        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return the Users listed by the admin users endpoint."""
        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first User whose email(s) include `email`, else None."""
        users = self.get_users()
        for user in users:
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the first User whose username equals `username`, else None."""
        users = self.get_users()
        for user in users:
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Return the Repository `owner`/`name`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` fall back to `user_name` when empty.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # Log a copy with the password masked; the real payload is unchanged.
        self.logger.debug(
            "Gitea post payload: %s", {**request_data, "password": "<redacted>"}
        )
        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            self.logger.error(result["message"])
            raise Exception("User not created... (gitea: %s)" % result["message"])
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
        result = self.requests_post(
            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization owned by `owner`.

        Throws:
            Exception: If the response carries no `id`.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            AllSpice.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
            self.logger.error(result["message"])
            raise Exception("Organization not created... (gitea: %s)" % result["message"])
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, '', short description of the new Team.
            permission (str): Optional, 'read', sent as the Team's
                `permission` field.
            can_create_org_repo (bool): Optional, False, sent as
                `can_create_org_repo`.
            includes_all_repositories (bool): Optional, False, sent as
                `includes_all_repositories`.
            units (tuple): Optional, the unit names sent as `units`.
            units_map (dict): Optional, None, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.

        Throws:
            Exception: If the response carries no `id`.
        """
        # None replaces the former shared `{}` default; the payload still
        # carries an empty mapping in that case.
        if units_map is None:
            units_map = {}

        result = self.requests_post(
            AllSpice.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                "units_map": units_map,
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            self.logger.error("Team not created... (gitea: %s)" % result["message"])
            self.logger.error(result["message"])
            raise Exception("Team not created... (gitea: %s)" % result["message"])
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object
Object to establish a session with AllSpice Hub.
def __init__(
    self,
    allspice_hub_url="https://hub.allspice.io",
    token_text=None,
    auth=None,
    verify=True,
    log_level="INFO",
    ratelimiting=(100, 60),
):
    """Initialize an instance of the AllSpice Hub client.

    Args:
        allspice_hub_url (str): The URL for the AllSpice Hub instance.
            Defaults to `https://hub.allspice.io`.

        token_text (str, None): The access token, by default None.

        auth (tuple, None): The user credentials
            `(username, password)`, by default None.

        verify (bool): Assigned to the session's `verify` attribute.
            If True (the default), the server's TLS certificate is
            verified. If False, urllib3's `InsecureRequestWarning` is
            additionally silenced.

        log_level (str): The log level, by default `INFO`.

        ratelimiting (tuple[int, int], None): `(max_calls, period)`,
            If None, no rate limiting is applied. By default, 100 calls
            per minute are allowed.

    Raises:
        ValueError: If neither `token_text` nor `auth` is provided.
    """
    # Validate the credentials first so that a misconfigured client fails
    # before any session object is created.
    if not token_text and not auth:
        raise ValueError("Please provide auth or token_text")

    self.logger = logging.getLogger(__name__)
    self.logger.setLevel(log_level)
    self.headers = {
        "Content-type": "application/json",
    }
    self.url = allspice_hub_url

    if ratelimiting is None:
        self.requests = requests.Session()
    else:
        (max_calls, period) = ratelimiting
        self.requests = RateLimitedSession(max_calls=max_calls, period=period)

    # Manage authentication. If both a token and basic-auth credentials are
    # given, both are attached to the session.
    if token_text:
        self.headers["Authorization"] = "token " + token_text
    if auth:
        self.logger.warning(
            "Using basic auth is not recommended. Prefer using a token instead."
        )
        self.requests.auth = auth

    # Manage SSL certification verification
    self.requests.verify = verify
    if not verify:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True (the default), verify the server's TLS certificate.
If False, verification is skipped and insecure-request warnings are silenced.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
@staticmethod
def parse_result(result) -> Dict:
    """Decode the JSON body of a response.

    Responses with no text, or with a text of three characters or fewer,
    are reported as an empty dict instead of being decoded.
    """
    body = result.text
    if not body or len(body) <= 3:
        return {}
    return json.loads(body)
Parses the result-JSON to a dict.
def requests_get_paginated(
    self,
    endpoint: str,
    params=frozendict(),
    sudo=None,
    page_key: str = "page",
    first_page: int = 1,
):
    """GET every page of `endpoint` and return all items in one list.

    Pages are requested through `self.requests_get`, with `page_key`
    counting up from `first_page`, until an empty page is returned. A page
    may be a JSON list, or a JSON object carrying its items under `"data"`
    or `"tree"`.

    Args:
        endpoint: The path to the endpoint.
        params: Extra query parameters sent with every page request.
        sudo: Passed through to `requests_get`.
        page_key: Name of the query parameter holding the page number.
        first_page: Number of the first page to request.

    Returns:
        A list with the items of all pages, in the order received.

    Raises:
        NotImplementedError: If a page is an object with neither a `"data"`
            nor a `"tree"` key.
    """
    page = first_page
    combined_params = {}
    combined_params.update(params)
    aggregated_result = []
    while True:
        combined_params[page_key] = page
        result = self.requests_get(endpoint, combined_params, sudo)

        if not result:
            return aggregated_result

        if isinstance(result, dict):
            if "data" in result:
                items = result["data"]
            elif "tree" in result:
                items = result["tree"]
            else:
                raise NotImplementedError(
                    "requests_get_paginated does not know how to handle responses of this type."
                )
            # A null or empty container marks the end for both shapes;
            # previously a null "data" raised TypeError on len(None).
            if not items:
                return aggregated_result
        else:
            items = result

        aggregated_result.extend(items)
        page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
    """Send a PUT request with a JSON body.

    A missing or empty `data` is sent as an empty JSON object. Any status
    other than 200 or 204 is logged and raised as an `Exception` whose
    message includes the status code, URL and response text.
    """
    target = self.__get_url(endpoint)
    body = json.dumps(data or {})
    response = self.requests.put(target, headers=self.headers, data=body)
    if response.status_code in (200, 204):
        return
    message = f"Received status code: {response.status_code} ({response.url}) {response.text}"
    self.logger.error(message)
    raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
    """Send a DELETE request with `data` serialized as the JSON body.

    Any status other than 200 or 204 is logged and raised as an `Exception`
    whose message includes the status code and URL.
    """
    target = self.__get_url(endpoint)
    body = json.dumps(data)
    response = self.requests.delete(target, headers=self.headers, data=body)
    if response.status_code in (200, 204):
        return
    message = f"Received status code: {response.status_code} ({response.url})"
    self.logger.error(message)
    raise Exception(message)
def requests_post(
    self,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
    files: Optional[dict] = None,
):
    """
    Make a POST call to the endpoint.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :param params: A dictionary of query params
    :param files: A dictionary of files, see requests.post. Using both files and data
        can lead to unexpected results!
    :return: The JSON response parsed as a dict
    :raises AlreadyExistsException: If a failed response mentions that the
        entity or e-mail already exists
    :raises Exception: On any other status that is not 200, 201 or 202
    """

    # This should ideally be a TypedDict of the type of arguments taken by
    # `requests.post`.
    args: dict[str, Any] = {
        "headers": self.headers.copy(),
    }
    if data is not None:
        args["data"] = json.dumps(data)
    if params is not None:
        args["params"] = params
    if files is not None:
        # The JSON content type is dropped for file uploads; presumably so
        # that the HTTP library can set the multipart one itself.
        args["headers"].pop("Content-type", None)
        args["files"] = files

    request = self.requests.post(self.__get_url(endpoint), **args)

    if request.status_code not in [200, 201, 202]:
        if "already exists" in request.text or "e-mail already in use" in request.text:
            self.logger.warning(request.text)
            raise AlreadyExistsException()
        # Never write the access token to the log.
        safe_headers = {
            key: ("<redacted>" if key.lower() == "authorization" else value)
            for key, value in self.headers.items()
        }
        self.logger.error(f"Received status code: {request.status_code} ({request.url})")
        self.logger.error(f"With info: {data} ({safe_headers})")
        self.logger.error(f"Answer: {request.text}")
        raise Exception(
            f"Received status code: {request.status_code} ({request.url}), {request.text}"
        )
    return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
def requests_patch(self, endpoint: str, data: dict):
    """Send a PATCH request with `data` as JSON and return the parsed reply.

    Any status other than 200 or 201 is logged and raised as an `Exception`
    whose message includes the status code, URL and the submitted data.
    """
    target = self.__get_url(endpoint)
    body = json.dumps(data)
    response = self.requests.patch(target, headers=self.headers, data=body)
    if response.status_code in (200, 201):
        return self.parse_result(response)
    error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
    self.logger.error(error_message)
    raise Exception(error_message)
def create_user(
    self,
    user_name: str,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    login_name: Optional[str] = None,
    change_pw=True,
    send_notify=True,
    source_id=0,
):
    """Create User.

    Args:
        user_name: Sent as `username`.
        email: Sent as `email`.
        password: Sent as `password`; it is masked in the debug log.
        full_name: Sent as `full_name`; falls back to `user_name` when empty.
        login_name: Sent as `login_name`; falls back to `user_name` when empty.
        change_pw: Sent as `must_change_password`.
        send_notify: Sent as `send_notify`.
        source_id: Sent as `source_id`.

    Throws:
        AlreadyExistsException, if the User exists already
        Exception, if something else went wrong.
    """
    if not login_name:
        login_name = user_name
    if not full_name:
        full_name = user_name
    request_data = {
        "source_id": source_id,
        "login_name": login_name,
        "full_name": full_name,
        "username": user_name,
        "email": email,
        "password": password,
        "send_notify": send_notify,
        "must_change_password": change_pw,
    }

    # Log a copy with the password masked; the real payload is unchanged.
    self.logger.debug(
        "Gitea post payload: %s", {**request_data, "password": "<redacted>"}
    )
    result = self.requests_post(self.ADMIN_CREATE_USER, data=request_data)
    if "id" in result:
        self.logger.info(
            "Successfully created User %s <%s> (id %s)",
            result["login"],
            result["email"],
            result["id"],
        )
        self.logger.debug("Gitea response: %s", result)
    else:
        self.logger.error(result["message"])
        raise Exception("User not created... (gitea: %s)" % result["message"])
    user = User.parse_response(self, result)
    return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository for `repoOwner` through the admin endpoint.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # The endpoint is named after users, but organizations are accepted too.
    assert isinstance(repoOwner, (User, Organization))
    endpoint = AllSpice.ADMIN_REPO_CREATE % repoOwner.username
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    result = self.requests_post(endpoint, data=payload)
    if "id" not in result:
        self.logger.error(result["message"])
        raise Exception("Repository not created... (gitea: %s)" % result["message"])
    self.logger.info("Successfully created Repository %s " % result["name"])
    return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner` and return its wrapper.

    Throws:
        Exception: If the response carries no `id`.
    """
    assert isinstance(owner, User)
    endpoint = AllSpice.CREATE_ORG % owner.username
    payload = {
        "username": orgName,
        "description": description,
        "location": location,
        "website": website,
        "full_name": full_name,
    }
    result = self.requests_post(endpoint, data=payload)
    if "id" not in result:
        self.logger.error("Organization not created... (gitea: %s)" % result["message"])
        self.logger.error(result["message"])
        raise Exception("Organization not created... (gitea: %s)" % result["message"])
    self.logger.info("Successfully created Organization %s" % result["username"])
    return Organization.parse_response(self, result)
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, '', short description of the new Team.
        permission (str): Optional, 'read', sent as the Team's
            `permission` field.
        can_create_org_repo (bool): Optional, False, sent as
            `can_create_org_repo`.
        includes_all_repositories (bool): Optional, False, sent as
            `includes_all_repositories`.
        units (tuple): Optional, the unit names sent as `units`.
        units_map (dict): Optional, None, a mapping of units to their
            permissions. If None or empty, the `permission` permission will
            be applied to all units. Note: When both `units` and `units_map`
            are given, `units_map` will be preferred.

    Throws:
        Exception: If the response carries no `id`.
    """
    # None replaces the former shared `{}` default; the payload still carries
    # an empty mapping in that case.
    if units_map is None:
        units_map = {}

    result = self.requests_post(
        self.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            "units_map": units_map,
        },
    )

    if "id" in result:
        self.logger.info("Successfully created Team %s" % result["name"])
    else:
        self.logger.error("Team not created... (gitea: %s)" % result["message"])
        self.logger.error(result["message"])
        raise Exception("Team not created... (gitea: %s)" % result["message"])
    api_object = Team.parse_response(self, result)
    setattr(
        api_object, "_organization", org
    )  # fixes strange behaviour of gitea not returning a valid organization here.
    return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, '', short description of the new Team.
permission (str): Optional, 'read', What permissions the members of the Team have.
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.
class User(ApiObject):
    """
    Wrapper object for a user account of the AllSpice Hub instance.

    The annotated fields are created dynamically at runtime from the API
    response. Only the fields listed in `_patchable_fields` can be changed
    and written back with `commit`.
    """

    active: bool
    admin: Any
    allow_create_organization: Any
    allow_git_hook: Any
    allow_import_local: Any
    avatar_url: str
    created: str
    description: str
    email: str
    emails: List[Any]
    followers_count: int
    following_count: int
    full_name: str
    id: int
    is_admin: bool
    language: str
    last_login: str
    location: str
    login: str
    login_name: str
    max_repo_creation: Any
    must_change_password: Any
    password: Any
    prohibit_login: bool
    restricted: bool
    starred_repos_count: int
    username: str
    visibility: str
    website: str

    API_OBJECT = """/users/{name}"""  # <name>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)
        # Cache of the addresses returned by the last e-mail request.
        self._emails = []

    def __eq__(self, other):
        """Two users are equal if they share the client and the id."""
        if not isinstance(other, User):
            return False
        return self.allspice_client == other.allspice_client and self.id == other.id

    def __hash__(self):
        return hash(self.allspice_client) ^ hash(self.id)

    @property
    def emails(self):
        """All e-mail addresses of this user; requested anew on every access."""
        self.__request_emails()
        return self._emails

    @classmethod
    def request(cls, allspice_client, name: str) -> "User":
        """Request the user called `name` through `allspice_client`."""
        api_object = cls._request(allspice_client, {"name": name})
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "active",
        "admin",
        "allow_create_organization",
        "allow_git_hook",
        "allow_import_local",
        "email",
        "full_name",
        "location",
        "login_name",
        "max_repo_creation",
        "must_change_password",
        "password",
        "prohibit_login",
        "website",
    }

    def commit(self, login_name: str, source_id: int = 0):
        """
        Send the changed fields of this user to the server.

        Unfortunately it is necessary to require the login name
        as well as the login source (that is not supplied when getting a user) for
        changing a user.
        Usually source_id is 0 and the login_name is equal to the username.
        """
        values = self.get_dirty_fields()
        values.update(
            # api-doc says that the "source_id" is necessary; works without though
            {"login_name": login_name, "source_id": source_id}
        )
        args = {"username": self.username}
        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
        self._dirty_fields = {}

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a user Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            "/user/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response without an "id" is treated as a failed creation.
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories owned by this User."""
        url = f"/users/{self.username}/repos"
        results = self.allspice_client.requests_get_paginated(url)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_orgs(self) -> List[Organization]:
        """Get all Organizations this user is a member of."""
        url = f"/users/{self.username}/orgs"
        results = self.allspice_client.requests_get_paginated(url)
        return [Organization.parse_response(self.allspice_client, result) for result in results]

    def get_teams(self) -> List["Team"]:
        """Get the Teams returned by /user/teams, requested with sudo set to this user."""
        url = "/user/teams"
        results = self.allspice_client.requests_get_paginated(url, sudo=self)
        return [Team.parse_response(self.allspice_client, result) for result in results]

    def get_accessible_repos(self) -> List["Repository"]:
        """Get all Repositories accessible by the logged in User."""
        results = self.allspice_client.requests_get("/user/repos", sudo=self)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def __request_emails(self):
        """Request the e-mail addresses of this user and refresh the local cache."""
        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
        # Collect into a fresh list instead of appending to the cached one:
        # this runs on every access of `emails`, so appending would duplicate
        # every address each time the property is read.
        addresses = []
        # report if the adress changed by this
        for mail in result:
            addresses.append(mail["email"])
            if mail["primary"]:
                self._email = mail["email"]
        self._emails = addresses

    def delete(self):
        """Deletes this User. Also deletes all Repositories he owns."""
        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Get the heatmap of this user as (time, number of contributions) pairs."""
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
def commit(self, login_name: str, source_id: int = 0):
    """
    Write the locally changed fields of this user back to the server.

    Changing a user requires the login name as well as the login source,
    and neither is supplied when getting a user, so the caller has to pass
    them in. Usually source_id is 0 and the login_name is equal to the
    username.
    """
    payload = self.get_dirty_fields()
    # api-doc says that the "source_id" is necessary; works without though
    payload.update({"login_name": login_name, "source_id": source_id})
    endpoint = User.ADMIN_EDIT_USER.format(username=self.username)
    self.allspice_client.requests_patch(endpoint, data=payload)
    # Everything has been sent, so nothing is dirty any more.
    self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository through the /user/repos endpoint

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If the response carries no "id", i.e. something else went wrong.
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    response = self.allspice_client.requests_post("/user/repos", data=payload)
    # Guard clause: a response without an "id" means nothing was created.
    if "id" not in response:
        self.allspice_client.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])
    self.allspice_client.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self.allspice_client, response)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_repositories(self) -> List["Repository"]:
    """Return every Repository owned by this User."""
    repositories = []
    for entry in self.allspice_client.requests_get_paginated(f"/users/{self.username}/repos"):
        repositories.append(Repository.parse_response(self.allspice_client, entry))
    return repositories
Get all Repositories owned by this User.
def get_orgs(self) -> List[Organization]:
    """Return every Organization this user is a member of."""
    organizations = []
    for entry in self.allspice_client.requests_get_paginated(f"/users/{self.username}/orgs"):
        organizations.append(Organization.parse_response(self.allspice_client, entry))
    return organizations
Get all Organizations this user is a member of.
def get_accessible_repos(self) -> List["Repository"]:
    """Return the Repositories listed by /user/repos, requested with sudo set to this User."""
    client = self.allspice_client
    entries = client.requests_get("/user/repos", sudo=self)
    return [Repository.parse_response(self.allspice_client, entry) for entry in entries]
Get all Repositories accessible by the logged in User.
def delete(self):
    """
    Delete this User through the admin endpoint and mark this object as deleted.

    Per the original documentation this also deletes all Repositories the
    user owns; that happens outside this method.
    """
    endpoint = User.ADMIN_DELETE_USER % self.username
    self.allspice_client.requests_delete(endpoint)
    self.deleted = True
Deletes this User. Also deletes all Repositories he owns.
class Organization(ApiObject):
    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll

    Wrapper object for an organization. The annotated fields are created
    dynamically at runtime from the API response; only the fields listed in
    `_patchable_fields` can be changed and written back with `commit`.
    """

    active: Optional[bool]
    avatar_url: str
    created: Optional[str]
    description: str
    email: str
    followers_count: Optional[int]
    following_count: Optional[int]
    full_name: str
    id: int
    is_admin: Optional[bool]
    language: Optional[str]
    last_login: Optional[str]
    location: str
    login: Optional[str]
    login_name: Optional[str]
    name: Optional[str]
    prohibit_login: Optional[bool]
    repo_admin_change_team_access: Optional[bool]
    restricted: Optional[bool]
    starred_repos_count: Optional[int]
    username: str
    visibility: str
    website: str

    API_OBJECT = """/orgs/{name}"""  # <org>
    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Two organizations are equal if they share the client and the name."""
        if not isinstance(other, Organization):
            return False
        return self.allspice_client == other.allspice_client and self.name == other.name

    def __hash__(self):
        return hash(self.allspice_client) ^ hash(self.name)

    @classmethod
    def request(cls, allspice_client, name: str) -> Self:
        """Request the organization called `name` through `allspice_client`."""
        return cls._request(allspice_client, {"name": name})

    @classmethod
    def parse_response(cls, allspice_client, result) -> "Organization":
        """Build an Organization from an API response, making sure it has a `name`."""
        api_object = super().parse_response(allspice_client, result)
        # add "name" field to make this behave similar to users for gitea < 1.18
        # also necessary for repository-owner when org is repo owner
        if not hasattr(api_object, "name"):
            Organization._add_read_property("name", result["username"], api_object)
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "description",
        "full_name",
        "location",
        "visibility",
        "website",
    }

    def commit(self):
        """Send the changed fields of this organization to the server."""
        args = {"name": self.name}
        self._commit(args)

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create an organization Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            f"/orgs/{self.name}/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response without an "id" is treated as a failed creation.
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories of this Organization."""
        results = self.allspice_client.requests_get_paginated(
            Organization.ORG_REPOS_REQUEST % self.username
        )
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_repository(self, name) -> "Repository":
        """Get the Repository called `name`.

        Throws:
            NotFoundException: If this Organization has no such Repository.
        """
        repos = self.get_repositories()
        for repo in repos:
            if repo.name == name:
                return repo
        raise NotFoundException("Repository %s not existent in organization." % name)

    def get_teams(self) -> List["Team"]:
        """Get the Teams of this Organization."""
        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
        teams = [Team.parse_response(self.allspice_client, result) for result in results]
        # organisation seems to be missing using this request, so we add org manually
        for t in teams:
            setattr(t, "_organization", self)
        return teams

    def get_team(self, name) -> "Team":
        """Get the Team called `name`.

        Throws:
            NotFoundException: If this Organization has no such Team.
        """
        teams = self.get_teams()
        for team in teams:
            if team.name == name:
                return team
        raise NotFoundException("Team not existent in organization.")

    def create_team(
        self,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map: Optional[Dict[str, str]] = None,
    ) -> "Team":
        """Alias for AllSpice#create_team

        `units_map` defaults to None and is replaced by a fresh empty dict
        per call, so no dict is shared between calls.
        """
        # TODO: Move AllSpice#create_team to Organization#create_team and
        # deprecate AllSpice#create_team.
        return self.allspice_client.create_team(
            org=self,
            name=name,
            description=description,
            permission=permission,
            can_create_org_repo=can_create_org_repo,
            includes_all_repositories=includes_all_repositories,
            units=units,
            units_map={} if units_map is None else units_map,
        )

    def get_members(self) -> List["User"]:
        """Get the Users that are members of this Organization."""
        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
        return [User.parse_response(self.allspice_client, result) for result in results]

    def is_member(self, username) -> bool:
        """Check whether `username` (a name or a User) is a member of this Organization."""
        if isinstance(username, User):
            username = username.username
        try:
            # returns 204 if its ok, 404 if its not
            self.allspice_client.requests_get(
                Organization.ORG_IS_MEMBER % (self.username, username)
            )
            return True
        except Exception:
            # Any failure of the request is reported as "not a member".
            return False

    def remove_member(self, user: "User"):
        """Remove `user` from this Organization."""
        path = f"/orgs/{self.username}/members/{user.username}"
        self.allspice_client.requests_delete(path)

    def delete(self):
        """Delete this Organization. Invalidates this Object's data. Also deletes all Repositories owned by the Organization"""
        for repo in self.get_repositories():
            repo.delete()
        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Get the heatmap of this organization as (time, number of contributions) pairs."""
        # Use this class's own constant; it has the same value as User.USER_HEATMAP.
        results = self.allspice_client.requests_get(Organization.ORG_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
see https://hub.allspice.io/api/swagger#/organization/orgGetAll
@classmethod
def parse_response(cls, allspice_client, result) -> "Organization":
    """Build an Organization from an API response and guarantee it has a `name`."""
    org = super().parse_response(allspice_client, result)
    if hasattr(org, "name"):
        return org
    # add "name" field to make this behave similar to users for gitea < 1.18
    # also necessary for repository-owner when org is repo owner
    Organization._add_read_property("name", result["username"], org)
    return org
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository owned by this organization

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If the response carries no "id", i.e. something else went wrong.
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    response = self.allspice_client.requests_post(f"/orgs/{self.name}/repos", data=payload)
    # Guard clause: a response without an "id" means nothing was created.
    if "id" not in response:
        self.allspice_client.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])
    self.allspice_client.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self.allspice_client, response)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_teams(self) -> List["Team"]:
    """Return the Teams of this organization, each linked back to it."""
    endpoint = Organization.ORG_TEAMS_REQUEST % self.username
    entries = self.allspice_client.requests_get(endpoint)
    parsed = []
    for entry in entries:
        parsed.append(Team.parse_response(self.allspice_client, entry))
    # organisation seems to be missing using this request, so we add org manually
    for team in parsed:
        team._organization = self
    return parsed
def create_team(
    self,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map: Optional[Dict[str, str]] = None,
) -> "Team":
    """Alias for AllSpice#create_team

    `units_map` defaults to None and is replaced by a fresh empty dict per
    call, so no dict is shared between calls.
    """
    # TODO: Move AllSpice#create_team to Organization#create_team and
    # deprecate AllSpice#create_team.
    return self.allspice_client.create_team(
        org=self,
        name=name,
        description=description,
        permission=permission,
        can_create_org_repo=can_create_org_repo,
        includes_all_repositories=includes_all_repositories,
        units=units,
        units_map={} if units_map is None else units_map,
    )
Alias for AllSpice#create_team
def is_member(self, username) -> bool:
    """Tell whether `username` (a name or a User object) belongs to this organization."""
    member_name = username.username if isinstance(username, User) else username
    try:
        # returns 204 if its ok, 404 if its not
        self.allspice_client.requests_get(
            Organization.ORG_IS_MEMBER % (self.username, member_name)
        )
    except Exception:
        # Any failure of the lookup counts as "not a member".
        return False
    else:
        return True
def delete(self):
    """
    Delete this Organization and mark this object as deleted.

    Every Repository returned by get_repositories() is deleted first, then
    the Organization itself.
    """
    owned = self.get_repositories()
    for repository in owned:
        repository.delete()
    endpoint = Organization.API_OBJECT.format(name=self.username)
    self.allspice_client.requests_delete(endpoint)
    self.deleted = True
Delete this Organization. Invalidates this Object's data. Also deletes all Repositories owned by the Organization.
class Team(ApiObject):
    """
    Wrapper object for a team.

    The annotated fields are created dynamically at runtime from the API
    response; the fields listed in `_patchable_fields` can be changed and
    written back with `commit`.
    """

    can_create_org_repo: bool
    description: str
    id: int
    includes_all_repositories: bool
    name: str
    organization: Optional["Organization"]
    permission: str
    units: List[str]
    units_map: Dict[str, str]

    API_OBJECT = """/teams/{id}"""  # <id>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Teams are equal when both the organization and the id match."""
        if isinstance(other, Team):
            return self.organization == other.organization and self.id == other.id
        return False

    def __hash__(self):
        return hash(self.organization) ^ hash(self.id)

    # The nested organization of a response is parsed into an Organization object.
    _fields_to_parsers: ClassVar[dict] = {
        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
    }

    _patchable_fields: ClassVar[set[str]] = {
        "can_create_org_repo",
        "description",
        "includes_all_repositories",
        "name",
        "permission",
        "units",
        "units_map",
    }

    @classmethod
    def request(cls, allspice_client, id: int):
        """Request the team with the given id through `allspice_client`."""
        return cls._request(allspice_client, {"id": id})

    def commit(self):
        """Write the changed fields of this team back to the server."""
        self._commit({"id": self.id})

    def add_user(self, user: User):
        """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
        member_url = f"/teams/{self.id}/members/{user.login}"
        self.allspice_client.requests_put(member_url)

    def add_repo(self, org: Organization, repo: Union[Repository, str]):
        """Add `repo` (a Repository or a repository name) of `org` to this team."""
        repo_name = repo.name if isinstance(repo, Repository) else repo
        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))

    def get_members(self):
        """Get all users assigned to the team."""
        members = []
        for entry in self.allspice_client.requests_get_paginated(
            Team.GET_MEMBERS % self.id,
        ):
            members.append(User.parse_response(self.allspice_client, entry))
        return members

    def get_repos(self):
        """Get all repos of this Team."""
        repos = []
        for entry in self.allspice_client.requests_get(Team.GET_REPOS % self.id):
            repos.append(Repository.parse_response(self.allspice_client, entry))
        return repos

    def delete(self):
        """Delete this team and mark this object as deleted."""
        endpoint = Team.TEAM_DELETE % self.id
        self.allspice_client.requests_delete(endpoint)
        self.deleted = True

    def remove_team_member(self, user_name: str):
        """Remove the user called `user_name` from this team."""
        member_url = f"/teams/{self.id}/members/{user_name}"
        self.allspice_client.requests_delete(member_url)
def add_user(self, user: User):
    """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember"""
    # The member is addressed by the user's login.
    team_id = self.id
    member_url = f"/teams/{team_id}/members/{user.login}"
    self.allspice_client.requests_put(member_url)
https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
def get_members(self):
    """Return all users assigned to this team."""
    members = []
    for entry in self.allspice_client.requests_get_paginated(
        Team.GET_MEMBERS % self.id,
    ):
        members.append(User.parse_response(self.allspice_client, entry))
    return members
Get all users assigned to the team.
def get_repos(self):
    """Return all repos of this Team."""
    repos = []
    for entry in self.allspice_client.requests_get(Team.GET_REPOS % self.id):
        repos.append(Repository.parse_response(self.allspice_client, entry))
    return repos
Get all repos of this Team.
473class Repository(ApiObject): 474 allow_manual_merge: Any 475 allow_merge_commits: bool 476 allow_rebase: bool 477 allow_rebase_explicit: bool 478 allow_rebase_update: bool 479 allow_squash_merge: bool 480 archived: bool 481 archived_at: str 482 autodetect_manual_merge: Any 483 avatar_url: str 484 clone_url: str 485 created_at: str 486 default_allow_maintainer_edit: bool 487 default_branch: str 488 default_delete_branch_after_merge: bool 489 default_merge_style: str 490 description: str 491 empty: bool 492 enable_prune: Any 493 external_tracker: Any 494 external_wiki: Any 495 fork: bool 496 forks_count: int 497 full_name: str 498 has_actions: bool 499 has_issues: bool 500 has_packages: bool 501 has_projects: bool 502 has_pull_requests: bool 503 has_releases: bool 504 has_wiki: bool 505 html_url: str 506 id: int 507 ignore_whitespace_conflicts: bool 508 internal: bool 509 internal_tracker: Dict[str, bool] 510 language: str 511 languages_url: str 512 link: str 513 mirror: bool 514 mirror_interval: str 515 mirror_updated: str 516 name: str 517 open_issues_count: int 518 open_pr_counter: int 519 original_url: str 520 owner: Union["User", "Organization"] 521 parent: Any 522 permissions: Dict[str, bool] 523 private: bool 524 release_counter: int 525 repo_transfer: Any 526 size: int 527 ssh_url: str 528 stars_count: int 529 template: bool 530 updated_at: datetime 531 url: str 532 watchers_count: int 533 website: str 534 535 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 536 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 537 REPO_SEARCH = """/repos/search/""" 538 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 539 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 540 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 541 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 542 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 543 REPO_TIMES = 
"""/repos/%s/%s/times""" # <owner>, <reponame> 544 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 545 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 546 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 547 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 548 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 549 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 550 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 551 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 552 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 553 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 554 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 555 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 556 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 557 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 558 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 559 560 class ArchiveFormat(Enum): 561 """ 562 Archive formats for Repository.get_archive 563 """ 564 565 TAR = "tar.gz" 566 ZIP = "zip" 567 568 class CommitStatusSort(Enum): 569 """ 570 Sort order for Repository.get_commit_status 571 """ 572 573 OLDEST = "oldest" 574 RECENT_UPDATE = "recentupdate" 575 LEAST_UPDATE = "leastupdate" 576 LEAST_INDEX = "leastindex" 577 HIGHEST_INDEX = "highestindex" 578 579 def __init__(self, allspice_client): 580 super().__init__(allspice_client) 581 582 def __eq__(self, other): 583 if not isinstance(other, Repository): 584 return False 585 return self.owner == other.owner and self.name == other.name 586 587 def __hash__(self): 588 return hash(self.owner) ^ hash(self.name) 589 590 _fields_to_parsers: ClassVar[dict] = { 591 # dont know how to tell apart user and org as owner except form email being empty. 
592 "owner": lambda allspice_client, r: ( 593 Organization.parse_response(allspice_client, r) 594 if r["email"] == "" 595 else User.parse_response(allspice_client, r) 596 ), 597 "updated_at": lambda _, t: Util.convert_time(t), 598 } 599 600 @classmethod 601 def request( 602 cls, 603 allspice_client, 604 owner: str, 605 name: str, 606 ) -> Repository: 607 return cls._request(allspice_client, {"owner": owner, "name": name}) 608 609 @classmethod 610 def search( 611 cls, 612 allspice_client, 613 query: Optional[str] = None, 614 topic: bool = False, 615 include_description: bool = False, 616 user: Optional[User] = None, 617 owner_to_prioritize: Union[User, Organization, None] = None, 618 ) -> list[Repository]: 619 """ 620 Search for repositories. 621 622 See https://hub.allspice.io/api/swagger#/repository/repoSearch 623 624 :param query: The query string to search for 625 :param topic: If true, the query string will only be matched against the 626 repository's topic. 627 :param include_description: If true, the query string will be matched 628 against the repository's description as well. 629 :param user: If specified, only repositories that this user owns or 630 contributes to will be searched. 631 :param owner_to_prioritize: If specified, repositories owned by the 632 given entity will be prioritized in the search. 633 :returns: All repositories matching the query. If there are many 634 repositories matching this query, this may take some time. 
635 """ 636 637 params = {} 638 639 if query is not None: 640 params["q"] = query 641 if topic: 642 params["topic"] = topic 643 if include_description: 644 params["include_description"] = include_description 645 if user is not None: 646 params["user"] = user.id 647 if owner_to_prioritize is not None: 648 params["owner_to_prioritize"] = owner_to_prioritize.id 649 650 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 651 652 return [Repository.parse_response(allspice_client, response) for response in responses] 653 654 _patchable_fields: ClassVar[set[str]] = { 655 "allow_manual_merge", 656 "allow_merge_commits", 657 "allow_rebase", 658 "allow_rebase_explicit", 659 "allow_rebase_update", 660 "allow_squash_merge", 661 "archived", 662 "autodetect_manual_merge", 663 "default_branch", 664 "default_delete_branch_after_merge", 665 "default_merge_style", 666 "description", 667 "enable_prune", 668 "external_tracker", 669 "external_wiki", 670 "has_issues", 671 "has_projects", 672 "has_pull_requests", 673 "has_wiki", 674 "ignore_whitespace_conflicts", 675 "internal_tracker", 676 "mirror_interval", 677 "name", 678 "private", 679 "template", 680 "website", 681 } 682 683 def commit(self): 684 args = {"owner": self.owner.username, "name": self.name} 685 self._commit(args) 686 687 def get_branches(self) -> List["Branch"]: 688 """Get all the Branches of this Repository.""" 689 690 results = self.allspice_client.requests_get_paginated( 691 Repository.REPO_BRANCHES % (self.owner.username, self.name) 692 ) 693 return [Branch.parse_response(self.allspice_client, result) for result in results] 694 695 def get_branch(self, name: str) -> "Branch": 696 """Get a specific Branch of this Repository.""" 697 result = self.allspice_client.requests_get( 698 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 699 ) 700 return Branch.parse_response(self.allspice_client, result) 701 702 def add_branch(self, create_from: Ref, newname: 
str) -> "Branch": 703 """Add a branch to the repository""" 704 # Note: will only work with gitea 1.13 or higher! 705 706 ref_name = Util.data_params_for_ref(create_from) 707 if "ref" not in ref_name: 708 raise ValueError("create_from must be a Branch, Commit or string") 709 ref_name = ref_name["ref"] 710 711 data = {"new_branch_name": newname, "old_ref_name": ref_name} 712 result = self.allspice_client.requests_post( 713 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 714 ) 715 return Branch.parse_response(self.allspice_client, result) 716 717 def get_issues( 718 self, 719 state: Literal["open", "closed", "all"] = "all", 720 search_query: Optional[str] = None, 721 labels: Optional[List[str]] = None, 722 milestones: Optional[List[Union[Milestone, str]]] = None, 723 assignee: Optional[Union[User, str]] = None, 724 since: Optional[datetime] = None, 725 before: Optional[datetime] = None, 726 ) -> List["Issue"]: 727 """ 728 Get all Issues of this Repository (open and closed) 729 730 https://hub.allspice.io/api/swagger#/repository/repoListIssues 731 732 All params of this method are optional filters. If you don't specify a filter, it 733 will not be applied. 734 735 :param state: The state of the Issues to get. If None, all Issues are returned. 736 :param search_query: Filter issues by text. This is equivalent to searching for 737 `search_query` in the Issues on the web interface. 738 :param labels: Filter issues by labels. 739 :param milestones: Filter issues by milestones. 740 :param assignee: Filter issues by the assigned user. 741 :param since: Filter issues by the date they were created. 742 :param before: Filter issues by the date they were created. 743 :return: A list of Issues. 
744 """ 745 746 data = { 747 "state": state, 748 } 749 if search_query: 750 data["q"] = search_query 751 if labels: 752 data["labels"] = ",".join(labels) 753 if milestones: 754 data["milestone"] = ",".join( 755 [ 756 milestone.name if isinstance(milestone, Milestone) else milestone 757 for milestone in milestones 758 ] 759 ) 760 if assignee: 761 if isinstance(assignee, User): 762 data["assignee"] = assignee.username 763 else: 764 data["assignee"] = assignee 765 if since: 766 data["since"] = Util.format_time(since) 767 if before: 768 data["before"] = Util.format_time(before) 769 770 results = self.allspice_client.requests_get_paginated( 771 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 772 params=data, 773 ) 774 775 issues = [] 776 for result in results: 777 issue = Issue.parse_response(self.allspice_client, result) 778 # See Issue.request 779 setattr(issue, "_repository", self) 780 # This is mostly for compatibility with an older implementation 781 Issue._add_read_property("repo", self, issue) 782 issues.append(issue) 783 784 return issues 785 786 def get_design_reviews( 787 self, 788 state: Literal["open", "closed", "all"] = "all", 789 milestone: Optional[Union[Milestone, str]] = None, 790 labels: Optional[List[str]] = None, 791 ) -> List["DesignReview"]: 792 """ 793 Get all Design Reviews of this Repository. 794 795 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 796 797 :param state: The state of the Design Reviews to get. If None, all Design Reviews 798 are returned. 799 :param milestone: The milestone of the Design Reviews to get. 800 :param labels: A list of label IDs to filter DRs by. 801 :return: A list of Design Reviews. 
802 """ 803 804 params = { 805 "state": state, 806 } 807 if milestone: 808 if isinstance(milestone, Milestone): 809 params["milestone"] = milestone.name 810 else: 811 params["milestone"] = milestone 812 if labels: 813 params["labels"] = ",".join(labels) 814 815 results = self.allspice_client.requests_get_paginated( 816 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 817 params=params, 818 ) 819 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 820 821 def get_commits( 822 self, 823 sha: Optional[str] = None, 824 path: Optional[str] = None, 825 stat: bool = True, 826 ) -> List["Commit"]: 827 """ 828 Get all the Commits of this Repository. 829 830 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 831 832 :param sha: The SHA of the commit to start listing commits from. 833 :param path: filepath of a file/dir. 834 :param stat: Include the number of additions and deletions in the response. 835 Disable for speedup. 836 :return: A list of Commits. 837 """ 838 839 data = {} 840 if sha: 841 data["sha"] = sha 842 if path: 843 data["path"] = path 844 if not stat: 845 data["stat"] = False 846 847 try: 848 results = self.allspice_client.requests_get_paginated( 849 Repository.REPO_COMMITS % (self.owner.username, self.name), 850 params=data, 851 ) 852 except ConflictException as err: 853 logging.warning(err) 854 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 855 results = [] 856 return [Commit.parse_response(self.allspice_client, result) for result in results] 857 858 def get_issues_state(self, state) -> List["Issue"]: 859 """ 860 DEPRECATED: Use get_issues() instead. 861 862 Get issues of state Issue.open or Issue.closed of a repository. 
863 """ 864 865 assert state in [Issue.OPENED, Issue.CLOSED] 866 issues = [] 867 data = {"state": state} 868 results = self.allspice_client.requests_get_paginated( 869 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 870 params=data, 871 ) 872 for result in results: 873 issue = Issue.parse_response(self.allspice_client, result) 874 # adding data not contained in the issue response 875 # See Issue.request() 876 setattr(issue, "_repository", self) 877 Issue._add_read_property("repo", self, issue) 878 Issue._add_read_property("owner", self.owner, issue) 879 issues.append(issue) 880 return issues 881 882 def get_times(self): 883 results = self.allspice_client.requests_get( 884 Repository.REPO_TIMES % (self.owner.username, self.name) 885 ) 886 return results 887 888 def get_user_time(self, username) -> float: 889 if isinstance(username, User): 890 username = username.username 891 results = self.allspice_client.requests_get( 892 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 893 ) 894 time = sum(r["time"] for r in results) 895 return time 896 897 def get_full_name(self) -> str: 898 return self.owner.username + "/" + self.name 899 900 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 901 data = { 902 "assignees": assignees, 903 "body": description, 904 "closed": False, 905 "title": title, 906 } 907 result = self.allspice_client.requests_post( 908 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 909 data=data, 910 ) 911 912 issue = Issue.parse_response(self.allspice_client, result) 913 setattr(issue, "_repository", self) 914 Issue._add_read_property("repo", self, issue) 915 return issue 916 917 def create_design_review( 918 self, 919 title: str, 920 head: Union[Branch, str], 921 base: Union[Branch, str], 922 assignees: Optional[Set[Union[User, str]]] = None, 923 body: Optional[str] = None, 924 due_date: Optional[datetime] = None, 925 milestone: Optional["Milestone"] = 
None, 926 ) -> "DesignReview": 927 """ 928 Create a new Design Review. 929 930 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 931 932 :param title: Title of the Design Review 933 :param head: Branch or name of the branch to merge into the base branch 934 :param base: Branch or name of the branch to merge into 935 :param assignees: Optional. A list of users to assign this review. List can be of 936 User objects or of usernames. 937 :param body: An Optional Description for the Design Review. 938 :param due_date: An Optional Due date for the Design Review. 939 :param milestone: An Optional Milestone for the Design Review 940 :return: The created Design Review 941 """ 942 943 data: dict[str, Any] = { 944 "title": title, 945 } 946 947 if isinstance(head, Branch): 948 data["head"] = head.name 949 else: 950 data["head"] = head 951 if isinstance(base, Branch): 952 data["base"] = base.name 953 else: 954 data["base"] = base 955 if assignees: 956 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 957 if body: 958 data["body"] = body 959 if due_date: 960 data["due_date"] = Util.format_time(due_date) 961 if milestone: 962 data["milestone"] = milestone.id 963 964 result = self.allspice_client.requests_post( 965 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 966 data=data, 967 ) 968 969 return DesignReview.parse_response(self.allspice_client, result) 970 971 def create_milestone( 972 self, 973 title: str, 974 description: str, 975 due_date: Optional[str] = None, 976 state: str = "open", 977 ) -> "Milestone": 978 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 979 data = {"title": title, "description": description, "state": state} 980 if due_date: 981 data["due_date"] = due_date 982 result = self.allspice_client.requests_post(url, data=data) 983 return Milestone.parse_response(self.allspice_client, result) 984 985 def create_gitea_hook(self, hook_url: str, 
events: List[str]): 986 url = f"/repos/{self.owner.username}/{self.name}/hooks" 987 data = { 988 "type": "gitea", 989 "config": {"content_type": "json", "url": hook_url}, 990 "events": events, 991 "active": True, 992 } 993 return self.allspice_client.requests_post(url, data=data) 994 995 def list_hooks(self): 996 url = f"/repos/{self.owner.username}/{self.name}/hooks" 997 return self.allspice_client.requests_get(url) 998 999 def delete_hook(self, id: str): 1000 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1001 self.allspice_client.requests_delete(url) 1002 1003 def is_collaborator(self, username) -> bool: 1004 if isinstance(username, User): 1005 username = username.username 1006 try: 1007 # returns 204 if its ok, 404 if its not 1008 self.allspice_client.requests_get( 1009 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1010 ) 1011 return True 1012 except Exception: 1013 return False 1014 1015 def get_users_with_access(self) -> Sequence[User]: 1016 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1017 response = self.allspice_client.requests_get(url) 1018 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1019 if isinstance(self.owner, User): 1020 return [*collabs, self.owner] 1021 else: 1022 # owner must be org 1023 teams = self.owner.get_teams() 1024 for team in teams: 1025 team_repos = team.get_repos() 1026 if self.name in [n.name for n in team_repos]: 1027 collabs += team.get_members() 1028 return collabs 1029 1030 def remove_collaborator(self, user_name: str): 1031 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1032 self.allspice_client.requests_delete(url) 1033 1034 def transfer_ownership( 1035 self, 1036 new_owner: Union[User, Organization], 1037 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1038 ): 1039 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1040 data: dict[str, Any] = {"new_owner": 
new_owner.username} 1041 if isinstance(new_owner, Organization): 1042 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1043 data["team_ids"] = new_team_ids 1044 self.allspice_client.requests_post(url, data=data) 1045 # TODO: make sure this instance is either updated or discarded 1046 1047 def get_git_content( 1048 self, 1049 ref: Optional["Ref"] = None, 1050 commit: "Optional[Commit]" = None, 1051 ) -> List[Content]: 1052 """ 1053 Get the metadata for all files in the root directory. 1054 1055 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1056 1057 :param ref: branch or commit to get content from 1058 :param commit: commit to get content from (deprecated) 1059 """ 1060 url = f"/repos/{self.owner.username}/{self.name}/contents" 1061 data = Util.data_params_for_ref(ref or commit) 1062 1063 result = [ 1064 Content.parse_response(self.allspice_client, f) 1065 for f in self.allspice_client.requests_get(url, data) 1066 ] 1067 return result 1068 1069 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1070 """ 1071 Get the repository's tree on a given ref. 1072 1073 By default, this will only return the top-level entries in the tree. If you want 1074 to get the entire tree, set `recursive` to True. 1075 1076 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1077 :param recursive: Whether to get the entire tree or just the top-level entries. 
1078 """ 1079 1080 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1081 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1082 params = {"recursive": recursive} 1083 results = self.allspice_client.requests_get_paginated(url, params=params) 1084 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1085 1086 def get_file_content( 1087 self, 1088 content: Content, 1089 ref: Optional[Ref] = None, 1090 commit: Optional[Commit] = None, 1091 ) -> Union[str, List["Content"]]: 1092 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1093 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1094 data = Util.data_params_for_ref(ref or commit) 1095 1096 if content.type == Content.FILE: 1097 return self.allspice_client.requests_get(url, data)["content"] 1098 else: 1099 return [ 1100 Content.parse_response(self.allspice_client, f) 1101 for f in self.allspice_client.requests_get(url, data) 1102 ] 1103 1104 def get_raw_file( 1105 self, 1106 file_path: str, 1107 ref: Optional[Ref] = None, 1108 ) -> bytes: 1109 """ 1110 Get the raw, binary data of a single file. 1111 1112 Note 1: if the file you are requesting is a text file, you might want to 1113 use .decode() on the result to get a string. For example: 1114 1115 content = repo.get_raw_file("file.txt").decode("utf-8") 1116 1117 Note 2: this method will store the entire file in memory. If you want 1118 to download a large file, you might want to use `download_to_file` 1119 instead. 1120 1121 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1122 1123 :param file_path: The path to the file to get. 1124 :param ref: The branch or commit to get the file from. If not provided, 1125 the default branch is used. 
1126 """ 1127 1128 url = self.REPO_GET_RAW_FILE.format( 1129 owner=self.owner.username, 1130 repo=self.name, 1131 path=file_path, 1132 ) 1133 params = Util.data_params_for_ref(ref) 1134 return self.allspice_client.requests_get_raw(url, params=params) 1135 1136 def download_to_file( 1137 self, 1138 file_path: str, 1139 io: IO, 1140 ref: Optional[Ref] = None, 1141 ) -> None: 1142 """ 1143 Download the binary data of a file to a file-like object. 1144 1145 Example: 1146 1147 with open("schematic.DSN", "wb") as f: 1148 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1149 1150 :param file_path: The path to the file in the repository from the root 1151 of the repository. 1152 :param io: The file-like object to write the data to. 1153 """ 1154 1155 url = self.allspice_client._AllSpice__get_url( 1156 self.REPO_GET_RAW_FILE.format( 1157 owner=self.owner.username, 1158 repo=self.name, 1159 path=file_path, 1160 ) 1161 ) 1162 params = Util.data_params_for_ref(ref) 1163 response = self.allspice_client.requests.get( 1164 url, 1165 params=params, 1166 headers=self.allspice_client.headers, 1167 stream=True, 1168 ) 1169 1170 for chunk in response.iter_content(chunk_size=4096): 1171 if chunk: 1172 io.write(chunk) 1173 1174 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1175 """ 1176 Get the json blob for a cad file if it exists, otherwise enqueue 1177 a new job and return a 503 status. 1178 1179 WARNING: This is still experimental and not recommended for critical 1180 applications. The structure and content of the returned dictionary can 1181 change at any time. 
1182 1183 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1184 """ 1185 1186 if isinstance(content, Content): 1187 content = content.path 1188 1189 url = self.REPO_GET_ALLSPICE_JSON.format( 1190 owner=self.owner.username, 1191 repo=self.name, 1192 content=content, 1193 ) 1194 data = Util.data_params_for_ref(ref) 1195 return self.allspice_client.requests_get(url, data) 1196 1197 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1198 """ 1199 Get the svg blob for a cad file if it exists, otherwise enqueue 1200 a new job and return a 503 status. 1201 1202 WARNING: This is still experimental and not yet recommended for 1203 critical applications. The content of the returned svg can change 1204 at any time. 1205 1206 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1207 """ 1208 1209 if isinstance(content, Content): 1210 content = content.path 1211 1212 url = self.REPO_GET_ALLSPICE_SVG.format( 1213 owner=self.owner.username, 1214 repo=self.name, 1215 content=content, 1216 ) 1217 data = Util.data_params_for_ref(ref) 1218 return self.allspice_client.requests_get_raw(url, data) 1219 1220 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1221 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1222 if not data: 1223 data = {} 1224 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1225 data.update({"content": content}) 1226 return self.allspice_client.requests_post(url, data) 1227 1228 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"sha": file_sha, "content": content}) 1234 return self.allspice_client.requests_put(url, data) 1235 1236 def delete_file(self, file_path: 
str, file_sha: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha}) 1242 return self.allspice_client.requests_delete(url, data) 1243 1244 def get_archive( 1245 self, 1246 ref: Ref = "main", 1247 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1248 ) -> bytes: 1249 """ 1250 Download all the files in a specific ref of a repository as a zip or tarball 1251 archive. 1252 1253 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1254 1255 :param ref: branch or commit to get content from, defaults to the "main" branch 1256 :param archive_format: zip or tar, defaults to zip 1257 """ 1258 1259 ref_string = Util.data_params_for_ref(ref)["ref"] 1260 url = self.REPO_GET_ARCHIVE.format( 1261 owner=self.owner.username, 1262 repo=self.name, 1263 ref=ref_string, 1264 format=archive_format.value, 1265 ) 1266 return self.allspice_client.requests_get_raw(url) 1267 1268 def get_topics(self) -> list[str]: 1269 """ 1270 Gets the list of topics on this repository. 1271 1272 See http://localhost:3000/api/swagger#/repository/repoListTopics 1273 """ 1274 1275 url = self.REPO_GET_TOPICS.format( 1276 owner=self.owner.username, 1277 repo=self.name, 1278 ) 1279 return self.allspice_client.requests_get(url)["topics"] 1280 1281 def add_topic(self, topic: str): 1282 """ 1283 Adds a topic to the repository. 1284 1285 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1286 1287 :param topic: The topic to add. Topic names must consist only of 1288 lowercase letters, numnbers and dashes (-), and cannot start with 1289 dashes. Topic names also must be under 35 characters long. 
1290 """ 1291 1292 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1293 self.allspice_client.requests_put(url) 1294 1295 def create_release( 1296 self, 1297 tag_name: str, 1298 name: Optional[str] = None, 1299 body: Optional[str] = None, 1300 draft: bool = False, 1301 ): 1302 """ 1303 Create a release for this repository. The release will be created for 1304 the tag with the given name. If there is no tag with this name, create 1305 the tag first. 1306 1307 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1308 """ 1309 1310 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1311 data = { 1312 "tag_name": tag_name, 1313 "draft": draft, 1314 } 1315 if name is not None: 1316 data["name"] = name 1317 if body is not None: 1318 data["body"] = body 1319 response = self.allspice_client.requests_post(url, data) 1320 return Release.parse_response(self.allspice_client, response, self) 1321 1322 def get_releases( 1323 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1324 ) -> List[Release]: 1325 """ 1326 Get the list of releases for this repository. 1327 1328 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1329 """ 1330 1331 data = {} 1332 1333 if draft is not None: 1334 data["draft"] = draft 1335 if pre_release is not None: 1336 data["pre-release"] = pre_release 1337 1338 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1339 responses = self.allspice_client.requests_get_paginated(url, params=data) 1340 1341 return [ 1342 Release.parse_response(self.allspice_client, response, self) for response in responses 1343 ] 1344 1345 def get_latest_release(self) -> Release: 1346 """ 1347 Get the latest release for this repository. 
1348 1349 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1350 """ 1351 1352 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1353 response = self.allspice_client.requests_get(url) 1354 release = Release.parse_response(self.allspice_client, response, self) 1355 return release 1356 1357 def get_release_by_tag(self, tag: str) -> Release: 1358 """ 1359 Get a release by its tag. 1360 1361 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1362 """ 1363 1364 url = self.REPO_GET_RELEASE_BY_TAG.format( 1365 owner=self.owner.username, repo=self.name, tag=tag 1366 ) 1367 response = self.allspice_client.requests_get(url) 1368 release = Release.parse_response(self.allspice_client, response, self) 1369 return release 1370 1371 def get_commit_statuses( 1372 self, 1373 commit: Union[str, Commit], 1374 sort: Optional[CommitStatusSort] = None, 1375 state: Optional[CommitStatusState] = None, 1376 ) -> List[CommitStatus]: 1377 """ 1378 Get a list of statuses for a commit. 1379 1380 This is roughly equivalent to the Commit.get_statuses method, but this 1381 method allows you to sort and filter commits and is more convenient if 1382 you have a commit SHA and don't need to get the commit itself. 
1383 1384 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1385 """ 1386 1387 if isinstance(commit, Commit): 1388 commit = commit.sha 1389 1390 params = {} 1391 if sort is not None: 1392 params["sort"] = sort.value 1393 if state is not None: 1394 params["state"] = state.value 1395 1396 url = self.REPO_GET_COMMIT_STATUS.format( 1397 owner=self.owner.username, repo=self.name, sha=commit 1398 ) 1399 response = self.allspice_client.requests_get_paginated(url, params=params) 1400 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1401 1402 def create_commit_status( 1403 self, 1404 commit: Union[str, Commit], 1405 context: Optional[str] = None, 1406 description: Optional[str] = None, 1407 state: Optional[CommitStatusState] = None, 1408 target_url: Optional[str] = None, 1409 ) -> CommitStatus: 1410 """ 1411 Create a status on a commit. 1412 1413 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1414 """ 1415 1416 if isinstance(commit, Commit): 1417 commit = commit.sha 1418 1419 data = {} 1420 if context is not None: 1421 data["context"] = context 1422 if description is not None: 1423 data["description"] = description 1424 if state is not None: 1425 data["state"] = state.value 1426 if target_url is not None: 1427 data["target_url"] = target_url 1428 1429 url = self.REPO_GET_COMMIT_STATUS.format( 1430 owner=self.owner.username, repo=self.name, sha=commit 1431 ) 1432 response = self.allspice_client.requests_post(url, data=data) 1433 return CommitStatus.parse_response(self.allspice_client, response) 1434 1435 def delete(self): 1436 self.allspice_client.requests_delete( 1437 Repository.REPO_DELETE % (self.owner.username, self.name) 1438 ) 1439 self.deleted = True
@classmethod
def search(
    cls,
    allspice_client,
    query: Optional[str] = None,
    topic: bool = False,
    include_description: bool = False,
    user: Optional[User] = None,
    owner_to_prioritize: Union[User, Organization, None] = None,
) -> list[Repository]:
    """
    Search for repositories.

    See https://hub.allspice.io/api/swagger#/repository/repoSearch

    :param query: The query string to search for
    :param topic: If true, the query string will only be matched against the
        repository's topic.
    :param include_description: If true, the query string will be matched
        against the repository's description as well.
    :param user: If specified, only repositories that this user owns or
        contributes to will be searched.
    :param owner_to_prioritize: If specified, repositories owned by the
        given entity will be prioritized in the search.
    :returns: All repositories matching the query. If there are many
        repositories matching this query, this may take some time.
    """

    search_filters = {}

    # An empty query string is still forwarded; only None is omitted.
    if query is not None:
        search_filters["q"] = query

    # Boolean switches are sent only when they are switched on.
    for flag_name, flag_value in (
        ("topic", topic),
        ("include_description", include_description),
    ):
        if flag_value:
            search_filters[flag_name] = flag_value

    # Entity filters are sent as the entity's id.
    for filter_name, entity in (
        ("user", user),
        ("owner_to_prioritize", owner_to_prioritize),
    ):
        if entity is not None:
            search_filters[filter_name] = entity.id

    found = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=search_filters)
    return [Repository.parse_response(allspice_client, entry) for entry in found]
Search for repositories.
See https://hub.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns
All repositories matching the query. If there are many repositories matching this query, this may take some time.
def get_branches(self) -> List["Branch"]:
    """Return every Branch of this Repository, following pagination."""

    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    raw_branches = self.allspice_client.requests_get_paginated(endpoint)
    return [Branch.parse_response(self.allspice_client, raw) for raw in raw_branches]
Get all the Branches of this Repository.
def get_branch(self, name: str) -> "Branch":
    """
    Fetch a single Branch of this Repository.

    :param name: Name of the branch to fetch.
    :return: The parsed Branch.
    """
    endpoint = Repository.REPO_BRANCH.format(
        owner=self.owner.username,
        repo=self.name,
        branch=name,
    )
    raw_branch = self.allspice_client.requests_get(endpoint)
    return Branch.parse_response(self.allspice_client, raw_branch)
Get a specific Branch of this Repository.
def add_branch(self, create_from: Ref, newname: str) -> "Branch":
    """
    Create a new branch in this repository.

    :param create_from: The ref the new branch starts from.
    :param newname: Name of the branch to create.
    :return: The newly created Branch.
    :raises ValueError: If no ref name can be derived from `create_from`.
    """
    # Note: will only work with gitea 1.13 or higher!

    ref_params = Util.data_params_for_ref(create_from)
    if "ref" not in ref_params:
        raise ValueError("create_from must be a Branch, Commit or string")

    payload = {
        "new_branch_name": newname,
        "old_ref_name": ref_params["ref"],
    }
    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return Branch.parse_response(self.allspice_client, created)
Add a branch to the repository
def get_issues(
    self,
    state: Literal["open", "closed", "all"] = "all",
    search_query: Optional[str] = None,
    labels: Optional[List[str]] = None,
    milestones: Optional[List[Union[Milestone, str]]] = None,
    assignee: Optional[Union[User, str]] = None,
    since: Optional[datetime] = None,
    before: Optional[datetime] = None,
) -> List["Issue"]:
    """
    Get the Issues of this Repository, optionally filtered.

    https://hub.allspice.io/api/swagger#/repository/repoListIssues

    `state` is always sent. Every other parameter is an optional filter that
    is only sent when it has a truthy value.

    :param state: The state of the Issues to get: "open", "closed" or "all".
        Defaults to "all", which returns open and closed Issues alike.
    :param search_query: Filter issues by text. This is equivalent to searching for
        `search_query` in the Issues on the web interface.
    :param labels: Filter issues by labels; sent as one comma-separated string.
    :param milestones: Filter issues by milestones, given as Milestone objects or
        milestone names; sent as one comma-separated string of names.
    :param assignee: Filter issues by the assigned user, given as a User or a
        username.
    :param since: Lower time bound for the Issues to return. In the upstream
        Gitea API this bound applies to the last-update time rather than the
        creation time; confirm against the AllSpice Hub API docs.
    :param before: Upper time bound for the Issues to return, with the same
        caveat as `since`.
    :return: A list of Issues.
    """

    # NOTE(review): the upstream Gitea endpoint names these query parameters
    # `milestones` and `assigned_by`; confirm that AllSpice Hub accepts the
    # `milestone` / `assignee` keys used below before relying on those filters.
    data = {
        "state": state,
    }
    if search_query:
        data["q"] = search_query
    if labels:
        data["labels"] = ",".join(labels)
    if milestones:
        data["milestone"] = ",".join(
            [
                milestone.name if isinstance(milestone, Milestone) else milestone
                for milestone in milestones
            ]
        )
    if assignee:
        data["assignee"] = assignee.username if isinstance(assignee, User) else assignee
    if since:
        data["since"] = Util.format_time(since)
    if before:
        data["before"] = Util.format_time(before)

    results = self.allspice_client.requests_get_paginated(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        params=data,
    )

    issues = []
    for result in results:
        issue = Issue.parse_response(self.allspice_client, result)
        # See Issue.request
        setattr(issue, "_repository", self)
        # This is mostly for compatibility with an older implementation
        Issue._add_read_property("repo", self, issue)
        issues.append(issue)

    return issues
Get all Issues of this Repository (open and closed)
https://hub.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for `search_query` in the Issues on the web interface.
- labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
def get_design_reviews(
    self,
    state: Literal["open", "closed", "all"] = "all",
    milestone: Optional[Union[Milestone, str]] = None,
    labels: Optional[List[str]] = None,
) -> List["DesignReview"]:
    """
    Get all Design Reviews of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

    :param state: The state of the Design Reviews to get. If None, all Design Reviews
        are returned.
    :param milestone: The milestone of the Design Reviews to get, as a Milestone
        object or a plain string.
    :param labels: A list of label IDs to filter DRs by.
    :return: A list of Design Reviews.
    """

    query = {"state": state}

    if milestone:
        # A Milestone object is reduced to its name; a string is sent as given.
        query["milestone"] = milestone.name if isinstance(milestone, Milestone) else milestone
    if labels:
        query["labels"] = ",".join(labels)

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    raw_reviews = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [DesignReview.parse_response(self.allspice_client, raw) for raw in raw_reviews]
Get all Design Reviews of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
def get_commits(
    self,
    sha: Optional[str] = None,
    path: Optional[str] = None,
    stat: bool = True,
) -> List["Commit"]:
    """
    Get all the Commits of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

    A ConflictException from the request is logged as a warning and treated as
    an empty repository, so an empty list is returned in that case.

    :param sha: The SHA of the commit to start listing commits from.
    :param path: filepath of a file/dir.
    :param stat: Include the number of additions and deletions in the response.
        Disable for speedup.
    :return: A list of Commits.
    """

    query = {}
    if sha:
        query["sha"] = sha
    if path:
        query["path"] = path
    # "stat" is only sent to switch the statistics off.
    if not stat:
        query["stat"] = False

    try:
        raw_commits = self.allspice_client.requests_get_paginated(
            Repository.REPO_COMMITS % (self.owner.username, self.name),
            params=query,
        )
    except ConflictException as conflict:
        logging.warning(conflict)
        logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
        return []

    return [Commit.parse_response(self.allspice_client, raw) for raw in raw_commits]
Get all the Commits of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
def get_issues_state(self, state) -> List["Issue"]:
    """
    DEPRECATED: Use get_issues() instead.

    Get issues of state Issue.open or Issue.closed of a repository.

    :param state: Either Issue.OPENED or Issue.CLOSED; anything else fails the
        assertion below.
    :return: The matching Issues, each linked back to this repository and its owner.
    """

    assert state in [Issue.OPENED, Issue.CLOSED]

    def _link_to_repository(raw_issue):
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # adding data not contained in the issue response
        # See Issue.request()
        setattr(parsed, "_repository", self)
        Issue._add_read_property("repo", self, parsed)
        Issue._add_read_property("owner", self.owner, parsed)
        return parsed

    endpoint = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(
        endpoint,
        params={"state": state},
    )
    return [_link_to_repository(raw) for raw in raw_issues]
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
def get_user_time(self, username) -> float:
    """
    Total the tracked time of one user on this repository.

    :param username: A username string or a User object.
    :return: The sum of the "time" field over all entries returned for the user.
    """
    login = username.username if isinstance(username, User) else username
    endpoint = Repository.REPO_USER_TIME % (self.owner.username, self.name, login)
    tracked_entries = self.allspice_client.requests_get(endpoint)
    return sum(entry["time"] for entry in tracked_entries)
def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
    """
    Create a new Issue in this Repository.

    :param title: Title of the Issue.
    :param assignees: Optional. The users to assign, as an iterable of usernames
        and/or User objects. A single username or User is also accepted.
    :param description: Optional. Body text of the Issue.
    :return: The created Issue, linked back to this repository.
    """
    # Accept a lone assignee so a bare string is not split into characters.
    if isinstance(assignees, (str, User)):
        assignees = [assignees]

    data = {
        # Always send a plain list of usernames: set types (including the
        # default frozenset) are not serialisable by the standard JSON encoder,
        # and User objects must be reduced to their username.
        "assignees": [
            assignee.username if isinstance(assignee, User) else assignee
            for assignee in (assignees or ())
        ],
        "body": description,
        "closed": False,
        "title": title,
    }
    result = self.allspice_client.requests_post(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        data=data,
    )

    issue = Issue.parse_response(self.allspice_client, result)
    # See Issue.request: the response does not carry the repository itself.
    setattr(issue, "_repository", self)
    Issue._add_read_property("repo", self, issue)
    return issue
def create_design_review(
    self,
    title: str,
    head: Union[Branch, str],
    base: Union[Branch, str],
    assignees: Optional[Set[Union[User, str]]] = None,
    body: Optional[str] = None,
    due_date: Optional[datetime] = None,
    milestone: Optional["Milestone"] = None,
) -> "DesignReview":
    """
    Create a new Design Review.

    See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

    :param title: Title of the Design Review
    :param head: Branch or name of the branch to merge into the base branch
    :param base: Branch or name of the branch to merge into
    :param assignees: Optional. The users to assign this review to, given as
        User objects or as usernames.
    :param body: An Optional Description for the Design Review.
    :param due_date: An Optional Due date for the Design Review.
    :param milestone: An Optional Milestone for the Design Review
    :return: The created Design Review
    """

    # Branch objects are reduced to their names; strings are used as given.
    head_name = head.name if isinstance(head, Branch) else head
    base_name = base.name if isinstance(base, Branch) else base

    payload: dict[str, Any] = {
        "title": title,
        "head": head_name,
        "base": base_name,
    }

    # Optional fields are left out of the request entirely when not provided.
    if assignees:
        payload["assignees"] = [
            person.username if isinstance(person, User) else person for person in assignees
        ]
    if body:
        payload["body"] = body
    if due_date:
        payload["due_date"] = Util.format_time(due_date)
    if milestone:
        payload["milestone"] = milestone.id

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return DesignReview.parse_response(self.allspice_client, created)
Create a new Design Review.
See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
def create_milestone(
    self,
    title: str,
    description: str,
    due_date: Optional[str] = None,
    state: str = "open",
) -> "Milestone":
    """
    Create a milestone in this repository.

    :param title: Title of the milestone.
    :param description: Description of the milestone.
    :param due_date: Optional due date; only sent when a truthy value is given.
    :param state: State of the new milestone, "open" by default.
    :return: The created Milestone.
    """
    payload = {"title": title, "description": description, "state": state}
    if due_date:
        payload["due_date"] = due_date
    endpoint = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
    response = self.allspice_client.requests_post(endpoint, data=payload)
    return Milestone.parse_response(self.allspice_client, response)
def create_gitea_hook(self, hook_url: str, events: List[str]):
    """
    Register an active "gitea"-type webhook on this repository.

    :param hook_url: URL the hook delivers its JSON payloads to.
    :param events: Names of the events that should trigger the hook.
    :return: Whatever the client's requests_post returns for the request.
    """
    hook_config = {"content_type": "json", "url": hook_url}
    payload = {
        "type": "gitea",
        "config": hook_config,
        "events": events,
        "active": True,
    }
    endpoint = f"/repos/{self.owner.username}/{self.name}/hooks"
    return self.allspice_client.requests_post(endpoint, data=payload)
def is_collaborator(self, username) -> bool:
    """
    Check whether a user is a collaborator on this repository.

    :param username: A username or a User object.
    :return: True if the lookup request succeeds, False if it raises any
        exception.
    """
    login = username.username if isinstance(username, User) else username
    try:
        # The endpoint answers 204 for a collaborator and 404 otherwise.
        self.allspice_client.requests_get(
            Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, login)
        )
    except Exception:
        return False
    return True
def get_users_with_access(self) -> Sequence[User]:
    """
    List the users that have access to this repository.

    For a user-owned repository this is the collaborators plus the owner.
    For an organization-owned repository it is the collaborators plus the
    members of every team of the organization that lists this repository.
    """
    raw_users = self.allspice_client.requests_get(
        f"/repos/{self.owner.username}/{self.name}/collaborators"
    )
    users = [User.parse_response(self.allspice_client, raw) for raw in raw_users]

    if isinstance(self.owner, User):
        return [*users, self.owner]

    # Otherwise the owner must be an organization: add members of each team
    # that has a repository with this repository's name.
    for team in self.owner.get_teams():
        repo_names = [team_repo.name for team_repo in team.get_repos()]
        if self.name in repo_names:
            users.extend(team.get_members())
    return users
def transfer_ownership(
    self,
    new_owner: Union[User, Organization],
    new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
):
    """
    Transfer this repository to another user or organization.

    :param new_owner: The User or Organization that should own the repository.
    :param new_teams: Optional. Teams of the new owner to give the repository
        to. Only used when `new_owner` is an Organization; teams that do not
        belong to that organization are ignored.
    """
    url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
    data: dict[str, Any] = {"new_owner": new_owner.username}
    if isinstance(new_owner, Organization):
        # Fetch the organization's teams once instead of once per candidate
        # team, and skip the lookup entirely when no teams were given.
        owner_teams = list(new_owner.get_teams()) if new_teams else []
        data["team_ids"] = [team.id for team in new_teams if team in owner_teams]
    self.allspice_client.requests_post(url, data=data)
    # TODO: make sure this instance is either updated or discarded
def get_git_content(
    self,
    ref: Optional["Ref"] = None,
    commit: "Optional[Commit]" = None,
) -> List[Content]:
    """
    List the metadata of every entry in the repository's root directory.

    https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

    :param ref: branch or commit to get content from
    :param commit: commit to get content from (deprecated); only used when
        `ref` is not given
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents"
    query = Util.data_params_for_ref(ref or commit)
    entries = self.allspice_client.requests_get(endpoint, query)
    return [Content.parse_response(self.allspice_client, entry) for entry in entries]
Get the metadata for all files in the root directory.
https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
    """
    Fetch the git tree of this repository at a given ref.

    Only the top-level entries are returned unless `recursive` is True, in
    which case the entire tree is returned.

    :param ref: The ref to get the tree from. If not provided, the default branch is used.
    :param recursive: Whether to get the entire tree or just the top-level entries.
    """

    ref_params = Util.data_params_for_ref(ref)
    # Fall back to the default branch when no ref was resolved.
    ref_name = ref_params.get("ref", self.default_branch)
    endpoint = self.REPO_GET_TREE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_name,
    )
    entries = self.allspice_client.requests_get_paginated(
        endpoint, params={"recursive": recursive}
    )
    return [GitEntry.parse_response(self.allspice_client, entry) for entry in entries]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content(
    self,
    content: Content,
    ref: Optional[Ref] = None,
    commit: Optional[Commit] = None,
) -> Union[str, List["Content"]]:
    """
    Get what a Content entry points to.

    For a file this is the "content" field of the API response; for anything
    else it is the list of Content entries found at that path.

    https://hub.allspice.io/api/swagger#/repository/repoGetContents

    :param content: The entry to look up.
    :param ref: branch or commit to read from
    :param commit: commit to read from; only used when `ref` is not given
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
    query = Util.data_params_for_ref(ref or commit)

    is_file = content.type == Content.FILE
    response = self.allspice_client.requests_get(endpoint, query)
    if is_file:
        return response["content"]
    return [Content.parse_response(self.allspice_client, entry) for entry in response]
https://hub.allspice.io/api/swagger#/repository/repoGetContents
def get_raw_file(
    self,
    file_path: str,
    ref: Optional[Ref] = None,
) -> bytes:
    """
    Fetch the raw bytes of a single file in the repository.

    Note 1: for text files you will usually want to decode the result to
    obtain a string, for example:

        content = repo.get_raw_file("file.txt").decode("utf-8")

    Note 2: the whole file is held in memory. For large files consider
    `download_to_file` instead.

    See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

    :param file_path: The path to the file to get.
    :param ref: The branch or commit to get the file from. If not provided,
        the default branch is used.
    """

    endpoint = self.REPO_GET_RAW_FILE.format(
        owner=self.owner.username, repo=self.name, path=file_path
    )
    query = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(endpoint, params=query)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file(
    self,
    file_path: str,
    io: IO,
    ref: Optional[Ref] = None,
) -> None:
    """
    Download the binary data of a file to a file-like object.

    The file is streamed in 4096-byte chunks, so it is never held in memory
    as a whole.

    Example:

        with open("schematic.DSN", "wb") as f:
            repo.download_to_file("Schematics/my_schematic.DSN", f)

    :param file_path: The path to the file in the repository from the root
        of the repository.
    :param io: The file-like object to write the data to.
    :param ref: Optional. The branch or commit to get the file from.
    """

    url = self.allspice_client._AllSpice__get_url(
        self.REPO_GET_RAW_FILE.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
    )
    params = Util.data_params_for_ref(ref)
    # Use the response as a context manager so the streamed connection is
    # released even if writing to `io` fails part-way through (assumes
    # `allspice_client.requests.get` returns a `requests.Response`).
    with self.allspice_client.requests.get(
        url,
        params=params,
        headers=self.allspice_client.headers,
        stream=True,
    ) as response:
        for chunk in response.iter_content(chunk_size=4096):
            # Skip empty chunks.
            if chunk:
                io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
repo.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
    """
    Fetch the generated JSON for a CAD file.

    If the JSON does not exist yet, a new job is enqueued and a 503 status
    is returned instead.

    WARNING: This is still experimental and not recommended for critical
    applications. The structure and content of the returned dictionary can
    change at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

    :param content: A Content entry or the path of the file.
    :param ref: Optional branch or commit to read the file from.
    """

    path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_JSON.format(
        owner=self.owner.username, repo=self.name, content=path
    )
    query = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(endpoint, query)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
    """
    Fetch the generated SVG for a CAD file.

    If the SVG does not exist yet, a new job is enqueued and a 503 status
    is returned instead.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: A Content entry or the path of the file.
    :param ref: Optional branch or commit to read the file from.
    """

    path = content.path if isinstance(content, Content) else content
    endpoint = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username, repo=self.name, content=path
    )
    query = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(endpoint, query)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in the repository.

    https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the new file, relative to the repository root.
    :param content: Content of the file, sent as the "content" field.
    :param data: Optional. Additional fields to send with the request. The
        dictionary passed in is not modified.
    :return: Whatever the client's requests_post returns for the request.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a new payload instead of updating `data` in place, so the
    # caller's dictionary is left untouched.
    payload = {**(data or {}), "content": content}
    return self.allspice_client.requests_post(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoCreateFile
def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Update an existing file in the repository.

    https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file, relative to the repository root.
    :param file_sha: SHA of the file being replaced, sent as the "sha" field.
    :param content: New content of the file, sent as the "content" field.
    :param data: Optional. Additional fields to send with the request. The
        dictionary passed in is not modified.
    :return: Whatever the client's requests_put returns for the request.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a new payload instead of updating `data` in place, so the
    # caller's dictionary is left untouched.
    payload = {**(data or {}), "sha": file_sha, "content": content}
    return self.allspice_client.requests_put(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoUpdateFile
def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from the repository.

    https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file, relative to the repository root.
    :param file_sha: SHA of the file to delete, sent as the "sha" field.
    :param data: Optional. Additional fields to send with the request. The
        dictionary passed in is not modified.
    :return: Whatever the client's requests_delete returns for the request.
    """
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    # Build a new payload instead of updating `data` in place, so the
    # caller's dictionary is left untouched.
    payload = {**(data or {}), "sha": file_sha}
    return self.allspice_client.requests_delete(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoDeleteFile
def get_archive(
    self,
    ref: Ref = "main",
    archive_format: ArchiveFormat = ArchiveFormat.ZIP,
) -> bytes:
    """
    Download every file at a specific ref of the repository as a single
    zip or tarball archive.

    https://hub.allspice.io/api/swagger#/repository/repoGetArchive

    :param ref: branch or commit to get content from, defaults to the "main" branch
    :param archive_format: zip or tar, defaults to zip
    """

    ref_params = Util.data_params_for_ref(ref)
    endpoint = self.REPO_GET_ARCHIVE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_params["ref"],
        format=archive_format.value,
    )
    return self.allspice_client.requests_get_raw(endpoint)
Download all the files in a specific ref of a repository as a zip or tarball archive.
https://hub.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
    """
    Return the list of topics set on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics
    """

    endpoint = self.REPO_GET_TOPICS.format(owner=self.owner.username, repo=self.name)
    response = self.allspice_client.requests_get(endpoint)
    return response["topics"]
Gets the list of topics on this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListTopics
def add_topic(self, topic: str):
    """
    Add a single topic to this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """

    endpoint = self.REPO_ADD_TOPIC.format(
        owner=self.owner.username,
        repo=self.name,
        topic=topic,
    )
    self.allspice_client.requests_put(endpoint)
Adds a topic to the repository.
See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release(
    self,
    tag_name: str,
    name: Optional[str] = None,
    body: Optional[str] = None,
    draft: bool = False,
):
    """
    Create a release of this repository for the tag named `tag_name`.

    If no tag with this name exists yet, create the tag first.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

    :param tag_name: Name of the tag the release is created for.
    :param name: Optional name of the release.
    :param body: Optional description of the release.
    :param draft: Whether the release is a draft.
    :return: The created Release.
    """

    payload = {"tag_name": tag_name, "draft": draft}
    # name and body are only sent when they were explicitly provided.
    payload.update(
        {key: value for key, value in (("name", name), ("body", body)) if value is not None}
    )
    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    result = self.allspice_client.requests_post(endpoint, payload)
    return Release.parse_response(self.allspice_client, result, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    List the releases of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: Optional. Sent as the "draft" query parameter when not None.
    :param pre_release: Optional. Sent as the "pre-release" query parameter
        when not None.
    """

    # Only filters that were explicitly set are sent to the API.
    filters = {
        key: flag
        for key, flag in (("draft", draft), ("pre-release", pre_release))
        if flag is not None
    }

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    pages = self.allspice_client.requests_get_paginated(endpoint, params=filters)
    return [Release.parse_response(self.allspice_client, item, self) for item in pages]
Get the list of releases for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListReleases
def get_latest_release(self) -> Release:
    """
    Fetch the latest release of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
    """

    endpoint = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
    result = self.allspice_client.requests_get(endpoint)
    return Release.parse_response(self.allspice_client, result, self)
Get the latest release for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
def get_release_by_tag(self, tag: str) -> Release:
    """
    Fetch the release that belongs to a tag.

    See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

    :param tag: Name of the tag.
    """

    endpoint = self.REPO_GET_RELEASE_BY_TAG.format(
        owner=self.owner.username,
        repo=self.name,
        tag=tag,
    )
    result = self.allspice_client.requests_get(endpoint)
    return Release.parse_response(self.allspice_client, result, self)
Get a release by its tag.
See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    List the statuses of a commit.

    Roughly equivalent to Commit.get_statuses, but this method can sort and
    filter the statuses and is more convenient when you only have a commit
    SHA and do not need the commit itself.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object or a commit SHA.
    :param sort: Optional sort order.
    :param state: Optional state to filter by.
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    # Enum members are sent by value; unset options are left out.
    query = {
        key: choice.value for key, choice in (("sort", sort), ("state", state)) if choice is not None
    }

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username,
        repo=self.name,
        sha=sha,
    )
    statuses = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [CommitStatus.parse_response(self.allspice_client, item) for item in statuses]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
def create_commit_status(
    self,
    commit: Union[str, Commit],
    context: Optional[str] = None,
    description: Optional[str] = None,
    state: Optional[CommitStatusState] = None,
    target_url: Optional[str] = None,
) -> CommitStatus:
    """
    Attach a new status to a commit.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

    :param commit: A Commit object or a commit SHA.
    :param context: Optional context of the status.
    :param description: Optional description of the status.
    :param state: Optional state of the status.
    :param target_url: Optional URL the status links to.
    :return: The created CommitStatus.
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    # Only fields that were explicitly provided are sent.
    payload = {
        key: text
        for key, text in (("context", context), ("description", description))
        if text is not None
    }
    if state is not None:
        payload["state"] = state.value
    if target_url is not None:
        payload["target_url"] = target_url

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username,
        repo=self.name,
        sha=sha,
    )
    result = self.allspice_client.requests_post(endpoint, data=payload)
    return CommitStatus.parse_response(self.allspice_client, result)
Create a status on a commit.
See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
class ArchiveFormat(Enum):
    """
    The archive formats accepted by Repository.get_archive.

    Each member's value is the string that get_archive places in the
    request URL.
    """

    # Gzip-compressed tarball.
    TAR = "tar.gz"
    # Zip archive.
    ZIP = "zip"
Archive formats for Repository.get_archive
Inherited Members
- enum.Enum
- name
- value
class CommitStatusSort(Enum):
    """
    The sort orders accepted by Repository.get_commit_statuses.

    Each member's value is the string sent as the "sort" query parameter.
    """

    OLDEST = "oldest"
    RECENT_UPDATE = "recentupdate"
    LEAST_UPDATE = "leastupdate"
    LEAST_INDEX = "leastindex"
    HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_statuses
Inherited Members
- enum.Enum
- name
- value
class Branch(ReadonlyApiObject):
    """
    A read-only view of a branch of a repository.

    Two branches are equal when both their name and their commit data are
    equal.
    """

    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
    effective_branch_protection_name: str
    enable_status_check: bool
    name: str
    protected: bool
    required_approvals: int
    status_check_contexts: List[Any]
    user_can_merge: bool
    user_can_push: bool

    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""

    _fields_to_parsers: ClassVar[dict] = {
        # "commit" is intentionally left unparsed: it is not a Commit object.
        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
    }

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if isinstance(other, Branch):
            return self.commit == other.commit and self.name == other.name
        return False

    def __hash__(self):
        # Combines the id stored in the commit data with the branch name.
        commit_id = self.commit["id"]
        return hash(commit_id) ^ hash(self.name)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, branch: str):
        """
        Request a single branch.

        :param allspice_client: The client to request the branch with.
        :param owner: Username of the repository's owner.
        :param repo: Name of the repository.
        :param branch: Name of the branch.
        """
        path_args = {"owner": owner, "repo": repo, "branch": branch}
        return cls._request(allspice_client, path_args)
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
class Issue(ApiObject):
    """
    An issue in a repository.

    Besides the fields returned by the API, instances created through
    `request` or `create_issue` carry the Repository object they belong to.
    """

    assets: List[Any]
    assignee: Any
    assignees: Any
    body: str
    closed_at: Any
    comments: int
    created_at: str
    due_date: Any
    html_url: str
    id: int
    is_locked: bool
    labels: List[Any]
    milestone: Optional["Milestone"]
    number: int
    original_author: str
    original_author_id: int
    pin_order: int
    pull_request: Any
    ref: str
    repository: Dict[str, Union[int, str]]
    state: str
    title: str
    updated_at: str
    url: str
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/{index}"""  # <owner, repo, index>
    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>
    GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments"""
    CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""

    OPENED = "open"
    CLOSED = "closed"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Issue):
            return False
        return self.repository == other.repository and self.id == other.id

    def __hash__(self):
        return hash(self.repository) ^ hash(self.id)

    # Converters applied to fields of an API response.
    _fields_to_parsers: ClassVar[dict] = {
        "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m),
        "user": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u),
        "assignees": lambda allspice_client, us: [
            User.parse_response(allspice_client, u) for u in us
        ],
        # Anything that is not "closed" is treated as open.
        "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED),
    }

    # Converters applied to fields before they are sent back to the API.
    _parsers_to_fields: ClassVar[dict] = {
        "milestone": lambda m: m.id,
    }

    _patchable_fields: ClassVar[set[str]] = {
        "assignee",
        "assignees",
        "body",
        "due_date",
        "milestone",
        "state",
        "title",
    }

    def commit(self):
        """Send the changed fields of this issue to the server."""
        args = {
            "owner": self.repository.owner.username,
            "repo": self.repository.name,
            "index": self.number,
        }
        self._commit(args)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """
        Request a single issue, together with its full repository object.

        :param allspice_client: The client to request the issue with.
        :param owner: Username of the repository's owner.
        :param repo: Name of the repository.
        :param number: Number (index) of the issue in the repository.
        """
        api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
        # The repository in the response is a RepositoryMeta object, so request
        # the full repository object and add it to the issue object.
        repository = Repository.request(allspice_client, owner, repo)
        setattr(api_object, "_repository", repository)
        # For legacy reasons
        cls._add_read_property("repo", repository, api_object)
        return api_object

    @classmethod
    def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
        """
        Create a new issue in `repo`.

        :param allspice_client: The client to create the issue with.
        :param repo: The repository to create the issue in.
        :param title: Title of the issue.
        :param body: Optional body text of the issue.
        :return: The created Issue, with `repo` attached to it.
        """
        args = {"owner": repo.owner.username, "repo": repo.name}
        data = {"title": title, "body": body}
        result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
        issue = Issue.parse_response(allspice_client, result)
        setattr(issue, "_repository", repo)
        cls._add_read_property("repo", repo, issue)
        return issue

    @property
    def owner(self) -> Organization | User:
        """The owner of the repository this issue belongs to."""
        return self.repository.owner

    def get_time_sum(self, user: User) -> int:
        """Return the sum of all time entries `user` tracked on this issue."""
        results = self.allspice_client.requests_get(
            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
        )
        return sum(result["time"] for result in results if result and result["user_id"] == user.id)

    def get_times(self) -> Optional[Dict]:
        """Return the raw API response listing the tracked times of this issue."""
        return self.allspice_client.requests_get(
            Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
        )

    def delete_time(self, time_id: str):
        """Delete the tracked time entry with id `time_id` from this issue."""
        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
        self.allspice_client.requests_delete(path)

    def add_time(
        self,
        time: int,
        created: Optional[str] = None,
        user_name: Optional[Union[User, str]] = None,
    ):
        """
        Add a tracked time entry to this issue.

        :param time: The tracked time; converted with int() before sending.
        :param created: Optional creation timestamp of the entry.
        :param user_name: Optional. The user the time is tracked for, as a
            User object or a username.
        """
        # The request field is a username, so reduce User objects to their
        # username instead of placing the object itself in the payload.
        if isinstance(user_name, User):
            user_name = user_name.username
        path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
        self.allspice_client.requests_post(
            path, data={"created": created, "time": int(time), "user_name": user_name}
        )

    def get_comments(self) -> List[Comment]:
        """https://hub.allspice.io/api/swagger#/issue/issueGetComments"""

        results = self.allspice_client.requests_get(
            self.GET_COMMENTS.format(
                owner=self.owner.username, repo=self.repository.name, index=self.number
            )
        )

        return [Comment.parse_response(self.allspice_client, result) for result in results]

    def create_comment(self, body: str) -> Comment:
        """https://hub.allspice.io/api/swagger#/issue/issueCreateComment"""

        path = self.GET_COMMENTS.format(
            owner=self.owner.username, repo=self.repository.name, index=self.number
        )

        response = self.allspice_client.requests_post(path, data={"body": body})
        return Comment.parse_response(self.allspice_client, response)
@classmethod
def request(cls, allspice_client, owner: str, repo: str, number: str):
    """
    Request a single issue, together with its full repository object.

    :param allspice_client: The client to request the issue with.
    :param owner: Username of the repository's owner.
    :param repo: Name of the repository.
    :param number: Number (index) of the issue in the repository.
    """
    path_args = {"owner": owner, "repo": repo, "index": number}
    issue = cls._request(allspice_client, path_args)
    # The response only carries a RepositoryMeta object, so the full
    # repository is requested separately and attached to the issue.
    full_repo = Repository.request(allspice_client, owner, repo)
    setattr(issue, "_repository", full_repo)
    # "repo" is kept as a read-only property for legacy reasons.
    cls._add_read_property("repo", full_repo, issue)
    return issue
@classmethod
def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""):
    """
    Create a new issue in `repo`.

    :param allspice_client: The client to create the issue with.
    :param repo: The repository to create the issue in.
    :param title: Title of the issue.
    :param body: Optional body text of the issue.
    :return: The created Issue, with `repo` attached to it.
    """
    endpoint = Issue.CREATE_ISSUE.format(owner=repo.owner.username, repo=repo.name)
    response = allspice_client.requests_post(endpoint, data={"title": title, "body": body})
    created = Issue.parse_response(allspice_client, response)
    setattr(created, "_repository", repo)
    cls._add_read_property("repo", repo, created)
    return created
def add_time(
    self,
    time: int,
    created: Optional[str] = None,
    user_name: Optional[Union[User, str]] = None,
):
    """
    Add a tracked time entry to this issue.

    :param time: The tracked time; converted with int() before sending.
    :param created: Optional creation timestamp of the entry.
    :param user_name: Optional. The user the time is tracked for, as a
        User object or a username.
    """
    # The request field is a username, so reduce User objects to their
    # username instead of placing the object itself in the payload.
    if isinstance(user_name, User):
        user_name = user_name.username
    path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
    self.allspice_client.requests_post(
        path, data={"created": created, "time": int(time), "user_name": user_name}
    )
def get_comments(self) -> List[Comment]:
    """
    List the comments on this issue.

    https://hub.allspice.io/api/swagger#/issue/issueGetComments
    """

    endpoint = self.GET_COMMENTS.format(
        owner=self.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    raw_comments = self.allspice_client.requests_get(endpoint)
    return [Comment.parse_response(self.allspice_client, raw) for raw in raw_comments]
https://hub.allspice.io/api/swagger#/issue/issueGetComments
def create_comment(self, body: str) -> Comment:
    """
    Add a comment to this issue.

    https://hub.allspice.io/api/swagger#/issue/issueCreateComment

    :param body: Text of the comment.
    :return: The created Comment.
    """

    endpoint = self.GET_COMMENTS.format(
        owner=self.owner.username,
        repo=self.repository.name,
        index=self.number,
    )
    created = self.allspice_client.requests_post(endpoint, data={"body": body})
    return Comment.parse_response(self.allspice_client, created)
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
class Milestone(ApiObject):
    """
    A milestone of a repository.

    Two milestones are equal when they belong to the same client and have
    the same id.
    """

    allow_merge_commits: Any
    allow_rebase: Any
    allow_rebase_explicit: Any
    allow_squash_merge: Any
    archived: Any
    closed_at: Any
    closed_issues: int
    created_at: str
    default_branch: Any
    description: str
    due_on: Any
    has_issues: Any
    has_pull_requests: Any
    has_wiki: Any
    id: int
    ignore_whitespace_conflicts: Any
    name: Any
    open_issues: int
    private: Any
    state: str
    title: str
    updated_at: str
    website: Any

    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>

    # Timestamps in API responses are converted with Util.convert_time.
    _fields_to_parsers: ClassVar[dict] = {
        "closed_at": lambda _, t: Util.convert_time(t),
        "due_on": lambda _, t: Util.convert_time(t),
    }

    # NOTE(review): most of these names look like repository settings rather
    # than milestone fields (e.g. "title" and "state" are absent) - confirm
    # against the API before relying on them.
    _patchable_fields: ClassVar[set[str]] = {
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_squash_merge",
        "archived",
        "default_branch",
        "description",
        "has_issues",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "name",
        "private",
        "website",
    }

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if isinstance(other, Milestone):
            return self.allspice_client == other.allspice_client and self.id == other.id
        return False

    def __hash__(self):
        client_hash = hash(self.allspice_client)
        return client_hash ^ hash(self.id)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """
        Request a single milestone.

        :param allspice_client: The client to request the milestone with.
        :param owner: Username of the repository's owner.
        :param repo: Name of the repository.
        :param number: Value for the {number} part of the milestone URL.
        """
        path_args = {"owner": owner, "repo": repo, "number": number}
        return cls._request(allspice_client, path_args)
1698class Commit(ReadonlyApiObject): 1699 author: User 1700 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1701 committer: Dict[str, Union[int, str, bool]] 1702 created: str 1703 files: List[Dict[str, str]] 1704 html_url: str 1705 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1706 parents: List[Union[Dict[str, str], Any]] 1707 sha: str 1708 stats: Dict[str, int] 1709 url: str 1710 1711 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1712 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1713 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1714 1715 # Regex to extract owner and repo names from the url property 1716 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1717 1718 def __init__(self, allspice_client): 1719 super().__init__(allspice_client) 1720 1721 _fields_to_parsers: ClassVar[dict] = { 1722 # NOTE: api may return None for commiters that are no allspice users 1723 "author": lambda allspice_client, u: ( 1724 User.parse_response(allspice_client, u) if u else None 1725 ) 1726 } 1727 1728 def __eq__(self, other): 1729 if not isinstance(other, Commit): 1730 return False 1731 return self.sha == other.sha 1732 1733 def __hash__(self): 1734 return hash(self.sha) 1735 1736 @classmethod 1737 def parse_response(cls, allspice_client, result) -> "Commit": 1738 commit_cache = result["commit"] 1739 api_object = cls(allspice_client) 1740 cls._initialize(allspice_client, api_object, result) 1741 # inner_commit for legacy reasons 1742 Commit._add_read_property("inner_commit", commit_cache, api_object) 1743 return api_object 1744 1745 def get_status(self) -> CommitCombinedStatus: 1746 """ 1747 Get a combined status consisting of all statues on this commit. 1748 1749 Note that the returned object is a CommitCombinedStatus object, which 1750 also contains a list of all statuses on the commit. 
1751 1752 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1753 """ 1754 1755 result = self.allspice_client.requests_get( 1756 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1757 ) 1758 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1759 1760 def get_statuses(self) -> List[CommitStatus]: 1761 """ 1762 Get a list of all statuses on this commit. 1763 1764 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1765 """ 1766 1767 results = self.allspice_client.requests_get( 1768 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1769 ) 1770 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1771 1772 @cached_property 1773 def _fields_for_path(self) -> dict[str, str]: 1774 matches = self.URL_REGEXP.search(self.url) 1775 if not matches: 1776 raise ValueError(f"Invalid commit URL: {self.url}") 1777 1778 return { 1779 "owner": matches.group(1), 1780 "repo": matches.group(2), 1781 "sha": self.sha, 1782 }
1736 @classmethod 1737 def parse_response(cls, allspice_client, result) -> "Commit": 1738 commit_cache = result["commit"] 1739 api_object = cls(allspice_client) 1740 cls._initialize(allspice_client, api_object, result) 1741 # inner_commit for legacy reasons 1742 Commit._add_read_property("inner_commit", commit_cache, api_object) 1743 return api_object
1745 def get_status(self) -> CommitCombinedStatus: 1746 """ 1747 Get a combined status consisting of all statues on this commit. 1748 1749 Note that the returned object is a CommitCombinedStatus object, which 1750 also contains a list of all statuses on the commit. 1751 1752 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1753 """ 1754 1755 result = self.allspice_client.requests_get( 1756 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1757 ) 1758 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statuses on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1760 def get_statuses(self) -> List[CommitStatus]: 1761 """ 1762 Get a list of all statuses on this commit. 1763 1764 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1765 """ 1766 1767 results = self.allspice_client.requests_get( 1768 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1769 ) 1770 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
1558class Comment(ApiObject): 1559 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1560 body: str 1561 created_at: datetime 1562 html_url: str 1563 id: int 1564 issue_url: str 1565 original_author: str 1566 original_author_id: int 1567 pull_request_url: str 1568 updated_at: datetime 1569 user: User 1570 1571 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1572 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1573 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1574 1575 def __init__(self, allspice_client): 1576 super().__init__(allspice_client) 1577 1578 def __eq__(self, other): 1579 if not isinstance(other, Comment): 1580 return False 1581 return self.repository == other.repository and self.id == other.id 1582 1583 def __hash__(self): 1584 return hash(self.repository) ^ hash(self.id) 1585 1586 @classmethod 1587 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1588 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1589 1590 _fields_to_parsers: ClassVar[dict] = { 1591 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1592 "created_at": lambda _, t: Util.convert_time(t), 1593 "updated_at": lambda _, t: Util.convert_time(t), 1594 } 1595 1596 _patchable_fields: ClassVar[set[str]] = {"body"} 1597 1598 @property 1599 def parent_url(self) -> str: 1600 """URL of the parent of this comment (the issue or the pull request)""" 1601 1602 if self.issue_url is not None and self.issue_url != "": 1603 return self.issue_url 1604 else: 1605 return self.pull_request_url 1606 1607 @cached_property 1608 def repository(self) -> Repository: 1609 """The repository this comment was posted on.""" 1610 1611 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1612 return Repository.request(self.allspice_client, owner_name, repo_name) 1613 1614 def __fields_for_path(self): 1615 return { 1616 "owner": 
self.repository.owner.username, 1617 "repo": self.repository.name, 1618 "id": self.id, 1619 } 1620 1621 def commit(self): 1622 self._commit(self.__fields_for_path()) 1623 1624 def delete(self): 1625 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1626 self.deleted = True 1627 1628 def get_attachments(self) -> List[Attachment]: 1629 """ 1630 Get all attachments on this comment. This returns Attachment objects, which 1631 contain a link to download the attachment. 1632 1633 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1634 """ 1635 1636 results = self.allspice_client.requests_get( 1637 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1638 ) 1639 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1640 1641 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1642 """ 1643 Create an attachment on this comment. 1644 1645 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1646 1647 :param file: The file to attach. This should be a file-like object. 1648 :param name: The name of the file. If not provided, the name of the file will be 1649 used. 1650 :return: The created attachment. 1651 """ 1652 1653 args: dict[str, Any] = { 1654 "files": {"attachment": file}, 1655 } 1656 if name is not None: 1657 args["params"] = {"name": name} 1658 1659 result = self.allspice_client.requests_post( 1660 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1661 **args, 1662 ) 1663 return Attachment.parse_response(self.allspice_client, result) 1664 1665 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1666 """ 1667 Edit an attachment. 
1668 1669 The list of params that can be edited is available at 1670 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1671 1672 :param attachment: The attachment to be edited 1673 :param data: The data parameter should be a dictionary of the fields to edit. 1674 :return: The edited attachment 1675 """ 1676 1677 args = { 1678 **self.__fields_for_path(), 1679 "attachment_id": attachment.id, 1680 } 1681 result = self.allspice_client.requests_patch( 1682 self.ATTACHMENT_PATH.format(**args), 1683 data=data, 1684 ) 1685 return Attachment.parse_response(self.allspice_client, result) 1686 1687 def delete_attachment(self, attachment: Attachment): 1688 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1689 1690 args = { 1691 **self.__fields_for_path(), 1692 "attachment_id": attachment.id, 1693 } 1694 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1695 attachment.deleted = True
1598 @property 1599 def parent_url(self) -> str: 1600 """URL of the parent of this comment (the issue or the pull request)""" 1601 1602 if self.issue_url is not None and self.issue_url != "": 1603 return self.issue_url 1604 else: 1605 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1607 @cached_property 1608 def repository(self) -> Repository: 1609 """The repository this comment was posted on.""" 1610 1611 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1612 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1628 def get_attachments(self) -> List[Attachment]: 1629 """ 1630 Get all attachments on this comment. This returns Attachment objects, which 1631 contain a link to download the attachment. 1632 1633 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1634 """ 1635 1636 results = self.allspice_client.requests_get( 1637 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1638 ) 1639 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1641 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1642 """ 1643 Create an attachment on this comment. 1644 1645 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1646 1647 :param file: The file to attach. This should be a file-like object. 1648 :param name: The name of the file. If not provided, the name of the file will be 1649 used. 1650 :return: The created attachment. 1651 """ 1652 1653 args: dict[str, Any] = { 1654 "files": {"attachment": file}, 1655 } 1656 if name is not None: 1657 args["params"] = {"name": name} 1658 1659 result = self.allspice_client.requests_post( 1660 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1661 **args, 1662 ) 1663 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1665 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1666 """ 1667 Edit an attachment. 1668 1669 The list of params that can be edited is available at 1670 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1671 1672 :param attachment: The attachment to be edited 1673 :param data: The data parameter should be a dictionary of the fields to edit. 1674 :return: The edited attachment 1675 """ 1676 1677 args = { 1678 **self.__fields_for_path(), 1679 "attachment_id": attachment.id, 1680 } 1681 result = self.allspice_client.requests_patch( 1682 self.ATTACHMENT_PATH.format(**args), 1683 data=data, 1684 ) 1685 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1687 def delete_attachment(self, attachment: Attachment): 1688 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1689 1690 args = { 1691 **self.__fields_for_path(), 1692 "attachment_id": attachment.id, 1693 } 1694 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1695 attachment.deleted = True
https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
2515class Content(ReadonlyApiObject): 2516 content: Any 2517 download_url: str 2518 encoding: Any 2519 git_url: str 2520 html_url: str 2521 last_commit_sha: str 2522 name: str 2523 path: str 2524 sha: str 2525 size: int 2526 submodule_git_url: Any 2527 target: Any 2528 type: str 2529 url: str 2530 2531 FILE = "file" 2532 2533 def __init__(self, allspice_client): 2534 super().__init__(allspice_client) 2535 2536 def __eq__(self, other): 2537 if not isinstance(other, Content): 2538 return False 2539 2540 return self.sha == other.sha and self.name == other.name 2541 2542 def __hash__(self): 2543 return hash(self.sha) ^ hash(self.name)
Inherited Members
2019class DesignReview(ApiObject): 2020 """ 2021 A Design Review. See 2022 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2023 2024 Note: The base and head fields are not `Branch` objects - they are plain strings 2025 referring to the branch names. This is because DRs can exist for branches that have 2026 been deleted, which don't have an associated `Branch` object from the API. You can use 2027 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2028 it exists. 2029 """ 2030 2031 allow_maintainer_edit: bool 2032 allow_maintainer_edits: Any 2033 assignee: User 2034 assignees: List["User"] 2035 base: str 2036 body: str 2037 closed_at: Any 2038 comments: int 2039 created_at: str 2040 diff_url: str 2041 due_date: Optional[str] 2042 head: str 2043 html_url: str 2044 id: int 2045 is_locked: bool 2046 labels: List[Any] 2047 merge_base: str 2048 merge_commit_sha: Any 2049 mergeable: bool 2050 merged: bool 2051 merged_at: Any 2052 merged_by: Any 2053 milestone: Any 2054 number: int 2055 patch_url: str 2056 pin_order: int 2057 repository: Optional["Repository"] 2058 requested_reviewers: Any 2059 state: str 2060 title: str 2061 updated_at: str 2062 url: str 2063 user: User 2064 2065 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2066 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2067 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2068 2069 OPEN = "open" 2070 CLOSED = "closed" 2071 2072 class MergeType(Enum): 2073 MERGE = "merge" 2074 REBASE = "rebase" 2075 REBASE_MERGE = "rebase-merge" 2076 SQUASH = "squash" 2077 MANUALLY_MERGED = "manually-merged" 2078 2079 def __init__(self, allspice_client): 2080 super().__init__(allspice_client) 2081 2082 def __eq__(self, other): 2083 if not isinstance(other, DesignReview): 2084 return False 2085 return self.repository == other.repository and self.id == other.id 2086 2087 def __hash__(self): 2088 return hash(self.repository) ^ hash(self.id) 2089 
2090 @classmethod 2091 def parse_response(cls, allspice_client, result) -> "DesignReview": 2092 api_object = super().parse_response(allspice_client, result) 2093 cls._add_read_property( 2094 "repository", 2095 Repository.parse_response(allspice_client, result["base"]["repo"]), 2096 api_object, 2097 ) 2098 2099 return api_object 2100 2101 @classmethod 2102 def request(cls, allspice_client, owner: str, repo: str, number: str): 2103 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2104 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2105 2106 _fields_to_parsers: ClassVar[dict] = { 2107 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2108 "assignees": lambda allspice_client, us: [ 2109 User.parse_response(allspice_client, u) for u in us 2110 ], 2111 "base": lambda _, b: b["ref"], 2112 "head": lambda _, h: h["ref"], 2113 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2114 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2115 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2116 } 2117 2118 _patchable_fields: ClassVar[set[str]] = { 2119 "allow_maintainer_edits", 2120 "assignee", 2121 "assignees", 2122 "base", 2123 "body", 2124 "due_date", 2125 "milestone", 2126 "state", 2127 "title", 2128 } 2129 2130 _parsers_to_fields: ClassVar[dict] = { 2131 "assignee": lambda u: u.username, 2132 "assignees": lambda us: [u.username for u in us], 2133 "base": lambda b: b.name if isinstance(b, Branch) else b, 2134 "milestone": lambda m: m.id, 2135 } 2136 2137 def commit(self): 2138 data = self.get_dirty_fields() 2139 if "due_date" in data and data["due_date"] is None: 2140 data["unset_due_date"] = True 2141 2142 args = { 2143 "owner": self.repository.owner.username, 2144 "repo": self.repository.name, 2145 "index": self.number, 2146 } 2147 self._commit(args, data) 2148 2149 def merge(self, 
merge_type: MergeType): 2150 """ 2151 Merge the pull request. See 2152 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2153 2154 :param merge_type: The type of merge to perform. See the MergeType enum. 2155 """ 2156 2157 self.allspice_client.requests_put( 2158 self.MERGE_DESIGN_REVIEW.format( 2159 owner=self.repository.owner.username, 2160 repo=self.repository.name, 2161 index=self.number, 2162 ), 2163 data={"Do": merge_type.value}, 2164 ) 2165 2166 def get_comments(self) -> List[Comment]: 2167 """ 2168 Get the comments on this pull request, but not specifically on a review. 2169 2170 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2171 2172 :return: A list of comments on this pull request. 2173 """ 2174 2175 results = self.allspice_client.requests_get( 2176 self.GET_COMMENTS.format( 2177 owner=self.repository.owner.username, 2178 repo=self.repository.name, 2179 index=self.number, 2180 ) 2181 ) 2182 return [Comment.parse_response(self.allspice_client, result) for result in results] 2183 2184 def create_comment(self, body: str): 2185 """ 2186 Create a comment on this pull request. This uses the same endpoint as the 2187 comments on issues, and will not be associated with any reviews. 2188 2189 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2190 2191 :param body: The body of the comment. 2192 :return: The comment that was created. 2193 """ 2194 2195 result = self.allspice_client.requests_post( 2196 self.GET_COMMENTS.format( 2197 owner=self.repository.owner.username, 2198 repo=self.repository.name, 2199 index=self.number, 2200 ), 2201 data={"body": body}, 2202 ) 2203 return Comment.parse_response(self.allspice_client, result)
A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not `Branch` objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated `Branch` object from the API. You can use the `Repository.get_branch` method to get a `Branch` object for a branch if you know it exists.
2090 @classmethod 2091 def parse_response(cls, allspice_client, result) -> "DesignReview": 2092 api_object = super().parse_response(allspice_client, result) 2093 cls._add_read_property( 2094 "repository", 2095 Repository.parse_response(allspice_client, result["base"]["repo"]), 2096 api_object, 2097 ) 2098 2099 return api_object
2101 @classmethod 2102 def request(cls, allspice_client, owner: str, repo: str, number: str): 2103 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2104 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest
2137 def commit(self): 2138 data = self.get_dirty_fields() 2139 if "due_date" in data and data["due_date"] is None: 2140 data["unset_due_date"] = True 2141 2142 args = { 2143 "owner": self.repository.owner.username, 2144 "repo": self.repository.name, 2145 "index": self.number, 2146 } 2147 self._commit(args, data)
2149 def merge(self, merge_type: MergeType): 2150 """ 2151 Merge the pull request. See 2152 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2153 2154 :param merge_type: The type of merge to perform. See the MergeType enum. 2155 """ 2156 2157 self.allspice_client.requests_put( 2158 self.MERGE_DESIGN_REVIEW.format( 2159 owner=self.repository.owner.username, 2160 repo=self.repository.name, 2161 index=self.number, 2162 ), 2163 data={"Do": merge_type.value}, 2164 )
Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2166 def get_comments(self) -> List[Comment]: 2167 """ 2168 Get the comments on this pull request, but not specifically on a review. 2169 2170 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2171 2172 :return: A list of comments on this pull request. 2173 """ 2174 2175 results = self.allspice_client.requests_get( 2176 self.GET_COMMENTS.format( 2177 owner=self.repository.owner.username, 2178 repo=self.repository.name, 2179 index=self.number, 2180 ) 2181 ) 2182 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
https://hub.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2184 def create_comment(self, body: str): 2185 """ 2186 Create a comment on this pull request. This uses the same endpoint as the 2187 comments on issues, and will not be associated with any reviews. 2188 2189 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2190 2191 :param body: The body of the comment. 2192 :return: The comment that was created. 2193 """ 2194 2195 result = self.allspice_client.requests_post( 2196 self.GET_COMMENTS.format( 2197 owner=self.repository.owner.username, 2198 repo=self.repository.name, 2199 index=self.number, 2200 ), 2201 data={"body": body}, 2202 ) 2203 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2072 class MergeType(Enum): 2073 MERGE = "merge" 2074 REBASE = "rebase" 2075 REBASE_MERGE = "rebase-merge" 2076 SQUASH = "squash" 2077 MANUALLY_MERGED = "manually-merged"
Inherited Members
- enum.Enum
- name
- value
2289class Release(ApiObject): 2290 """ 2291 A release on a repo. 2292 """ 2293 2294 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2295 author: User 2296 body: str 2297 created_at: str 2298 draft: bool 2299 html_url: str 2300 id: int 2301 name: str 2302 prerelease: bool 2303 published_at: str 2304 repo: Optional["Repository"] 2305 repository: Optional["Repository"] 2306 tag_name: str 2307 tarball_url: str 2308 target_commitish: str 2309 upload_url: str 2310 url: str 2311 zipball_url: str 2312 2313 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2314 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2315 # Note that we don't strictly need the get_assets route, as the release 2316 # object already contains the assets. 2317 2318 def __init__(self, allspice_client): 2319 super().__init__(allspice_client) 2320 2321 def __eq__(self, other): 2322 if not isinstance(other, Release): 2323 return False 2324 return self.repo == other.repo and self.id == other.id 2325 2326 def __hash__(self): 2327 return hash(self.repo) ^ hash(self.id) 2328 2329 _fields_to_parsers: ClassVar[dict] = { 2330 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2331 } 2332 _patchable_fields: ClassVar[set[str]] = { 2333 "body", 2334 "draft", 2335 "name", 2336 "prerelease", 2337 "tag_name", 2338 "target_commitish", 2339 } 2340 2341 @classmethod 2342 def parse_response(cls, allspice_client, result, repo) -> Release: 2343 release = super().parse_response(allspice_client, result) 2344 Release._add_read_property("repository", repo, release) 2345 # For legacy reasons 2346 Release._add_read_property("repo", repo, release) 2347 setattr( 2348 release, 2349 "_assets", 2350 [ 2351 ReleaseAsset.parse_response(allspice_client, asset, release) 2352 for asset in result["assets"] 2353 ], 2354 ) 2355 return release 2356 2357 @classmethod 2358 def request( 2359 cls, 2360 allspice_client, 2361 owner: str, 2362 repo: str, 2363 id: 
Optional[int] = None, 2364 ) -> Release: 2365 args = {"owner": owner, "repo": repo, "id": id} 2366 release_response = cls._get_gitea_api_object(allspice_client, args) 2367 repository = Repository.request(allspice_client, owner, repo) 2368 release = cls.parse_response(allspice_client, release_response, repository) 2369 return release 2370 2371 def commit(self): 2372 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2373 self._commit(args) 2374 2375 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2376 """ 2377 Create an asset for this release. 2378 2379 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2380 2381 :param file: The file to upload. This should be a file-like object. 2382 :param name: The name of the file. 2383 :return: The created asset. 2384 """ 2385 2386 args: dict[str, Any] = {"files": {"attachment": file}} 2387 if name is not None: 2388 args["params"] = {"name": name} 2389 2390 result = self.allspice_client.requests_post( 2391 self.RELEASE_CREATE_ASSET.format( 2392 owner=self.repo.owner.username, 2393 repo=self.repo.name, 2394 id=self.id, 2395 ), 2396 **args, 2397 ) 2398 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2399 2400 def delete(self): 2401 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2402 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2403 self.deleted = True
A release on a repo.
2341 @classmethod 2342 def parse_response(cls, allspice_client, result, repo) -> Release: 2343 release = super().parse_response(allspice_client, result) 2344 Release._add_read_property("repository", repo, release) 2345 # For legacy reasons 2346 Release._add_read_property("repo", repo, release) 2347 setattr( 2348 release, 2349 "_assets", 2350 [ 2351 ReleaseAsset.parse_response(allspice_client, asset, release) 2352 for asset in result["assets"] 2353 ], 2354 ) 2355 return release
2357 @classmethod 2358 def request( 2359 cls, 2360 allspice_client, 2361 owner: str, 2362 repo: str, 2363 id: Optional[int] = None, 2364 ) -> Release: 2365 args = {"owner": owner, "repo": repo, "id": id} 2366 release_response = cls._get_gitea_api_object(allspice_client, args) 2367 repository = Repository.request(allspice_client, owner, repo) 2368 release = cls.parse_response(allspice_client, release_response, repository) 2369 return release
2375 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2376 """ 2377 Create an asset for this release. 2378 2379 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2380 2381 :param file: The file to upload. This should be a file-like object. 2382 :param name: The name of the file. 2383 :return: The created asset. 2384 """ 2385 2386 args: dict[str, Any] = {"files": {"attachment": file}} 2387 if name is not None: 2388 args["params"] = {"name": name} 2389 2390 result = self.allspice_client.requests_post( 2391 self.RELEASE_CREATE_ASSET.format( 2392 owner=self.repo.owner.username, 2393 repo=self.repo.name, 2394 id=self.id, 2395 ), 2396 **args, 2397 ) 2398 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.