allspice
A very simple API client for AllSpice Hub
Note that the full Swagger API is not accessible. The implementation is focused on making access to and work with Organizations, Teams, Repositories and Users as pain-free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client
object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Information such as the AllSpice version or the authenticated user can be requested directly from the allspice_client
object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the corresponding wrapper objects for those entities.
Each of those objects has a .request
method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization, ... classes are dynamically created at runtime, and thus not visible during development. Refer to the AllSpice API documentation for the field names.
Fields that cannot be altered via the AllSpice API are read-only. After altering a field, the .commit
method of the corresponding object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects have methods to execute some of the requests possible through the AllSpice API:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
"""
.. include:: ../README.md
   :start-line: 1
   :end-before: Installation
"""

# Public entry point of the package: re-export the client, the API object
# wrappers and the exceptions callers are expected to catch.
from .allspice import AllSpice
from .apiobject import (
    Branch,
    Comment,
    Commit,
    Content,
    DesignReview,
    DesignReviewReview,
    Issue,
    Milestone,
    Organization,
    Release,
    Repository,
    Team,
    User,
)
from .exceptions import AlreadyExistsException, NotFoundException

__version__ = "3.10.1"

# Names exported by `from allspice import *`.
__all__ = [
    "AllSpice",
    "AlreadyExistsException",
    "Branch",
    "Comment",
    "Commit",
    "Content",
    "DesignReview",
    "DesignReviewReview",
    "Issue",
    "Milestone",
    "NotFoundException",
    "Organization",
    "Release",
    "Repository",
    "Team",
    "User",
]
class AllSpice:
    """Object to establish a session with AllSpice Hub."""

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
    ):
        """Initializing an instance of the AllSpice Hub Client

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): Stored as the session's `verify` setting. If
                False, insecure-request warnings are also silenced. By
                default True.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

        Raises:
            ValueError: If neither `token_text` nor `auth` is given.
                Supplying both is not rejected.
        """

        self.logger = logging.getLogger(__name__)
        # The logger is shared by every client in the process. Attach the
        # stderr handler only once, otherwise each additional client would
        # duplicate every log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            )
            self.logger.addHandler(handler)
        self.logger.setLevel(log_level)
        self.headers = {
            "Content-type": "application/json",
        }
        self.url = allspice_hub_url

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication
        if not token_text and not auth:
            raise ValueError("Please provide auth or token_text, but not both")
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certification verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def __get_url(self, endpoint):
        """Return the full API v1 URL for `endpoint` (and debug-log it)."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s" % url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """GET `endpoint` and return the raw response.

        Raises:
            NotFoundException: On status 404.
            ConflictException: On status 409.
            NotYetGeneratedException: On status 503.
            Exception: On 403 and on any other status that is not 200/201.
        """
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code in [404]:
                raise NotFoundException(message)
            if request.status_code in [403]:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
                )
            if request.status_code in [409]:
                raise ConflictException(message)
            if request.status_code in [503]:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parses the result-JSON to a dict.

        Bodies that are empty or at most three characters long yield `{}`.
        """
        if result.text and len(result.text) > 3:
            return json.loads(result.text)
        return {}

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body.

        If `sudo` is given, its `username` is sent as the `sudo` query param.
        """
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.parse_result(self.__get(endpoint, combined_params))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the unparsed response content."""
        combined_params = {}
        combined_params.update(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.__get(endpoint, combined_params).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """GET every page of `endpoint` and return the concatenated items.

        Pages are requested starting at `first_page` (sent as `page_key`)
        until a page comes back empty. A dict response must carry its items
        under "data" or "tree"; any other dict raises NotImplementedError.
        """
        page = first_page
        combined_params = {}
        combined_params.update(params)
        aggregated_result = []
        while True:
            combined_params[page_key] = page
            result = self.requests_get(endpoint, combined_params, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    data = result["data"]
                    if len(data) == 0:
                        return aggregated_result
                    aggregated_result.extend(data)
                elif "tree" in result:
                    data = result["tree"]
                    if data is None or len(data) == 0:
                        return aggregated_result
                    aggregated_result.extend(data)
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
            else:
                aggregated_result.extend(result)

            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` (an empty object if falsy) as JSON to `endpoint`.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        if not data:
            data = {}
        request = self.requests.put(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
            self.logger.error(message)
            raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE to `endpoint`, with `data` as JSON body if given.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        # Without data, send no body at all rather than the literal "null"
        # that json.dumps(None) would produce.
        body = json.dumps(data) if data is not None else None
        request = self.requests.delete(self.__get_url(endpoint), headers=self.headers, data=body)
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url})"
            self.logger.error(message)
            raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
            can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If a failed response's text reports
            that the entity or e-mail already exists.
        :raises Exception: On any other status that is not 200, 201 or 202.
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # Drop the JSON content type so it is not forced onto a file upload.
            args["headers"].pop("Content-type")
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            # Never write the access token to the log.
            safe_headers = {
                key: ("<redacted>" if key.lower() == "authorization" else value)
                for key, value in self.headers.items()
            }
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            self.logger.error(f"With info: {data} ({safe_headers})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` as JSON to `endpoint` and return the parsed response.

        Raises:
            Exception: If the status code is neither 200 nor 201.
        """
        request = self.requests.patch(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
            self.logger.error(error_message)
            raise Exception(error_message)
        return self.parse_result(request)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed response of `/orgs/<orgname>/public_members`."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return the organizations listed by `/admin/orgs`."""
        path = "/admin/orgs"
        results = self.requests_get(path)
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the user reported by the `/user` endpoint."""
        result = self.requests_get(AllSpice.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the "version" field of the `/version` endpoint."""
        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return the users listed by `/admin/users`."""
        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first user whose `email` or `emails` matches, else None."""
        users = self.get_users()
        for user in users:
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the first user with the given `username`, else None."""
        users = self.get_users()
        for user in users:
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Return the repository `owner`/`name`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` fall back to `user_name` when not given.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # Keep the plaintext password out of the logs.
        self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            # A response without "message" must not mask the real failure
            # with a KeyError.
            message = result.get("message", "no error message in response")
            self.logger.error(message)
            raise Exception("User not created... (gitea: %s)" % message)
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
        result = self.requests_post(
            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            message = result.get("message", "no error message in response")
            self.logger.error(message)
            raise Exception("Repository not created... (gitea: %s)" % message)
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization owned by `owner`.

        Throws:
            Exception: If the response does not contain an "id".
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            AllSpice.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            message = result.get("message", "no error message in response")
            self.logger.error("Organization not created... (gitea: %s)" % message)
            self.logger.error(message)
            raise Exception("Organization not created... (gitea: %s)" % message)
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, "", short description of the new Team.
            permission (str): Optional, 'read', What permissions the members
                of the Team have.
            can_create_org_repo (bool): Optional, False, sent unchanged as
                `can_create_org_repo`.
            includes_all_repositories (bool): Optional, False, sent unchanged
                as `includes_all_repositories`.
            units (tuple): Optional, the repository units sent as `units`.
            units_map (dict): Optional, None, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.

        Throws:
            Exception: If the response does not contain an "id".
        """

        result = self.requests_post(
            AllSpice.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                # None stands for "no mapping" and is sent as an empty object.
                "units_map": units_map if units_map is not None else {},
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            message = result.get("message", "no error message in response")
            self.logger.error("Team not created... (gitea: %s)" % message)
            self.logger.error(message)
            raise Exception("Team not created... (gitea: %s)" % message)
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object
Object to establish a session with AllSpice Hub.
def __init__(
    self,
    allspice_hub_url="https://hub.allspice.io",
    token_text=None,
    auth=None,
    verify=True,
    log_level="INFO",
    ratelimiting=(100, 60),
):
    """Initializing an instance of the AllSpice Hub Client

    Args:
        allspice_hub_url (str): The URL for the AllSpice Hub instance.
            Defaults to `https://hub.allspice.io`.

        token_text (str, None): The access token, by default None.

        auth (tuple, None): The user credentials
            `(username, password)`, by default None.

        verify (bool): Stored as the session's `verify` setting. If
            False, insecure-request warnings are also silenced. By
            default True.

        log_level (str): The log level, by default `INFO`.

        ratelimiting (tuple[int, int], None): `(max_calls, period)`,
            If None, no rate limiting is applied. By default, 100 calls
            per minute are allowed.

    Raises:
        ValueError: If neither `token_text` nor `auth` is given.
            Supplying both is not rejected.
    """

    self.logger = logging.getLogger(__name__)
    # The logger is shared by every client in the process. Attach the stderr
    # handler only once, otherwise each additional client would duplicate
    # every log line.
    if not self.logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        self.logger.addHandler(handler)
    self.logger.setLevel(log_level)
    self.headers = {
        "Content-type": "application/json",
    }
    self.url = allspice_hub_url

    if ratelimiting is None:
        self.requests = requests.Session()
    else:
        (max_calls, period) = ratelimiting
        self.requests = RateLimitedSession(max_calls=max_calls, period=period)

    # Manage authentication
    if not token_text and not auth:
        raise ValueError("Please provide auth or token_text, but not both")
    if token_text:
        self.headers["Authorization"] = "token " + token_text
    if auth:
        self.logger.warning(
            "Using basic auth is not recommended. Prefer using a token instead."
        )
        self.requests.auth = auth

    # Manage SSL certification verification
    self.requests.verify = verify
    if not verify:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If False, allow insecure server connections
when using SSL. By default True.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {}
Parses the result-JSON to a dict.
def requests_get_paginated(
    self,
    endpoint: str,
    params=frozendict(),
    sudo=None,
    page_key: str = "page",
    first_page: int = 1,
):
    """Fetch every page of `endpoint` and return all items in one list.

    Pages are requested starting at `first_page` (sent under `page_key`)
    until a page comes back empty. A dict response must carry its items
    under "data" or "tree"; any other dict raises NotImplementedError.
    A non-dict response is treated as the list of items itself.
    """
    query = {}
    query.update(params)
    collected = []
    page_number = first_page
    while True:
        query[page_key] = page_number
        response = self.requests_get(endpoint, query, sudo)

        if not response:
            return collected

        if not isinstance(response, dict):
            items = response
        elif "data" in response:
            items = response["data"]
            if len(items) == 0:
                return collected
        elif "tree" in response:
            items = response["tree"]
            if items is None or len(items) == 0:
                return collected
        else:
            raise NotImplementedError(
                "requests_get_paginated does not know how to handle responses of this type."
            )

        collected.extend(items)
        page_number += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
    """PUT `data` as JSON to `endpoint`; a falsy `data` is sent as `{}`.

    Raises:
        Exception: If the status code is neither 200 nor 204. The message
            is logged as an error first.
    """
    url = self.__get_url(endpoint)
    payload = json.dumps(data if data else {})
    response = self.requests.put(url, headers=self.headers, data=payload)
    if response.status_code in [200, 204]:
        return
    message = f"Received status code: {response.status_code} ({response.url}) {response.text}"
    self.logger.error(message)
    raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
    """Send a DELETE to `endpoint`, with `data` as JSON body if given.

    Raises:
        Exception: If the status code is neither 200 nor 204. The message
            is logged as an error first.
    """
    # Without data, send no body at all rather than the literal "null" that
    # json.dumps(None) would produce.
    body = json.dumps(data) if data is not None else None
    request = self.requests.delete(self.__get_url(endpoint), headers=self.headers, data=body)
    if request.status_code not in [200, 204]:
        message = f"Received status code: {request.status_code} ({request.url})"
        self.logger.error(message)
        raise Exception(message)
def requests_post(
    self,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
    files: Optional[dict] = None,
):
    """
    Make a POST call to the endpoint.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :param params: A dictionary of query params
    :param files: A dictionary of files, see requests.post. Using both files and data
        can lead to unexpected results!
    :return: The JSON response parsed as a dict
    :raises AlreadyExistsException: If a failed response's text reports that
        the entity or e-mail already exists.
    :raises Exception: On any other status that is not 200, 201 or 202.
    """

    # This should ideally be a TypedDict of the type of arguments taken by
    # `requests.post`.
    args: dict[str, Any] = {
        "headers": self.headers.copy(),
    }
    if data is not None:
        args["data"] = json.dumps(data)
    if params is not None:
        args["params"] = params
    if files is not None:
        # Drop the JSON content type so it is not forced onto a file upload.
        args["headers"].pop("Content-type")
        args["files"] = files

    request = self.requests.post(self.__get_url(endpoint), **args)

    if request.status_code not in [200, 201, 202]:
        if "already exists" in request.text or "e-mail already in use" in request.text:
            self.logger.warning(request.text)
            raise AlreadyExistsException()
        # Never write the access token to the log.
        safe_headers = {
            key: ("<redacted>" if key.lower() == "authorization" else value)
            for key, value in self.headers.items()
        }
        self.logger.error(f"Received status code: {request.status_code} ({request.url})")
        self.logger.error(f"With info: {data} ({safe_headers})")
        self.logger.error(f"Answer: {request.text}")
        raise Exception(
            f"Received status code: {request.status_code} ({request.url}), {request.text}"
        )
    return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
def requests_patch(self, endpoint: str, data: dict):
    """PATCH `data` as JSON to `endpoint` and return the parsed response.

    Raises:
        Exception: If the status code is neither 200 nor 201. The message
            (which includes `data`) is logged as an error first.
    """
    url = self.__get_url(endpoint)
    response = self.requests.patch(url, headers=self.headers, data=json.dumps(data))
    if response.status_code in [200, 201]:
        return self.parse_result(response)
    error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
    self.logger.error(error_message)
    raise Exception(error_message)
def create_user(
    self,
    user_name: str,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    login_name: Optional[str] = None,
    change_pw=True,
    send_notify=True,
    source_id=0,
):
    """Create User.

    `login_name` and `full_name` fall back to `user_name` when not given.

    Throws:
        AlreadyExistsException, if the User exists already
        Exception, if something else went wrong.
    """
    if not login_name:
        login_name = user_name
    if not full_name:
        full_name = user_name
    request_data = {
        "source_id": source_id,
        "login_name": login_name,
        "full_name": full_name,
        "username": user_name,
        "email": email,
        "password": password,
        "send_notify": send_notify,
        "must_change_password": change_pw,
    }

    # Keep the plaintext password out of the logs.
    self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
    result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
    if "id" in result:
        self.logger.info(
            "Successfully created User %s <%s> (id %s)",
            result["login"],
            result["email"],
            result["id"],
        )
        self.logger.debug("Gitea response: %s", result)
    else:
        # A response without "message" must not mask the real failure with a
        # KeyError.
        message = result.get("message", "no error message in response")
        self.logger.error(message)
        raise Exception("User not created... (gitea: %s)" % message)
    user = User.parse_response(self, result)
    return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository as the administrator

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # although this only says user in the api, this also works for
    # organizations
    assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
    result = self.requests_post(
        AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
        data={
            "name": repoName,
            "description": description,
            "private": private,
            "auto_init": autoInit,
            "gitignores": gitignores,
            "license": license,
            "issue_labels": issue_labels,
            "readme": readme,
            "default_branch": default_branch,
        },
    )
    if "id" in result:
        self.logger.info("Successfully created Repository %s " % result["name"])
    else:
        # A response without "message" must not mask the real failure with a
        # KeyError.
        message = result.get("message", "no error message in response")
        self.logger.error(message)
        raise Exception("Repository not created... (gitea: %s)" % message)
    return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner`.

    Args:
        owner (User): The user the Organization is created for.
        orgName (str): Sent as the Organization's `username`.
        description (str): Description of the Organization.
        location (str): Optional, "", sent unchanged as `location`.
        website (str): Optional, "", sent unchanged as `website`.
        full_name (str): Optional, "", sent unchanged as `full_name`.

    Throws:
        Exception: If the response does not contain an "id".
    """
    assert isinstance(owner, User)
    result = self.requests_post(
        AllSpice.CREATE_ORG % owner.username,
        data={
            "username": orgName,
            "description": description,
            "location": location,
            "website": website,
            "full_name": full_name,
        },
    )
    if "id" in result:
        self.logger.info("Successfully created Organization %s" % result["username"])
    else:
        # A response without "message" must not mask the real failure with a
        # KeyError.
        message = result.get("message", "no error message in response")
        self.logger.error("Organization not created... (gitea: %s)" % message)
        self.logger.error(message)
        raise Exception("Organization not created... (gitea: %s)" % message)
    return Organization.parse_response(self, result)
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, "", short description of the new Team.
        permission (str): Optional, 'read', What permissions the members
            of the Team have.
        can_create_org_repo (bool): Optional, False, sent unchanged as
            `can_create_org_repo`.
        includes_all_repositories (bool): Optional, False, sent unchanged
            as `includes_all_repositories`.
        units (tuple): Optional, the repository units sent as `units`.
        units_map (dict): Optional, None, a mapping of units to their
            permissions. If None or empty, the `permission` permission will
            be applied to all units. Note: When both `units` and `units_map`
            are given, `units_map` will be preferred.

    Throws:
        Exception: If the response does not contain an "id".
    """

    result = self.requests_post(
        AllSpice.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            # None stands for "no mapping" and is sent as an empty object.
            "units_map": units_map if units_map is not None else {},
        },
    )

    if "id" in result:
        self.logger.info("Successfully created Team %s" % result["name"])
    else:
        # A response without "message" must not mask the real failure with a
        # KeyError.
        message = result.get("message", "no error message in response")
        self.logger.error("Team not created... (gitea: %s)" % message)
        self.logger.error(message)
        raise Exception("Team not created... (gitea: %s)" % message)
    api_object = Team.parse_response(self, result)
    setattr(
        api_object, "_organization", org
    )  # fixes strange behaviour of gitea not returning a valid organization here.
    return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members of the Team have.
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.
Common base class for all non-exit exceptions.
class Branch(ReadonlyApiObject):
    """Read-only API object for a branch of a repository.

    Two branches are equal when both their `commit` and `name` match.
    """

    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
    effective_branch_protection_name: str
    enable_status_check: bool
    name: str
    protected: bool
    required_approvals: int
    status_check_contexts: List[Any]
    user_can_merge: bool
    user_can_push: bool

    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if isinstance(other, Branch):
            return self.commit == other.commit and self.name == other.name
        return False

    def __hash__(self):
        # Combine the id of the commit dict with the branch name.
        commit_hash = hash(self.commit["id"])
        name_hash = hash(self.name)
        return commit_hash ^ name_hash

    _fields_to_parsers: ClassVar[dict] = {
        # This is not a commit object
        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
    }

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, branch: str):
        """Request the branch `branch` of the repository `owner`/`repo`."""
        path_fields = {"owner": owner, "repo": repo, "branch": branch}
        return cls._request(allspice_client, path_fields)
Inherited Members
class Comment(ApiObject):
    """API object for a comment on an issue or a pull request.

    Only `body` is patchable. Two comments are equal when they share the
    same repository and `id`.
    """

    assets: List[Union[Any, Dict[str, Union[int, str]]]]
    body: str
    created_at: datetime
    html_url: str
    id: int
    issue_url: str
    original_author: str
    original_author_id: int
    pull_request_url: str
    updated_at: datetime
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if isinstance(other, Comment):
            return self.repository == other.repository and self.id == other.id
        return False

    def __hash__(self):
        repository_hash = hash(self.repository)
        return repository_hash ^ hash(self.id)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
        """Request the comment `id` of the repository `owner`/`repo`."""
        path_fields = {"owner": owner, "repo": repo, "id": id}
        return cls._request(allspice_client, path_fields)

    _fields_to_parsers: ClassVar[dict] = {
        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
        "created_at": lambda _, t: Util.convert_time(t),
        "updated_at": lambda _, t: Util.convert_time(t),
    }

    _patchable_fields: ClassVar[set[str]] = {"body"}

    @property
    def parent_url(self) -> str:
        """URL of the parent of this comment (the issue or the pull request)"""

        # Fall back to the pull request URL when no issue URL is set.
        if self.issue_url is None or self.issue_url == "":
            return self.pull_request_url
        return self.issue_url

    @cached_property
    def repository(self) -> Repository:
        """The repository this comment was posted on."""

        # Owner and repository name are the 4th- and 3rd-to-last segments of
        # the parent URL.
        url_segments = self.parent_url.split("/")
        owner_name, repo_name = url_segments[-4:-2]
        return Repository.request(self.allspice_client, owner_name, repo_name)

    def __fields_for_path(self):
        """Return the values used to fill the path templates of this class."""
        repository = self.repository
        return {
            "owner": repository.owner.username,
            "repo": repository.name,
            "id": self.id,
        }

    def commit(self):
        """Commit this comment via `_commit`, addressed by owner, repo and id."""
        path_fields = self.__fields_for_path()
        self._commit(path_fields)

    def delete(self):
        """Delete this comment and mark the object as deleted."""
        path = self.API_OBJECT.format(**self.__fields_for_path())
        self.allspice_client.requests_delete(path)
        self.deleted = True

    def get_attachments(self) -> List[Attachment]:
        """
        Get all attachments on this comment. This returns Attachment objects, which
        contain a link to download the attachment.

        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
        """

        path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        results = self.allspice_client.requests_get(path)
        return [Attachment.parse_response(self.allspice_client, result) for result in results]

    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
        """
        Create an attachment on this comment.

        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

        :param file: The file to attach. This should be a file-like object.
        :param name: The name of the file. If not provided, the name of the file will be
            used.
        :return: The created attachment.
        """

        # The `params` argument is only passed along when a name was given.
        optional_args: dict[str, Any] = {} if name is None else {"params": {"name": name}}
        path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        result = self.allspice_client.requests_post(
            path,
            files={"attachment": file},
            **optional_args,
        )
        return Attachment.parse_response(self.allspice_client, result)

    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
        """
        Edit an attachment.

        The list of params that can be edited is available at
        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

        :param attachment: The attachment to be edited
        :param data: The data parameter should be a dictionary of the fields to edit.
        :return: The edited attachment
        """

        path_fields = dict(self.__fields_for_path())
        path_fields["attachment_id"] = attachment.id
        result = self.allspice_client.requests_patch(
            self.ATTACHMENT_PATH.format(**path_fields),
            data=data,
        )
        return Attachment.parse_response(self.allspice_client, result)

    def delete_attachment(self, attachment: Attachment):
        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""

        path_fields = dict(self.__fields_for_path())
        path_fields["attachment_id"] = attachment.id
        self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**path_fields))
        attachment.deleted = True
1635 @property 1636 def parent_url(self) -> str: 1637 """URL of the parent of this comment (the issue or the pull request)""" 1638 1639 if self.issue_url is not None and self.issue_url != "": 1640 return self.issue_url 1641 else: 1642 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1644 @cached_property 1645 def repository(self) -> Repository: 1646 """The repository this comment was posted on.""" 1647 1648 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1649 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1665 def get_attachments(self) -> List[Attachment]: 1666 """ 1667 Get all attachments on this comment. This returns Attachment objects, which 1668 contain a link to download the attachment. 1669 1670 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1671 """ 1672 1673 results = self.allspice_client.requests_get( 1674 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1675 ) 1676 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1678 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1679 """ 1680 Create an attachment on this comment. 1681 1682 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1683 1684 :param file: The file to attach. This should be a file-like object. 1685 :param name: The name of the file. If not provided, the name of the file will be 1686 used. 1687 :return: The created attachment. 1688 """ 1689 1690 args: dict[str, Any] = { 1691 "files": {"attachment": file}, 1692 } 1693 if name is not None: 1694 args["params"] = {"name": name} 1695 1696 result = self.allspice_client.requests_post( 1697 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1698 **args, 1699 ) 1700 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1702 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1703 """ 1704 Edit an attachment. 1705 1706 The list of params that can be edited is available at 1707 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1708 1709 :param attachment: The attachment to be edited 1710 :param data: The data parameter should be a dictionary of the fields to edit. 1711 :return: The edited attachment 1712 """ 1713 1714 args = { 1715 **self.__fields_for_path(), 1716 "attachment_id": attachment.id, 1717 } 1718 result = self.allspice_client.requests_patch( 1719 self.ATTACHMENT_PATH.format(**args), 1720 data=data, 1721 ) 1722 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1724 def delete_attachment(self, attachment: Attachment): 1725 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1726 1727 args = { 1728 **self.__fields_for_path(), 1729 "attachment_id": attachment.id, 1730 } 1731 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1732 attachment.deleted = True
https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1735class Commit(ReadonlyApiObject): 1736 author: User 1737 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1738 committer: Dict[str, Union[int, str, bool]] 1739 created: str 1740 files: List[Dict[str, str]] 1741 html_url: str 1742 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1743 parents: List[Union[Dict[str, str], Any]] 1744 sha: str 1745 stats: Dict[str, int] 1746 url: str 1747 1748 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1749 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1750 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1751 1752 # Regex to extract owner and repo names from the url property 1753 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1754 1755 def __init__(self, allspice_client): 1756 super().__init__(allspice_client) 1757 1758 _fields_to_parsers: ClassVar[dict] = { 1759 # NOTE: api may return None for commiters that are no allspice users 1760 "author": lambda allspice_client, u: ( 1761 User.parse_response(allspice_client, u) if u else None 1762 ) 1763 } 1764 1765 def __eq__(self, other): 1766 if not isinstance(other, Commit): 1767 return False 1768 return self.sha == other.sha 1769 1770 def __hash__(self): 1771 return hash(self.sha) 1772 1773 @classmethod 1774 def parse_response(cls, allspice_client, result) -> "Commit": 1775 commit_cache = result["commit"] 1776 api_object = cls(allspice_client) 1777 cls._initialize(allspice_client, api_object, result) 1778 # inner_commit for legacy reasons 1779 Commit._add_read_property("inner_commit", commit_cache, api_object) 1780 return api_object 1781 1782 def get_status(self) -> CommitCombinedStatus: 1783 """ 1784 Get a combined status consisting of all statues on this commit. 1785 1786 Note that the returned object is a CommitCombinedStatus object, which 1787 also contains a list of all statuses on the commit. 
1788 1789 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1790 """ 1791 1792 result = self.allspice_client.requests_get( 1793 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1794 ) 1795 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1796 1797 def get_statuses(self) -> List[CommitStatus]: 1798 """ 1799 Get a list of all statuses on this commit. 1800 1801 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1802 """ 1803 1804 results = self.allspice_client.requests_get( 1805 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1806 ) 1807 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1808 1809 @cached_property 1810 def _fields_for_path(self) -> dict[str, str]: 1811 matches = self.URL_REGEXP.search(self.url) 1812 if not matches: 1813 raise ValueError(f"Invalid commit URL: {self.url}") 1814 1815 return { 1816 "owner": matches.group(1), 1817 "repo": matches.group(2), 1818 "sha": self.sha, 1819 }
1773 @classmethod 1774 def parse_response(cls, allspice_client, result) -> "Commit": 1775 commit_cache = result["commit"] 1776 api_object = cls(allspice_client) 1777 cls._initialize(allspice_client, api_object, result) 1778 # inner_commit for legacy reasons 1779 Commit._add_read_property("inner_commit", commit_cache, api_object) 1780 return api_object
1782 def get_status(self) -> CommitCombinedStatus: 1783 """ 1784 Get a combined status consisting of all statues on this commit. 1785 1786 Note that the returned object is a CommitCombinedStatus object, which 1787 also contains a list of all statuses on the commit. 1788 1789 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1790 """ 1791 1792 result = self.allspice_client.requests_get( 1793 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1794 ) 1795 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statuses on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1797 def get_statuses(self) -> List[CommitStatus]: 1798 """ 1799 Get a list of all statuses on this commit. 1800 1801 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1802 """ 1803 1804 results = self.allspice_client.requests_get( 1805 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1806 ) 1807 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
2906class Content(ReadonlyApiObject): 2907 content: Any 2908 download_url: str 2909 encoding: Any 2910 git_url: str 2911 html_url: str 2912 last_commit_sha: str 2913 name: str 2914 path: str 2915 sha: str 2916 size: int 2917 submodule_git_url: Any 2918 target: Any 2919 type: str 2920 url: str 2921 2922 FILE = "file" 2923 2924 def __init__(self, allspice_client): 2925 super().__init__(allspice_client) 2926 2927 def __eq__(self, other): 2928 if not isinstance(other, Content): 2929 return False 2930 2931 return self.sha == other.sha and self.name == other.name 2932 2933 def __hash__(self): 2934 return hash(self.sha) ^ hash(self.name)
Inherited Members
2280class DesignReview(ApiObject): 2281 """ 2282 A Design Review. See 2283 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2284 2285 Note: The base and head fields are not `Branch` objects - they are plain strings 2286 referring to the branch names. This is because DRs can exist for branches that have 2287 been deleted, which don't have an associated `Branch` object from the API. You can use 2288 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2289 it exists. 2290 """ 2291 2292 additions: Optional[int] 2293 allow_maintainer_edit: bool 2294 allow_maintainer_edits: Any 2295 assignee: User 2296 assignees: List["User"] 2297 base: str 2298 body: str 2299 changed_files: Optional[int] 2300 closed_at: Optional[str] 2301 comments: int 2302 created_at: str 2303 deletions: Optional[int] 2304 diff_url: str 2305 draft: bool 2306 due_date: Optional[str] 2307 head: str 2308 html_url: str 2309 id: int 2310 is_locked: bool 2311 labels: List[Any] 2312 merge_base: str 2313 merge_commit_sha: Optional[str] 2314 mergeable: bool 2315 merged: bool 2316 merged_at: Optional[str] 2317 merged_by: Any 2318 milestone: Any 2319 number: int 2320 patch_url: str 2321 pin_order: int 2322 repository: Optional["Repository"] 2323 requested_reviewers: Any 2324 requested_reviewers_teams: Any 2325 review_comments: int 2326 state: str 2327 title: str 2328 updated_at: str 2329 url: str 2330 user: User 2331 2332 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2333 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2334 GET_REVIEWS = "/repos/{owner}/{repo}/pulls/{index}/reviews" 2335 GET_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/reviews/{review_id}" 2336 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2337 2338 OPEN = "open" 2339 CLOSED = "closed" 2340 2341 class MergeType(Enum): 2342 MERGE = "merge" 2343 REBASE = "rebase" 2344 REBASE_MERGE = "rebase-merge" 2345 SQUASH = "squash" 2346 MANUALLY_MERGED = 
"manually-merged" 2347 2348 def __init__(self, allspice_client): 2349 super().__init__(allspice_client) 2350 2351 def __eq__(self, other): 2352 if not isinstance(other, DesignReview): 2353 return False 2354 return self.repository == other.repository and self.id == other.id 2355 2356 def __hash__(self): 2357 return hash(self.repository) ^ hash(self.id) 2358 2359 @classmethod 2360 def parse_response(cls, allspice_client, result) -> "DesignReview": 2361 api_object = super().parse_response(allspice_client, result) 2362 cls._add_read_property( 2363 "repository", 2364 Repository.parse_response(allspice_client, result["base"]["repo"]), 2365 api_object, 2366 ) 2367 2368 return api_object 2369 2370 @classmethod 2371 def request(cls, allspice_client, owner: str, repo: str, number: str): 2372 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2373 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2374 2375 _fields_to_parsers: ClassVar[dict] = { 2376 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2377 "assignees": lambda allspice_client, us: [ 2378 User.parse_response(allspice_client, u) for u in us 2379 ], 2380 "base": lambda _, b: b["ref"], 2381 "head": lambda _, h: h["ref"], 2382 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2383 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2384 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2385 } 2386 2387 _patchable_fields: ClassVar[set[str]] = { 2388 "allow_maintainer_edits", 2389 "assignee", 2390 "assignees", 2391 "base", 2392 "body", 2393 "due_date", 2394 "milestone", 2395 "state", 2396 "title", 2397 } 2398 2399 _parsers_to_fields: ClassVar[dict] = { 2400 "assignee": lambda u: u.username, 2401 "assignees": lambda us: [u.username for u in us], 2402 "base": lambda b: b.name if isinstance(b, Branch) else b, 2403 "milestone": lambda m: m.id, 
2404 } 2405 2406 def commit(self): 2407 data = self.get_dirty_fields() 2408 if "due_date" in data and data["due_date"] is None: 2409 data["unset_due_date"] = True 2410 2411 args = { 2412 "owner": self.repository.owner.username, 2413 "repo": self.repository.name, 2414 "index": self.number, 2415 } 2416 self._commit(args, data) 2417 2418 def merge(self, merge_type: MergeType): 2419 """ 2420 Merge the pull request. See 2421 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2422 2423 :param merge_type: The type of merge to perform. See the MergeType enum. 2424 """ 2425 2426 self.allspice_client.requests_post( 2427 self.MERGE_DESIGN_REVIEW.format( 2428 owner=self.repository.owner.username, 2429 repo=self.repository.name, 2430 index=self.number, 2431 ), 2432 data={"Do": merge_type.value}, 2433 ) 2434 2435 def get_comments(self) -> List[Comment]: 2436 """ 2437 Get the comments on this pull request, but not specifically on a review. 2438 2439 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2440 2441 :return: A list of comments on this pull request. 2442 """ 2443 2444 results = self.allspice_client.requests_get( 2445 self.GET_COMMENTS.format( 2446 owner=self.repository.owner.username, 2447 repo=self.repository.name, 2448 index=self.number, 2449 ) 2450 ) 2451 return [Comment.parse_response(self.allspice_client, result) for result in results] 2452 2453 def create_comment(self, body: str): 2454 """ 2455 Create a comment on this pull request. This uses the same endpoint as the 2456 comments on issues, and will not be associated with any reviews. 2457 2458 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2459 2460 :param body: The body of the comment. 2461 :return: The comment that was created. 
2462 """ 2463 2464 result = self.allspice_client.requests_post( 2465 self.GET_COMMENTS.format( 2466 owner=self.repository.owner.username, 2467 repo=self.repository.name, 2468 index=self.number, 2469 ), 2470 data={"body": body}, 2471 ) 2472 return Comment.parse_response(self.allspice_client, result) 2473 2474 def create_review( 2475 self, 2476 *, 2477 body: Optional[str] = None, 2478 event: Optional[DesignReviewReview.ReviewEvent] = None, 2479 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2480 commit_id: Optional[str] = None, 2481 ) -> DesignReviewReview: 2482 """ 2483 Create a review on this design review. 2484 2485 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2486 2487 Note: in most cases, you should not set the body or event when creating 2488 a review. The event is automatically set to "PENDING" when the review 2489 is created. You should then use `submit_review` to submit the review 2490 with the desired event and body. 2491 2492 :param body: The body of the review. This is the top-level comment on 2493 the review. If not provided, the review will be created with no body. 2494 :param event: The event of the review. This is the overall status of the 2495 review. See the ReviewEvent enum. If not provided, the API will 2496 default to "PENDING". 2497 :param comments: A list of comments on the review. Each comment should 2498 be a ReviewComment object. If not provided, only the base comment 2499 will be created. 2500 :param commit_id: The commit SHA to associate with the review. This is 2501 optional. 
2502 """ 2503 2504 data: dict[str, Any] = {} 2505 2506 if body is not None: 2507 data["body"] = body 2508 if event is not None: 2509 data["event"] = event.value 2510 if commit_id is not None: 2511 data["commit_id"] = commit_id 2512 if comments is not None: 2513 data["comments"] = [asdict(comment) for comment in comments] 2514 2515 result = self.allspice_client.requests_post( 2516 self.GET_REVIEWS.format( 2517 owner=self.repository.owner.username, 2518 repo=self.repository.name, 2519 index=self.number, 2520 ), 2521 data=data, 2522 ) 2523 2524 return DesignReviewReview.parse_response(self.allspice_client, result) 2525 2526 def get_reviews(self) -> List[DesignReviewReview]: 2527 """ 2528 Get all reviews on this design review. 2529 2530 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2531 """ 2532 2533 results = self.allspice_client.requests_get( 2534 self.GET_REVIEWS.format( 2535 owner=self.repository.owner.username, 2536 repo=self.repository.name, 2537 index=self.number, 2538 ) 2539 ) 2540 2541 return [ 2542 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2543 ] 2544 2545 def submit_review( 2546 self, 2547 review_id: int, 2548 event: DesignReviewReview.ReviewEvent, 2549 *, 2550 body: Optional[str] = None, 2551 ): 2552 """ 2553 Submit a review on this design review. 2554 2555 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2556 2557 :param review_id: The ID of the review to submit. 2558 :param event: The event to submit the review with. See the ReviewEvent 2559 enum for the possible values. 2560 :param body: Optional body text for the review submission. 
2561 """ 2562 2563 data = { 2564 "event": event.value, 2565 } 2566 if body is not None: 2567 data["body"] = body 2568 2569 result = self.allspice_client.requests_post( 2570 self.GET_REVIEW.format( 2571 owner=self.repository.owner.username, 2572 repo=self.repository.name, 2573 index=self.number, 2574 review_id=review_id, 2575 ), 2576 data=data, 2577 ) 2578 2579 return result
A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not `Branch` objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated `Branch` object from the API. You can use the `Repository.get_branch` method to get a `Branch` object for a branch if you know it exists.
2359 @classmethod 2360 def parse_response(cls, allspice_client, result) -> "DesignReview": 2361 api_object = super().parse_response(allspice_client, result) 2362 cls._add_read_property( 2363 "repository", 2364 Repository.parse_response(allspice_client, result["base"]["repo"]), 2365 api_object, 2366 ) 2367 2368 return api_object
2370 @classmethod 2371 def request(cls, allspice_client, owner: str, repo: str, number: str): 2372 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2373 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest
2406 def commit(self): 2407 data = self.get_dirty_fields() 2408 if "due_date" in data and data["due_date"] is None: 2409 data["unset_due_date"] = True 2410 2411 args = { 2412 "owner": self.repository.owner.username, 2413 "repo": self.repository.name, 2414 "index": self.number, 2415 } 2416 self._commit(args, data)
2418 def merge(self, merge_type: MergeType): 2419 """ 2420 Merge the pull request. See 2421 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2422 2423 :param merge_type: The type of merge to perform. See the MergeType enum. 2424 """ 2425 2426 self.allspice_client.requests_post( 2427 self.MERGE_DESIGN_REVIEW.format( 2428 owner=self.repository.owner.username, 2429 repo=self.repository.name, 2430 index=self.number, 2431 ), 2432 data={"Do": merge_type.value}, 2433 )
Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2435 def get_comments(self) -> List[Comment]: 2436 """ 2437 Get the comments on this pull request, but not specifically on a review. 2438 2439 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2440 2441 :return: A list of comments on this pull request. 2442 """ 2443 2444 results = self.allspice_client.requests_get( 2445 self.GET_COMMENTS.format( 2446 owner=self.repository.owner.username, 2447 repo=self.repository.name, 2448 index=self.number, 2449 ) 2450 ) 2451 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
https://hub.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2453 def create_comment(self, body: str): 2454 """ 2455 Create a comment on this pull request. This uses the same endpoint as the 2456 comments on issues, and will not be associated with any reviews. 2457 2458 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2459 2460 :param body: The body of the comment. 2461 :return: The comment that was created. 2462 """ 2463 2464 result = self.allspice_client.requests_post( 2465 self.GET_COMMENTS.format( 2466 owner=self.repository.owner.username, 2467 repo=self.repository.name, 2468 index=self.number, 2469 ), 2470 data={"body": body}, 2471 ) 2472 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
2474 def create_review( 2475 self, 2476 *, 2477 body: Optional[str] = None, 2478 event: Optional[DesignReviewReview.ReviewEvent] = None, 2479 comments: Optional[List[DesignReviewReview.ReviewComment]] = None, 2480 commit_id: Optional[str] = None, 2481 ) -> DesignReviewReview: 2482 """ 2483 Create a review on this design review. 2484 2485 https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview 2486 2487 Note: in most cases, you should not set the body or event when creating 2488 a review. The event is automatically set to "PENDING" when the review 2489 is created. You should then use `submit_review` to submit the review 2490 with the desired event and body. 2491 2492 :param body: The body of the review. This is the top-level comment on 2493 the review. If not provided, the review will be created with no body. 2494 :param event: The event of the review. This is the overall status of the 2495 review. See the ReviewEvent enum. If not provided, the API will 2496 default to "PENDING". 2497 :param comments: A list of comments on the review. Each comment should 2498 be a ReviewComment object. If not provided, only the base comment 2499 will be created. 2500 :param commit_id: The commit SHA to associate with the review. This is 2501 optional. 2502 """ 2503 2504 data: dict[str, Any] = {} 2505 2506 if body is not None: 2507 data["body"] = body 2508 if event is not None: 2509 data["event"] = event.value 2510 if commit_id is not None: 2511 data["commit_id"] = commit_id 2512 if comments is not None: 2513 data["comments"] = [asdict(comment) for comment in comments] 2514 2515 result = self.allspice_client.requests_post( 2516 self.GET_REVIEWS.format( 2517 owner=self.repository.owner.username, 2518 repo=self.repository.name, 2519 index=self.number, 2520 ), 2521 data=data, 2522 ) 2523 2524 return DesignReviewReview.parse_response(self.allspice_client, result)
Create a review on this design review.
https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequestReview
Note: in most cases, you should not set the body or event when creating a review. The event is automatically set to "PENDING" when the review is created. You should then use `submit_review` to submit the review with the desired event and body.
Parameters
- body: The body of the review. This is the top-level comment on the review. If not provided, the review will be created with no body.
- event: The event of the review. This is the overall status of the review. See the ReviewEvent enum. If not provided, the API will default to "PENDING".
- comments: A list of comments on the review. Each comment should be a ReviewComment object. If not provided, only the base comment will be created.
- commit_id: The commit SHA to associate with the review. This is optional.
2526 def get_reviews(self) -> List[DesignReviewReview]: 2527 """ 2528 Get all reviews on this design review. 2529 2530 https://hub.allspice.io/api/swagger#/repository/repoListPullReviews 2531 """ 2532 2533 results = self.allspice_client.requests_get( 2534 self.GET_REVIEWS.format( 2535 owner=self.repository.owner.username, 2536 repo=self.repository.name, 2537 index=self.number, 2538 ) 2539 ) 2540 2541 return [ 2542 DesignReviewReview.parse_response(self.allspice_client, result) for result in results 2543 ]
Get all reviews on this design review.
https://hub.allspice.io/api/swagger#/repository/repoListPullReviews
2545 def submit_review( 2546 self, 2547 review_id: int, 2548 event: DesignReviewReview.ReviewEvent, 2549 *, 2550 body: Optional[str] = None, 2551 ): 2552 """ 2553 Submit a review on this design review. 2554 2555 https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview 2556 2557 :param review_id: The ID of the review to submit. 2558 :param event: The event to submit the review with. See the ReviewEvent 2559 enum for the possible values. 2560 :param body: Optional body text for the review submission. 2561 """ 2562 2563 data = { 2564 "event": event.value, 2565 } 2566 if body is not None: 2567 data["body"] = body 2568 2569 result = self.allspice_client.requests_post( 2570 self.GET_REVIEW.format( 2571 owner=self.repository.owner.username, 2572 repo=self.repository.name, 2573 index=self.number, 2574 review_id=review_id, 2575 ), 2576 data=data, 2577 ) 2578 2579 return result
Submit a review on this design review.
https://hub.allspice.io/api/swagger#/repository/repoSubmitPullReview
Parameters
- review_id: The ID of the review to submit.
- event: The event to submit the review with. See the ReviewEvent enum for the possible values.
- body: Optional body text for the review submission.
2144class DesignReviewReview(ReadonlyApiObject): 2145 """ 2146 A review on a Design Review. 2147 """ 2148 2149 body: str 2150 comments_count: int 2151 commit_id: str 2152 dismissed: bool 2153 html_url: str 2154 id: int 2155 official: bool 2156 pull_request_url: str 2157 stale: bool 2158 state: ReviewEvent 2159 submitted_at: str 2160 team: Any 2161 updated_at: str 2162 user: User 2163 2164 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}" 2165 GET_COMMENTS = "/repos/{owner}/{repo}/pulls/{index}/reviews/{id}/comments" 2166 2167 class ReviewEvent(Enum): 2168 APPROVED = "APPROVED" 2169 PENDING = "PENDING" 2170 COMMENT = "COMMENT" 2171 REQUEST_CHANGES = "REQUEST_CHANGES" 2172 REQUEST_REVIEW = "REQUEST_REVIEW" 2173 UNKNOWN = "" 2174 2175 @dataclass 2176 class ReviewComment: 2177 """ 2178 Data required to create a review comment on a design review. 2179 2180 :param body: The body of the comment. 2181 :param path: The path of the file to comment on. If you have a 2182 `Content` object, get the path using the `path` property. 2183 :param sub_path: The sub-path of the file to comment on. This is 2184 usually the page ID of the page in the multi-page document. 2185 :param new_position: The line number of the source code file after the 2186 change to add this comment on. Optional, leave unset if this is an ECAD 2187 file or the comment must be on the entire file. 2188 :param old_position: The line number of the source code file before the 2189 change to add this comment on. Optional, leave unset if this is an ECAD 2190 file or the comment must be on the entire file. 
2191 """ 2192 2193 body: str 2194 path: str 2195 sub_path: Optional[str] = None 2196 new_position: Optional[int] = None 2197 old_position: Optional[int] = None 2198 2199 def __init__(self, allspice_client): 2200 super().__init__(allspice_client) 2201 2202 _fields_to_parsers: ClassVar[dict] = { 2203 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2204 "state": lambda _, s: DesignReviewReview.ReviewEvent(s), 2205 } 2206 2207 def _get_dr_properties(self) -> dict[str, str]: 2208 """ 2209 Get the owner, repo name and design review number from the URL of this 2210 review's DR. 2211 """ 2212 2213 parts = self.pull_request_url.strip("/").split("/") 2214 2215 try: 2216 index = parts[-1] 2217 assert parts[-2] == "pulls" or parts[-2] == "pull", ( 2218 "Expected the second last part of the URL to be 'pulls' or 'pull', " 2219 ) 2220 repo = parts[-3] 2221 owner = parts[-4] 2222 2223 return { 2224 "owner": owner, 2225 "repo": repo, 2226 "index": index, 2227 } 2228 except IndexError: 2229 raise ValueError("Malformed design review URL: {}".format(self.pull_request_url)) 2230 2231 @cached_property 2232 def owner_name(self) -> str: 2233 """ 2234 The owner of the repository this review is on. 2235 """ 2236 2237 return self._get_dr_properties()["owner"] 2238 2239 @cached_property 2240 def repository_name(self) -> str: 2241 """ 2242 The name of the repository this review is on. 2243 """ 2244 2245 return self._get_dr_properties()["repo"] 2246 2247 @cached_property 2248 def index(self) -> str: 2249 """ 2250 The index of the design review this review is on. 2251 """ 2252 2253 return self._get_dr_properties()["index"] 2254 2255 def delete(self): 2256 """ 2257 Delete this review. 2258 """ 2259 2260 self.allspice_client.requests_delete( 2261 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2262 ) 2263 self.deleted = True 2264 2265 def get_comments(self) -> List[DesignReviewReviewComment]: 2266 """ 2267 Get the comments on this review. 
2268 """ 2269 2270 result = self.allspice_client.requests_get( 2271 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2272 ) 2273 2274 return [ 2275 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2276 for comment in result 2277 ]
A review on a Design Review.
2231 @cached_property 2232 def owner_name(self) -> str: 2233 """ 2234 The owner of the repository this review is on. 2235 """ 2236 2237 return self._get_dr_properties()["owner"]
The owner of the repository this review is on.
2239 @cached_property 2240 def repository_name(self) -> str: 2241 """ 2242 The name of the repository this review is on. 2243 """ 2244 2245 return self._get_dr_properties()["repo"]
The name of the repository this review is on.
2247 @cached_property 2248 def index(self) -> str: 2249 """ 2250 The index of the design review this review is on. 2251 """ 2252 2253 return self._get_dr_properties()["index"]
The index of the design review this review is on.
2255 def delete(self): 2256 """ 2257 Delete this review. 2258 """ 2259 2260 self.allspice_client.requests_delete( 2261 self.API_OBJECT.format(**self._get_dr_properties(), id=self.id) 2262 ) 2263 self.deleted = True
Delete this review.
2265 def get_comments(self) -> List[DesignReviewReviewComment]: 2266 """ 2267 Get the comments on this review. 2268 """ 2269 2270 result = self.allspice_client.requests_get( 2271 self.GET_COMMENTS.format(**self._get_dr_properties(), id=self.id) 2272 ) 2273 2274 return [ 2275 DesignReviewReviewComment.parse_response(self.allspice_client, comment) 2276 for comment in result 2277 ]
Get the comments on this review.
Inherited Members
2175 @dataclass 2176 class ReviewComment: 2177 """ 2178 Data required to create a review comment on a design review. 2179 2180 :param body: The body of the comment. 2181 :param path: The path of the file to comment on. If you have a 2182 `Content` object, get the path using the `path` property. 2183 :param sub_path: The sub-path of the file to comment on. This is 2184 usually the page ID of the page in the multi-page document. 2185 :param new_position: The line number of the source code file after the 2186 change to add this comment on. Optional, leave unset if this is an ECAD 2187 file or the comment must be on the entire file. 2188 :param old_position: The line number of the source code file before the 2189 change to add this comment on. Optional, leave unset if this is an ECAD 2190 file or the comment must be on the entire file. 2191 """ 2192 2193 body: str 2194 path: str 2195 sub_path: Optional[str] = None 2196 new_position: Optional[int] = None 2197 old_position: Optional[int] = None
Data required to create a review comment on a design review.
Parameters
- body: The body of the comment.
- path: The path of the file to comment on. If you have a `Content` object, get the path using the `path` property.
- sub_path: The sub-path of the file to comment on. This is usually the page ID of the page in the multi-page document.
- new_position: The line number of the source code file after the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
- old_position: The line number of the source code file before the change to add this comment on. Optional, leave unset if this is an ECAD file or the comment must be on the entire file.
1910class Issue(ApiObject): 1911 """ 1912 An issue on a repository. 1913 1914 Note: `Issue.assets` may not have any entries even if the issue has 1915 attachments. This happens when an issue is fetched via a bulk method like 1916 `Repository.get_issues`. In most cases, prefer using 1917 `Issue.get_attachments` to get the attachments on an issue. 1918 """ 1919 1920 assets: List[Union[Any, "Attachment"]] 1921 assignee: Any 1922 assignees: Any 1923 body: str 1924 closed_at: Any 1925 comments: int 1926 created_at: str 1927 due_date: Any 1928 html_url: str 1929 id: int 1930 is_locked: bool 1931 labels: List[Any] 1932 milestone: Optional["Milestone"] 1933 number: int 1934 original_author: str 1935 original_author_id: int 1936 pin_order: int 1937 pull_request: Any 1938 ref: str 1939 repository: Dict[str, Union[int, str]] 1940 state: str 1941 title: str 1942 updated_at: str 1943 url: str 1944 user: User 1945 1946 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1947 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1948 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1949 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1950 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1951 1952 OPENED = "open" 1953 CLOSED = "closed" 1954 1955 def __init__(self, allspice_client): 1956 super().__init__(allspice_client) 1957 1958 def __eq__(self, other): 1959 if not isinstance(other, Issue): 1960 return False 1961 return self.repository == other.repository and self.id == other.id 1962 1963 def __hash__(self): 1964 return hash(self.repository) ^ hash(self.id) 1965 1966 _fields_to_parsers: ClassVar[dict] = { 1967 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1968 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1969 "assets": lambda allspice_client, assets: [ 1970 Attachment.parse_response(allspice_client, a) for a in assets 1971 ], 1972 
"assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1973 "assignees": lambda allspice_client, us: [ 1974 User.parse_response(allspice_client, u) for u in us 1975 ], 1976 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1977 } 1978 1979 _parsers_to_fields: ClassVar[dict] = { 1980 "milestone": lambda m: m.id, 1981 } 1982 1983 _patchable_fields: ClassVar[set[str]] = { 1984 "assignee", 1985 "assignees", 1986 "body", 1987 "due_date", 1988 "milestone", 1989 "state", 1990 "title", 1991 } 1992 1993 def commit(self): 1994 args = { 1995 "owner": self.repository.owner.username, 1996 "repo": self.repository.name, 1997 "index": self.number, 1998 } 1999 self._commit(args) 2000 2001 @classmethod 2002 def request(cls, allspice_client, owner: str, repo: str, number: str): 2003 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2004 # The repository in the response is a RepositoryMeta object, so request 2005 # the full repository object and add it to the issue object. 
2006 repository = Repository.request(allspice_client, owner, repo) 2007 setattr(api_object, "_repository", repository) 2008 # For legacy reasons 2009 cls._add_read_property("repo", repository, api_object) 2010 return api_object 2011 2012 @classmethod 2013 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2014 args = {"owner": repo.owner.username, "repo": repo.name} 2015 data = {"title": title, "body": body} 2016 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2017 issue = Issue.parse_response(allspice_client, result) 2018 setattr(issue, "_repository", repo) 2019 cls._add_read_property("repo", repo, issue) 2020 return issue 2021 2022 @property 2023 def owner(self) -> Organization | User: 2024 return self.repository.owner 2025 2026 def get_time_sum(self, user: User) -> int: 2027 results = self.allspice_client.requests_get( 2028 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2029 ) 2030 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2031 2032 def get_times(self) -> Optional[Dict]: 2033 return self.allspice_client.requests_get( 2034 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2035 ) 2036 2037 def delete_time(self, time_id: str): 2038 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2039 self.allspice_client.requests_delete(path) 2040 2041 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2042 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2043 self.allspice_client.requests_post( 2044 path, data={"created": created, "time": int(time), "user_name": user_name} 2045 ) 2046 2047 def get_comments(self) -> List[Comment]: 2048 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2049 2050 results = self.allspice_client.requests_get( 2051 
self.GET_COMMENTS.format( 2052 owner=self.owner.username, repo=self.repository.name, index=self.number 2053 ) 2054 ) 2055 2056 return [Comment.parse_response(self.allspice_client, result) for result in results] 2057 2058 def create_comment(self, body: str) -> Comment: 2059 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2060 2061 path = self.GET_COMMENTS.format( 2062 owner=self.owner.username, repo=self.repository.name, index=self.number 2063 ) 2064 2065 response = self.allspice_client.requests_post(path, data={"body": body}) 2066 return Comment.parse_response(self.allspice_client, response) 2067 2068 def get_attachments(self) -> List[Attachment]: 2069 """ 2070 Fetch all attachments on this issue. 2071 2072 Unlike the assets field, this will always fetch all attachments from the 2073 API. 2074 2075 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2076 """ 2077 2078 path = self.GET_ATTACHMENTS.format( 2079 owner=self.owner.username, repo=self.repository.name, index=self.number 2080 ) 2081 response = self.allspice_client.requests_get(path) 2082 2083 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2084 2085 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2086 """ 2087 Create an attachment on this issue. 2088 2089 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2090 2091 :param file: The file to attach. This should be a file-like object. 2092 :param name: The name of the file. If not provided, the name of the file will be 2093 used. 2094 :return: The created attachment. 
2095 """ 2096 2097 args: dict[str, Any] = { 2098 "files": {"attachment": file}, 2099 } 2100 if name is not None: 2101 args["params"] = {"name": name} 2102 2103 result = self.allspice_client.requests_post( 2104 self.GET_ATTACHMENTS.format( 2105 owner=self.owner.username, repo=self.repository.name, index=self.number 2106 ), 2107 **args, 2108 ) 2109 2110 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: `Issue.assets` may not have any entries even if the issue has attachments. This happens when an issue is fetched via a bulk method like `Repository.get_issues`. In most cases, prefer using `Issue.get_attachments` to get the attachments on an issue.
2001 @classmethod 2002 def request(cls, allspice_client, owner: str, repo: str, number: str): 2003 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2004 # The repository in the response is a RepositoryMeta object, so request 2005 # the full repository object and add it to the issue object. 2006 repository = Repository.request(allspice_client, owner, repo) 2007 setattr(api_object, "_repository", repository) 2008 # For legacy reasons 2009 cls._add_read_property("repo", repository, api_object) 2010 return api_object
2012 @classmethod 2013 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2014 args = {"owner": repo.owner.username, "repo": repo.name} 2015 data = {"title": title, "body": body} 2016 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2017 issue = Issue.parse_response(allspice_client, result) 2018 setattr(issue, "_repository", repo) 2019 cls._add_read_property("repo", repo, issue) 2020 return issue
2041 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2042 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2043 self.allspice_client.requests_post( 2044 path, data={"created": created, "time": int(time), "user_name": user_name} 2045 )
2047 def get_comments(self) -> List[Comment]: 2048 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2049 2050 results = self.allspice_client.requests_get( 2051 self.GET_COMMENTS.format( 2052 owner=self.owner.username, repo=self.repository.name, index=self.number 2053 ) 2054 ) 2055 2056 return [Comment.parse_response(self.allspice_client, result) for result in results]
https://hub.allspice.io/api/swagger#/issue/issueGetComments
2058 def create_comment(self, body: str) -> Comment: 2059 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2060 2061 path = self.GET_COMMENTS.format( 2062 owner=self.owner.username, repo=self.repository.name, index=self.number 2063 ) 2064 2065 response = self.allspice_client.requests_post(path, data={"body": body}) 2066 return Comment.parse_response(self.allspice_client, response)
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
2068 def get_attachments(self) -> List[Attachment]: 2069 """ 2070 Fetch all attachments on this issue. 2071 2072 Unlike the assets field, this will always fetch all attachments from the 2073 API. 2074 2075 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2076 """ 2077 2078 path = self.GET_ATTACHMENTS.format( 2079 owner=self.owner.username, repo=self.repository.name, index=self.number 2080 ) 2081 response = self.allspice_client.requests_get(path) 2082 2083 return [Attachment.parse_response(self.allspice_client, result) for result in response]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
2085 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2086 """ 2087 Create an attachment on this issue. 2088 2089 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2090 2091 :param file: The file to attach. This should be a file-like object. 2092 :param name: The name of the file. If not provided, the name of the file will be 2093 used. 2094 :return: The created attachment. 2095 """ 2096 2097 args: dict[str, Any] = { 2098 "files": {"attachment": file}, 2099 } 2100 if name is not None: 2101 args["params"] = {"name": name} 2102 2103 result = self.allspice_client.requests_post( 2104 self.GET_ATTACHMENTS.format( 2105 owner=self.owner.username, repo=self.repository.name, index=self.number 2106 ), 2107 **args, 2108 ) 2109 2110 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this issue.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1479class Milestone(ApiObject): 1480 allow_merge_commits: Any 1481 allow_rebase: Any 1482 allow_rebase_explicit: Any 1483 allow_squash_merge: Any 1484 archived: Any 1485 closed_at: Any 1486 closed_issues: int 1487 created_at: str 1488 default_branch: Any 1489 description: str 1490 due_on: Any 1491 has_issues: Any 1492 has_pull_requests: Any 1493 has_wiki: Any 1494 id: int 1495 ignore_whitespace_conflicts: Any 1496 name: Any 1497 open_issues: int 1498 private: Any 1499 state: str 1500 title: str 1501 updated_at: str 1502 website: Any 1503 1504 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1505 1506 def __init__(self, allspice_client): 1507 super().__init__(allspice_client) 1508 1509 def __eq__(self, other): 1510 if not isinstance(other, Milestone): 1511 return False 1512 return self.allspice_client == other.allspice_client and self.id == other.id 1513 1514 def __hash__(self): 1515 return hash(self.allspice_client) ^ hash(self.id) 1516 1517 _fields_to_parsers: ClassVar[dict] = { 1518 "closed_at": lambda _, t: Util.convert_time(t), 1519 "due_on": lambda _, t: Util.convert_time(t), 1520 } 1521 1522 _patchable_fields: ClassVar[set[str]] = { 1523 "allow_merge_commits", 1524 "allow_rebase", 1525 "allow_rebase_explicit", 1526 "allow_squash_merge", 1527 "archived", 1528 "default_branch", 1529 "description", 1530 "has_issues", 1531 "has_pull_requests", 1532 "has_wiki", 1533 "ignore_whitespace_conflicts", 1534 "name", 1535 "private", 1536 "website", 1537 } 1538 1539 @classmethod 1540 def request(cls, allspice_client, owner: str, repo: str, number: str): 1541 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Common base class for all non-exit exceptions.
36class Organization(ApiObject): 37 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 38 39 active: Optional[bool] 40 avatar_url: str 41 created: Optional[str] 42 description: str 43 email: str 44 followers_count: Optional[int] 45 following_count: Optional[int] 46 full_name: str 47 html_url: Optional[str] 48 id: int 49 is_admin: Optional[bool] 50 language: Optional[str] 51 last_login: Optional[str] 52 location: str 53 login: Optional[str] 54 login_name: Optional[str] 55 name: Optional[str] 56 prohibit_login: Optional[bool] 57 repo_admin_change_team_access: Optional[bool] 58 restricted: Optional[bool] 59 source_id: Optional[int] 60 starred_repos_count: Optional[int] 61 username: str 62 visibility: str 63 website: str 64 65 API_OBJECT = """/orgs/{name}""" # <org> 66 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 67 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 68 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 69 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 70 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 71 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 72 73 def __init__(self, allspice_client): 74 super().__init__(allspice_client) 75 76 def __eq__(self, other): 77 if not isinstance(other, Organization): 78 return False 79 return self.allspice_client == other.allspice_client and self.name == other.name 80 81 def __hash__(self): 82 return hash(self.allspice_client) ^ hash(self.name) 83 84 @classmethod 85 def request(cls, allspice_client, name: str) -> Self: 86 return cls._request(allspice_client, {"name": name}) 87 88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 
return api_object 96 97 _patchable_fields: ClassVar[set[str]] = { 98 "description", 99 "full_name", 100 "location", 101 "visibility", 102 "website", 103 } 104 105 def commit(self): 106 args = {"name": self.name} 107 self._commit(args) 108 109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result) 147 148 def get_repositories(self) -> List["Repository"]: 149 results = self.allspice_client.requests_get_paginated( 150 Organization.ORG_REPOS_REQUEST % self.username 151 ) 152 return [Repository.parse_response(self.allspice_client, result) for result in results] 153 154 def get_repository(self, name) -> "Repository": 155 repos = self.get_repositories() 156 for repo in repos: 157 if repo.name == name: 158 return repo 159 raise NotFoundException("Repository %s not existent in organization." 
% name) 160 161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams 168 169 def get_team(self, name) -> "Team": 170 teams = self.get_teams() 171 for team in teams: 172 if team.name == name: 173 return team 174 raise NotFoundException("Team not existent in organization.") 175 176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 
197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 ) 207 208 def get_members(self) -> List["User"]: 209 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 210 return [User.parse_response(self.allspice_client, result) for result in results] 211 212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False 223 224 def remove_member(self, user: "User"): 225 path = f"/orgs/{self.username}/members/{user.username}" 226 self.allspice_client.requests_delete(path) 227 228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True 234 235 def get_heatmap(self) -> List[Tuple[datetime, int]]: 236 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 237 results = [ 238 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 239 for result in results 240 ] 241 return results
see https://hub.allspice.io/api/swagger#/organization/orgGetAll
88 @classmethod 89 def parse_response(cls, allspice_client, result) -> "Organization": 90 api_object = super().parse_response(allspice_client, result) 91 # add "name" field to make this behave similar to users for gitea < 1.18 92 # also necessary for repository-owner when org is repo owner 93 if not hasattr(api_object, "name"): 94 Organization._add_read_property("name", result["username"], api_object) 95 return api_object
109 def create_repo( 110 self, 111 repoName: str, 112 description: str = "", 113 private: bool = False, 114 autoInit=True, 115 gitignores: Optional[str] = None, 116 license: Optional[str] = None, 117 readme: str = "Default", 118 issue_labels: Optional[str] = None, 119 default_branch="master", 120 ): 121 """Create an organization Repository 122 123 Throws: 124 AlreadyExistsException: If the Repository exists already. 125 Exception: If something else went wrong. 126 """ 127 result = self.allspice_client.requests_post( 128 f"/orgs/{self.name}/repos", 129 data={ 130 "name": repoName, 131 "description": description, 132 "private": private, 133 "auto_init": autoInit, 134 "gitignores": gitignores, 135 "license": license, 136 "issue_labels": issue_labels, 137 "readme": readme, 138 "default_branch": default_branch, 139 }, 140 ) 141 if "id" in result: 142 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 143 else: 144 self.allspice_client.logger.error(result["message"]) 145 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 146 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
161 def get_teams(self) -> List["Team"]: 162 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 163 teams = [Team.parse_response(self.allspice_client, result) for result in results] 164 # organisation seems to be missing using this request, so we add org manually 165 for t in teams: 166 setattr(t, "_organization", self) 167 return teams
176 def create_team( 177 self, 178 name: str, 179 description: str = "", 180 permission: str = "read", 181 can_create_org_repo: bool = False, 182 includes_all_repositories: bool = False, 183 units=( 184 "repo.code", 185 "repo.issues", 186 "repo.ext_issues", 187 "repo.wiki", 188 "repo.pulls", 189 "repo.releases", 190 "repo.ext_wiki", 191 ), 192 units_map={}, 193 ) -> "Team": 194 """Alias for AllSpice#create_team""" 195 # TODO: Move AllSpice#create_team to Organization#create_team and 196 # deprecate AllSpice#create_team. 197 return self.allspice_client.create_team( 198 org=self, 199 name=name, 200 description=description, 201 permission=permission, 202 can_create_org_repo=can_create_org_repo, 203 includes_all_repositories=includes_all_repositories, 204 units=units, 205 units_map=units_map, 206 )
Alias for AllSpice#create_team
212 def is_member(self, username) -> bool: 213 if isinstance(username, User): 214 username = username.username 215 try: 216 # returns 204 if its ok, 404 if its not 217 self.allspice_client.requests_get( 218 Organization.ORG_IS_MEMBER % (self.username, username) 219 ) 220 return True 221 except Exception: 222 return False
228 def delete(self): 229 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 230 for repo in self.get_repositories(): 231 repo.delete() 232 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 233 self.deleted = True
Delete this Organization. Invalidates this object's data. Also deletes all repositories owned by the Organization.
2665class Release(ApiObject): 2666 """ 2667 A release on a repo. 2668 """ 2669 2670 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2671 author: User 2672 body: str 2673 created_at: str 2674 draft: bool 2675 html_url: str 2676 id: int 2677 name: str 2678 prerelease: bool 2679 published_at: str 2680 repo: Optional["Repository"] 2681 repository: Optional["Repository"] 2682 tag_name: str 2683 tarball_url: str 2684 target_commitish: str 2685 upload_url: str 2686 url: str 2687 zipball_url: str 2688 2689 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2690 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2691 # Note that we don't strictly need the get_assets route, as the release 2692 # object already contains the assets. 2693 2694 def __init__(self, allspice_client): 2695 super().__init__(allspice_client) 2696 2697 def __eq__(self, other): 2698 if not isinstance(other, Release): 2699 return False 2700 return self.repo == other.repo and self.id == other.id 2701 2702 def __hash__(self): 2703 return hash(self.repo) ^ hash(self.id) 2704 2705 _fields_to_parsers: ClassVar[dict] = { 2706 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2707 } 2708 _patchable_fields: ClassVar[set[str]] = { 2709 "body", 2710 "draft", 2711 "name", 2712 "prerelease", 2713 "tag_name", 2714 "target_commitish", 2715 } 2716 2717 @classmethod 2718 def parse_response(cls, allspice_client, result, repo) -> Release: 2719 release = super().parse_response(allspice_client, result) 2720 Release._add_read_property("repository", repo, release) 2721 # For legacy reasons 2722 Release._add_read_property("repo", repo, release) 2723 setattr( 2724 release, 2725 "_assets", 2726 [ 2727 ReleaseAsset.parse_response(allspice_client, asset, release) 2728 for asset in result["assets"] 2729 ], 2730 ) 2731 return release 2732 2733 @classmethod 2734 def request( 2735 cls, 2736 allspice_client, 2737 owner: str, 2738 repo: str, 2739 id: 
Optional[int] = None, 2740 ) -> Release: 2741 args = {"owner": owner, "repo": repo, "id": id} 2742 release_response = cls._get_gitea_api_object(allspice_client, args) 2743 repository = Repository.request(allspice_client, owner, repo) 2744 release = cls.parse_response(allspice_client, release_response, repository) 2745 return release 2746 2747 def commit(self): 2748 if self.repo is None: 2749 raise ValueError("Cannot commit a release without a repository.") 2750 2751 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2752 self._commit(args) 2753 2754 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2755 """ 2756 Create an asset for this release. 2757 2758 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2759 2760 :param file: The file to upload. This should be a file-like object. 2761 :param name: The name of the file. 2762 :return: The created asset. 2763 """ 2764 2765 if self.repo is None: 2766 raise ValueError("Cannot commit a release without a repository.") 2767 2768 args: dict[str, Any] = {"files": {"attachment": file}} 2769 if name is not None: 2770 args["params"] = {"name": name} 2771 2772 result = self.allspice_client.requests_post( 2773 self.RELEASE_CREATE_ASSET.format( 2774 owner=self.repo.owner.username, 2775 repo=self.repo.name, 2776 id=self.id, 2777 ), 2778 **args, 2779 ) 2780 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2781 2782 def delete(self): 2783 if self.repo is None: 2784 raise ValueError("Cannot commit a release without a repository.") 2785 2786 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2787 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2788 self.deleted = True
A release on a repo.
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
    """
    Build a Release from an API response and bind it to its repository.

    :param allspice_client: The client used to talk to the hub.
    :param result: The decoded API response; must contain an ``"assets"`` key.
    :param repo: The repository this release belongs to.
    :return: The parsed release with its assets parsed as well.
    """
    parsed = super().parse_response(allspice_client, result)

    # "repo" is kept as an alias of "repository" for legacy reasons.
    for alias in ("repository", "repo"):
        Release._add_read_property(alias, repo, parsed)

    parsed_assets = []
    for raw_asset in result["assets"]:
        parsed_assets.append(ReleaseAsset.parse_response(allspice_client, raw_asset, parsed))
    parsed._assets = parsed_assets

    return parsed
@classmethod
def request(
    cls,
    allspice_client,
    owner: str,
    repo: str,
    id: Optional[int] = None,
) -> Release:
    """
    Fetch a release, and the repository it belongs to, from the hub.

    :param allspice_client: The client used to talk to the hub.
    :param owner: Username of the repository owner.
    :param repo: Name of the repository.
    :param id: Id of the release.
    :return: The requested release.
    """
    payload = cls._get_gitea_api_object(
        allspice_client,
        {"owner": owner, "repo": repo, "id": id},
    )
    # The release payload is fetched first, then the repository it is bound to.
    parent_repository = Repository.request(allspice_client, owner, repo)
    return cls.parse_response(allspice_client, payload, parent_repository)
def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
    """
    Create an asset for this release.

    https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

    :param file: The file to upload. This should be a file-like object.
    :param name: The name of the file.
    :return: The created asset.
    :raises ValueError: If the release has no repository attached.
    """

    if self.repo is None:
        # The message names this operation; it used to say "commit".
        raise ValueError("Cannot create an asset for a release without a repository.")

    args: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        args["params"] = {"name": name}

    result = self.allspice_client.requests_post(
        self.RELEASE_CREATE_ASSET.format(
            owner=self.repo.owner.username,
            repo=self.repo.name,
            id=self.id,
        ),
        **args,
    )
    return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
def delete(self):
    """
    Delete this release on the hub and mark this object as deleted.

    :raises ValueError: If the release has no repository attached.
    """
    if self.repo is None:
        # The message names this operation; it used to say "commit".
        raise ValueError("Cannot delete a release without a repository.")

    args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
    self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
    self.deleted = True
class Repository(ApiObject):
    """
    A repository on AllSpice Hub.

    Besides the fields returned by the API, this class offers helpers for
    branches, issues, design reviews, commits, file contents, generated
    AllSpice artifacts, topics, releases, commit statuses, hooks and
    collaborators of the repository.
    """

    allow_fast_forward_only_merge: bool
    allow_manual_merge: Any
    allow_merge_commits: bool
    allow_rebase: bool
    allow_rebase_explicit: bool
    allow_rebase_update: bool
    allow_squash_merge: bool
    archived: bool
    archived_at: str
    autodetect_manual_merge: Any
    avatar_url: str
    clone_url: str
    created_at: str
    default_allow_maintainer_edit: bool
    default_branch: str
    default_delete_branch_after_merge: bool
    default_merge_style: str
    description: str
    empty: bool
    enable_prune: Any
    external_tracker: Any
    external_wiki: Any
    fork: bool
    forks_count: int
    full_name: str
    has_actions: bool
    has_issues: bool
    has_packages: bool
    has_projects: bool
    has_pull_requests: bool
    has_releases: bool
    has_wiki: bool
    html_url: str
    id: int
    ignore_whitespace_conflicts: bool
    internal: bool
    internal_tracker: Dict[str, bool]
    language: str
    languages_url: str
    licenses: Any
    link: str
    mirror: bool
    mirror_interval: str
    mirror_updated: str
    name: str
    object_format_name: str
    open_issues_count: int
    open_pr_counter: int
    original_url: str
    owner: Union["User", "Organization"]
    parent: Any
    permissions: Dict[str, bool]
    private: bool
    projects_mode: str
    release_counter: int
    repo_transfer: Any
    size: int
    ssh_url: str
    stars_count: int
    template: bool
    topics: List[Union[Any, str]]
    updated_at: datetime
    url: str
    watchers_count: int
    website: str

    API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
    REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s"""  # <owner>, <reponame>, <username>
    REPO_SEARCH = """/repos/search/"""
    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
    REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}"""
    REPO_ISSUES = """/repos/{owner}/{repo}/issues"""  # <owner, reponame>
    REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls"""
    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
    REPO_TIMES = """/repos/%s/%s/times"""  # <owner>, <reponame>
    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>
    REPO_COMMITS = "/repos/%s/%s/commits"  # <owner>, <reponame>
    REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
    REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
    REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}"
    REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}"
    REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}"
    REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}"
    REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics"
    REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}"
    REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases"
    REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest"
    REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}"
    REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}"
    REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}"
    REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}"

    class ArchiveFormat(Enum):
        """
        Archive formats for Repository.get_archive
        """

        TAR = "tar.gz"
        ZIP = "zip"

    class CommitStatusSort(Enum):
        """
        Sort order for Repository.get_commit_status
        """

        OLDEST = "oldest"
        RECENT_UPDATE = "recentupdate"
        LEAST_UPDATE = "leastupdate"
        LEAST_INDEX = "leastindex"
        HIGHEST_INDEX = "highestindex"

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Two repositories are equal when owner and name both match."""
        if not isinstance(other, Repository):
            return False
        return self.owner == other.owner and self.name == other.name

    def __hash__(self):
        # Built from the same two fields __eq__ compares.
        return hash(self.owner) ^ hash(self.name)

    _fields_to_parsers: ClassVar[dict] = {
        # Don't know how to tell apart user and org as owner, except from the
        # email being empty.
        "owner": lambda allspice_client, r: (
            Organization.parse_response(allspice_client, r)
            if r["email"] == ""
            else User.parse_response(allspice_client, r)
        ),
        "updated_at": lambda _, t: Util.convert_time(t),
    }

    @classmethod
    def request(
        cls,
        allspice_client,
        owner: str,
        name: str,
    ) -> Repository:
        """
        Fetch a repository by the username of its owner and its name.

        :param allspice_client: The client used to talk to the hub.
        :param owner: Username of the repository owner.
        :param name: Name of the repository.
        :return: The requested repository.
        """
        return cls._request(allspice_client, {"owner": owner, "name": name})

    @classmethod
    def search(
        cls,
        allspice_client,
        query: Optional[str] = None,
        topic: bool = False,
        include_description: bool = False,
        user: Optional[User] = None,
        owner_to_prioritize: Union[User, Organization, None] = None,
    ) -> list[Repository]:
        """
        Search for repositories.

        See https://hub.allspice.io/api/swagger#/repository/repoSearch

        :param allspice_client: The client used to talk to the hub.
        :param query: The query string to search for
        :param topic: If true, the query string will only be matched against the
            repository's topic.
        :param include_description: If true, the query string will be matched
            against the repository's description as well.
        :param user: If specified, only repositories that this user owns or
            contributes to will be searched.
        :param owner_to_prioritize: If specified, repositories owned by the
            given entity will be prioritized in the search.
        :returns: All repositories matching the query. If there are many
            repositories matching this query, this may take some time.
        """

        params = {}

        if query is not None:
            params["q"] = query
        if topic:
            params["topic"] = topic
        if include_description:
            params["include_description"] = include_description
        if user is not None:
            params["user"] = user.id
        if owner_to_prioritize is not None:
            params["owner_to_prioritize"] = owner_to_prioritize.id

        responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params)

        return [Repository.parse_response(allspice_client, response) for response in responses]

    _patchable_fields: ClassVar[set[str]] = {
        "allow_manual_merge",
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_rebase_update",
        "allow_squash_merge",
        "archived",
        "autodetect_manual_merge",
        "default_branch",
        "default_delete_branch_after_merge",
        "default_merge_style",
        "description",
        "enable_prune",
        "external_tracker",
        "external_wiki",
        "has_actions",
        "has_issues",
        "has_projects",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "internal_tracker",
        "mirror_interval",
        "name",
        "private",
        "template",
        "website",
    }

    def commit(self):
        """Commit this repository through ``_commit`` using its owner/name route."""
        args = {"owner": self.owner.username, "name": self.name}
        self._commit(args)

    def get_branches(self) -> List["Branch"]:
        """Get all the Branches of this Repository."""

        results = self.allspice_client.requests_get_paginated(
            Repository.REPO_BRANCHES % (self.owner.username, self.name)
        )
        return [Branch.parse_response(self.allspice_client, result) for result in results]

    def get_branch(self, name: str) -> "Branch":
        """Get a specific Branch of this Repository."""
        result = self.allspice_client.requests_get(
            Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name)
        )
        return Branch.parse_response(self.allspice_client, result)

    def add_branch(self, create_from: Ref, newname: str) -> "Branch":
        """
        Add a branch to the repository

        :param create_from: The ref the new branch is created from.
        :param newname: Name of the new branch.
        :return: The created Branch.
        :raises ValueError: If no ref name can be derived from `create_from`.
        """
        # Note: will only work with gitea 1.13 or higher!

        ref_name = Util.data_params_for_ref(create_from)
        if "ref" not in ref_name:
            raise ValueError("create_from must be a Branch, Commit or string")
        ref_name = ref_name["ref"]

        data = {"new_branch_name": newname, "old_ref_name": ref_name}
        result = self.allspice_client.requests_post(
            Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
        )
        return Branch.parse_response(self.allspice_client, result)

    def get_issues(
        self,
        state: Literal["open", "closed", "all"] = "all",
        search_query: Optional[str] = None,
        labels: Optional[List[str]] = None,
        milestones: Optional[List[Union[Milestone, str]]] = None,
        assignee: Optional[Union[User, str]] = None,
        since: Optional[datetime] = None,
        before: Optional[datetime] = None,
    ) -> List["Issue"]:
        """
        Get all Issues of this Repository (open and closed)

        https://hub.allspice.io/api/swagger#/repository/repoListIssues

        All params of this method are optional filters. If you don't specify a filter, it
        will not be applied.

        :param state: The state of the Issues to get. Defaults to "all", which
            returns both open and closed Issues.
        :param search_query: Filter issues by text. This is equivalent to searching for
            `search_query` in the Issues on the web interface.
        :param labels: Filter issues by labels.
        :param milestones: Filter issues by milestones.
        :param assignee: Filter issues by the assigned user.
        :param since: Only return issues updated after this time.
        :param before: Only return issues updated before this time.
        :return: A list of Issues.
        """

        data = {
            "state": state,
        }
        if search_query:
            data["q"] = search_query
        if labels:
            data["labels"] = ",".join(labels)
        if milestones:
            data["milestone"] = ",".join(
                [
                    milestone.name if isinstance(milestone, Milestone) else milestone
                    for milestone in milestones
                ]
            )
        if assignee:
            if isinstance(assignee, User):
                data["assignee"] = assignee.username
            else:
                data["assignee"] = assignee
        if since:
            data["since"] = Util.format_time(since)
        if before:
            data["before"] = Util.format_time(before)

        results = self.allspice_client.requests_get_paginated(
            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
            params=data,
        )

        issues = []
        for result in results:
            issue = Issue.parse_response(self.allspice_client, result)
            # See Issue.request
            setattr(issue, "_repository", self)
            # This is mostly for compatibility with an older implementation
            Issue._add_read_property("repo", self, issue)
            issues.append(issue)

        return issues

    def get_design_reviews(
        self,
        state: Literal["open", "closed", "all"] = "all",
        milestone: Optional[Union[Milestone, str]] = None,
        labels: Optional[List[str]] = None,
    ) -> List["DesignReview"]:
        """
        Get all Design Reviews of this Repository.

        https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

        :param state: The state of the Design Reviews to get. Defaults to "all",
            which returns both open and closed Design Reviews.
        :param milestone: The milestone of the Design Reviews to get.
        :param labels: A list of label IDs to filter DRs by.
        :return: A list of Design Reviews.
        """

        params = {
            "state": state,
        }
        if milestone:
            if isinstance(milestone, Milestone):
                params["milestone"] = milestone.name
            else:
                params["milestone"] = milestone
        if labels:
            params["labels"] = ",".join(labels)

        results = self.allspice_client.requests_get_paginated(
            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
            params=params,
        )
        return [DesignReview.parse_response(self.allspice_client, result) for result in results]

    def get_commits(
        self,
        sha: Optional[str] = None,
        path: Optional[str] = None,
        stat: bool = True,
    ) -> List["Commit"]:
        """
        Get all the Commits of this Repository.

        https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

        :param sha: The SHA of the commit to start listing commits from.
        :param path: filepath of a file/dir.
        :param stat: Include the number of additions and deletions in the response.
                     Disable for speedup.
        :return: A list of Commits. A ConflictException from the API is logged
            as an empty repository and yields an empty list.
        """

        data = {}
        if sha:
            data["sha"] = sha
        if path:
            data["path"] = path
        if not stat:
            data["stat"] = False

        try:
            results = self.allspice_client.requests_get_paginated(
                Repository.REPO_COMMITS % (self.owner.username, self.name),
                params=data,
            )
        except ConflictException as err:
            logging.warning(err)
            logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
            results = []
        return [Commit.parse_response(self.allspice_client, result) for result in results]

    def get_issues_state(self, state) -> List["Issue"]:
        """
        DEPRECATED: Use get_issues() instead.

        Get issues of state Issue.open or Issue.closed of a repository.
        """

        assert state in [Issue.OPENED, Issue.CLOSED]
        issues = []
        data = {"state": state}
        results = self.allspice_client.requests_get_paginated(
            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
            params=data,
        )
        for result in results:
            issue = Issue.parse_response(self.allspice_client, result)
            # adding data not contained in the issue response
            # See Issue.request()
            setattr(issue, "_repository", self)
            Issue._add_read_property("repo", self, issue)
            Issue._add_read_property("owner", self.owner, issue)
            issues.append(issue)
        return issues

    def get_times(self):
        """Return the raw API response of the repository's tracked times route."""
        results = self.allspice_client.requests_get(
            Repository.REPO_TIMES % (self.owner.username, self.name)
        )
        return results

    def get_user_time(self, username) -> float:
        """
        Sum the ``"time"`` values tracked by one user on this repository.

        :param username: A username, or a User whose username is used.
        :return: The sum of the ``"time"`` field over all returned entries.
        """
        if isinstance(username, User):
            username = username.username
        results = self.allspice_client.requests_get(
            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
        )
        time = sum(r["time"] for r in results)
        return time

    def get_full_name(self) -> str:
        """Return ``<owner username>/<repository name>``."""
        return self.owner.username + "/" + self.name

    def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
        """
        Create a new, open issue on this repository.

        :param title: Title of the issue.
        :param assignees: Collection of assignees for the issue.
        :param description: Body text of the issue.
        :return: The created Issue.
        """
        data = {
            # A (frozen)set, like the default value, cannot be serialized to
            # JSON, so the payload always carries a list.
            "assignees": list(assignees),
            "body": description,
            "closed": False,
            "title": title,
        }
        result = self.allspice_client.requests_post(
            Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
            data=data,
        )

        issue = Issue.parse_response(self.allspice_client, result)
        setattr(issue, "_repository", self)
        Issue._add_read_property("repo", self, issue)
        return issue

    def create_design_review(
        self,
        title: str,
        head: Union[Branch, str],
        base: Union[Branch, str],
        assignees: Optional[Set[Union[User, str]]] = None,
        body: Optional[str] = None,
        due_date: Optional[datetime] = None,
        milestone: Optional["Milestone"] = None,
    ) -> "DesignReview":
        """
        Create a new Design Review.

        See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

        :param title: Title of the Design Review
        :param head: Branch or name of the branch to merge into the base branch
        :param base: Branch or name of the branch to merge into
        :param assignees: Optional. A list of users to assign this review. List can be of
                          User objects or of usernames.
        :param body: An Optional Description for the Design Review.
        :param due_date: An Optional Due date for the Design Review.
        :param milestone: An Optional Milestone for the Design Review
        :return: The created Design Review
        """

        data: dict[str, Any] = {
            "title": title,
        }

        if isinstance(head, Branch):
            data["head"] = head.name
        else:
            data["head"] = head
        if isinstance(base, Branch):
            data["base"] = base.name
        else:
            data["base"] = base
        if assignees:
            data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees]
        if body:
            data["body"] = body
        if due_date:
            data["due_date"] = Util.format_time(due_date)
        if milestone:
            data["milestone"] = milestone.id

        result = self.allspice_client.requests_post(
            self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name),
            data=data,
        )

        return DesignReview.parse_response(self.allspice_client, result)

    def create_milestone(
        self,
        title: str,
        description: str,
        due_date: Optional[str] = None,
        state: str = "open",
    ) -> "Milestone":
        """
        Create a milestone on this repository.

        :param title: Title of the milestone.
        :param description: Description of the milestone.
        :param due_date: Optional due date, sent to the API as given.
        :param state: State of the milestone; defaults to "open".
        :return: The created Milestone.
        """
        url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
        data = {"title": title, "description": description, "state": state}
        if due_date:
            data["due_date"] = due_date
        result = self.allspice_client.requests_post(url, data=data)
        return Milestone.parse_response(self.allspice_client, result)

    def create_gitea_hook(self, hook_url: str, events: List[str]):
        """
        Create an active webhook of type "gitea" with a JSON content type.

        :param hook_url: The URL the hook should call.
        :param events: The events the hook is registered for.
        :return: The API response of the hook creation.
        """
        url = f"/repos/{self.owner.username}/{self.name}/hooks"
        data = {
            "type": "gitea",
            "config": {"content_type": "json", "url": hook_url},
            "events": events,
            "active": True,
        }
        return self.allspice_client.requests_post(url, data=data)

    def list_hooks(self):
        """Return the API response listing the hooks of this repository."""
        url = f"/repos/{self.owner.username}/{self.name}/hooks"
        return self.allspice_client.requests_get(url)

    def delete_hook(self, id: str):
        """Delete the hook with the given id from this repository."""
        url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
        self.allspice_client.requests_delete(url)

    def is_collaborator(self, username) -> bool:
        """
        Check whether a user is a collaborator on this repository.

        :param username: A username, or a User whose username is used.
        :return: True if the collaborator request succeeds, False if it raises
            for any reason.
        """
        if isinstance(username, User):
            username = username.username
        try:
            # returns 204 if its ok, 404 if its not
            self.allspice_client.requests_get(
                Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username)
            )
            return True
        except Exception:
            # Deliberate best effort: any failure counts as "not a collaborator".
            return False

    def get_users_with_access(self) -> Sequence[User]:
        """
        Get the users that have access to this repository.

        :return: The collaborators plus the owner when the owner is a User.
            Otherwise the collaborators plus the members of every owner team
            that has a repository with this repository's name.
        """
        url = f"/repos/{self.owner.username}/{self.name}/collaborators"
        response = self.allspice_client.requests_get(url)
        collabs = [User.parse_response(self.allspice_client, user) for user in response]
        if isinstance(self.owner, User):
            return [*collabs, self.owner]
        else:
            # owner must be org
            teams = self.owner.get_teams()
            for team in teams:
                team_repos = team.get_repos()
                if self.name in [n.name for n in team_repos]:
                    collabs += team.get_members()
            return collabs

    def remove_collaborator(self, user_name: str):
        """Remove the collaborator with the given username from this repository."""
        url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
        self.allspice_client.requests_delete(url)

    def transfer_ownership(
        self,
        new_owner: Union[User, Organization],
        new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
    ):
        """
        Transfer this repository to another user or organization.

        :param new_owner: The user or organization to transfer to.
        :param new_teams: Teams to hand over as ``team_ids``. Only used when
            `new_owner` is an Organization, and only teams that are among that
            organization's teams are sent.
        """
        url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
        data: dict[str, Any] = {"new_owner": new_owner.username}
        if isinstance(new_owner, Organization):
            # Fetch the organization's teams once, not once per candidate
            # team, and not at all when there are no candidates.
            owner_teams = new_owner.get_teams() if new_teams else []
            new_team_ids = [team.id for team in new_teams if team in owner_teams]
            data["team_ids"] = new_team_ids
        self.allspice_client.requests_post(url, data=data)
        # TODO: make sure this instance is either updated or discarded

    def get_git_content(
        self,
        ref: Optional["Ref"] = None,
        commit: "Optional[Commit]" = None,
    ) -> List[Content]:
        """
        Get the metadata for all files in the root directory.

        https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

        :param ref: branch or commit to get content from
        :param commit: commit to get content from (deprecated)
        """
        url = f"/repos/{self.owner.username}/{self.name}/contents"
        data = Util.data_params_for_ref(ref or commit)

        result = [
            Content.parse_response(self.allspice_client, f)
            for f in self.allspice_client.requests_get(url, data)
        ]
        return result

    def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
        """
        Get the repository's tree on a given ref.

        By default, this will only return the top-level entries in the tree. If you want
        to get the entire tree, set `recursive` to True.

        :param ref: The ref to get the tree from. If not provided, the default branch is used.
        :param recursive: Whether to get the entire tree or just the top-level entries.
        """

        ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
        url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref)
        params = {"recursive": recursive}
        results = self.allspice_client.requests_get_paginated(url, params=params)
        return [GitEntry.parse_response(self.allspice_client, result) for result in results]

    def get_file_content(
        self,
        content: Content,
        ref: Optional[Ref] = None,
        commit: Optional[Commit] = None,
    ) -> Union[str, List["Content"]]:
        """
        Get the content of a file, or the entries of a directory.

        https://hub.allspice.io/api/swagger#/repository/repoGetContents

        :param content: The Content entry to fetch.
        :param ref: branch or commit to get content from
        :param commit: commit to get content from; only used when `ref` is not given
        :return: The ``"content"`` field of the response when `content` is a
            file, otherwise a list of the parsed Content entries.
        """
        url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
        data = Util.data_params_for_ref(ref or commit)

        if content.type == Content.FILE:
            return self.allspice_client.requests_get(url, data)["content"]
        else:
            return [
                Content.parse_response(self.allspice_client, f)
                for f in self.allspice_client.requests_get(url, data)
            ]

    def get_raw_file(
        self,
        file_path: str,
        ref: Optional[Ref] = None,
    ) -> bytes:
        """
        Get the raw, binary data of a single file.

        Note 1: if the file you are requesting is a text file, you might want to
        use .decode() on the result to get a string. For example:

            content = repo.get_raw_file("file.txt").decode("utf-8")

        Note 2: this method will store the entire file in memory. If you want
        to download a large file, you might want to use `download_to_file`
        instead.

        See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

        :param file_path: The path to the file to get.
        :param ref: The branch or commit to get the file from.  If not provided,
            the default branch is used.
        """

        url = self.REPO_GET_RAW_FILE.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
        params = Util.data_params_for_ref(ref)
        return self.allspice_client.requests_get_raw(url, params=params)

    def download_to_file(
        self,
        file_path: str,
        io: IO,
        ref: Optional[Ref] = None,
    ) -> None:
        """
        Download the binary data of a file to a file-like object.

        Example:

            with open("schematic.DSN", "wb") as f:
                repo.download_to_file("Schematics/my_schematic.DSN", f)

        :param file_path: The path to the file in the repository from the root
            of the repository.
        :param io: The file-like object to write the data to.
        :param ref: The branch or commit to get the file from.  If not provided,
            the default branch is used.
        """

        # `_AllSpice__get_url` is the name-mangled form of the client's private
        # `__get_url` method; it is called directly so the request below can be
        # made with `stream=True`.
        url = self.allspice_client._AllSpice__get_url(
            self.REPO_GET_RAW_FILE.format(
                owner=self.owner.username,
                repo=self.name,
                path=file_path,
            )
        )
        params = Util.data_params_for_ref(ref)
        response = self.allspice_client.requests.get(
            url,
            params=params,
            headers=self.allspice_client.headers,
            stream=True,
        )

        # Written in 4 KiB chunks so the whole file is never held in memory.
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)

    def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
        """
        Get the json blob for a cad file if it exists, otherwise enqueue
        a new job and return a 503 status.

        WARNING: This is still experimental and not recommended for critical
        applications. The structure and content of the returned dictionary can
        change at any time.

        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

        :param content: The Content entry, or the path, of the cad file.
        :param ref: The branch or commit to read the file from.
        """

        if isinstance(content, Content):
            content = content.path

        url = self.REPO_GET_ALLSPICE_JSON.format(
            owner=self.owner.username,
            repo=self.name,
            content=content,
        )
        data = Util.data_params_for_ref(ref)
        return self.allspice_client.requests_get(url, data)

    def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
        """
        Get the svg blob for a cad file if it exists, otherwise enqueue
        a new job and return a 503 status.

        WARNING: This is still experimental and not yet recommended for
        critical applications. The content of the returned svg can change
        at any time.

        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

        :param content: The Content entry, or the path, of the cad file.
        :param ref: The branch or commit to read the file from.
        """

        if isinstance(content, Content):
            content = content.path

        url = self.REPO_GET_ALLSPICE_SVG.format(
            owner=self.owner.username,
            repo=self.name,
            content=content,
        )
        data = Util.data_params_for_ref(ref)
        return self.allspice_client.requests_get_raw(url, data)

    def get_generated_projectdata(
        self, content: Union[Content, str], ref: Optional[Ref] = None
    ) -> dict:
        """
        Get the json project data based on the cad file provided

        WARNING: This is still experimental and not yet recommended for
        critical applications. The content of the returned dictionary can change
        at any time.

        See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject

        :param content: The Content entry, or the path, of the cad file.
        :param ref: The branch or commit to read the file from.
        """
        if isinstance(content, Content):
            content = content.path

        url = self.REPO_GET_ALLSPICE_PROJECT.format(
            owner=self.owner.username,
            repo=self.name,
            content=content,
        )
        data = Util.data_params_for_ref(ref)
        return self.allspice_client.requests_get(url, data)

    def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
        """
        Create a file in the repository.

        https://hub.allspice.io/api/swagger#/repository/repoCreateFile

        :param file_path: Path of the new file inside the repository.
        :param content: The file content, sent as the ``"content"`` field.
        :param data: Optional extra fields for the request body. The dict
            passed in is not modified.
        :return: The API response.
        """
        # Work on a copy so the caller's dict is never mutated.
        payload = dict(data) if data else {}
        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
        payload.update({"content": content})
        return self.allspice_client.requests_post(url, payload)

    def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
        """
        Update an existing file in the repository.

        https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

        :param file_path: Path of the file inside the repository.
        :param file_sha: Sent as the ``"sha"`` field of the request.
        :param content: The new file content, sent as the ``"content"`` field.
        :param data: Optional extra fields for the request body. The dict
            passed in is not modified.
        :return: The API response.
        """
        # Work on a copy so the caller's dict is never mutated.
        payload = dict(data) if data else {}
        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
        payload.update({"sha": file_sha, "content": content})
        return self.allspice_client.requests_put(url, payload)

    def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
        """
        Delete a file from the repository.

        https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

        :param file_path: Path of the file inside the repository.
        :param file_sha: Sent as the ``"sha"`` field of the request.
        :param data: Optional extra fields for the request body. The dict
            passed in is not modified.
        :return: The API response.
        """
        # Work on a copy so the caller's dict is never mutated.
        payload = dict(data) if data else {}
        url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
        payload.update({"sha": file_sha})
        return self.allspice_client.requests_delete(url, payload)

    def get_archive(
        self,
        ref: Ref = "main",
        archive_format: ArchiveFormat = ArchiveFormat.ZIP,
    ) -> bytes:
        """
        Download all the files in a specific ref of a repository as a zip or tarball
        archive.

        https://hub.allspice.io/api/swagger#/repository/repoGetArchive

        :param ref: branch or commit to get content from, defaults to the "main" branch
        :param archive_format: zip or tar, defaults to zip
        """

        ref_string = Util.data_params_for_ref(ref)["ref"]
        url = self.REPO_GET_ARCHIVE.format(
            owner=self.owner.username,
            repo=self.name,
            ref=ref_string,
            format=archive_format.value,
        )
        return self.allspice_client.requests_get_raw(url)

    def get_topics(self) -> list[str]:
        """
        Gets the list of topics on this repository.

        See https://hub.allspice.io/api/swagger#/repository/repoListTopics
        """

        url = self.REPO_GET_TOPICS.format(
            owner=self.owner.username,
            repo=self.name,
        )
        return self.allspice_client.requests_get(url)["topics"]

    def add_topic(self, topic: str):
        """
        Adds a topic to the repository.

        See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

        :param topic: The topic to add. Topic names must consist only of
            lowercase letters, numbers and dashes (-), and cannot start with
            dashes. Topic names also must be under 35 characters long.
        """

        url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic)
        self.allspice_client.requests_put(url)

    def create_release(
        self,
        tag_name: str,
        name: Optional[str] = None,
        body: Optional[str] = None,
        draft: bool = False,
    ):
        """
        Create a release for this repository. The release will be created for
        the tag with the given name. If there is no tag with this name, create
        the tag first.

        See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

        :param tag_name: Name of the tag the release is created for.
        :param name: Optional name of the release.
        :param body: Optional body text of the release.
        :param draft: Whether the release is created as a draft.
        :return: The created Release.
        """

        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
        data = {
            "tag_name": tag_name,
            "draft": draft,
        }
        if name is not None:
            data["name"] = name
        if body is not None:
            data["body"] = body
        response = self.allspice_client.requests_post(url, data)
        return Release.parse_response(self.allspice_client, response, self)

    def get_releases(
        self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
    ) -> List[Release]:
        """
        Get the list of releases for this repository.

        See https://hub.allspice.io/api/swagger#/repository/repoListReleases

        :param draft: Optional filter, sent as the ``draft`` query parameter.
        :param pre_release: Optional filter, sent as the ``pre-release`` query
            parameter.
        :return: A list of Releases.
        """

        data = {}

        if draft is not None:
            data["draft"] = draft
        if pre_release is not None:
            data["pre-release"] = pre_release

        url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
        responses = self.allspice_client.requests_get_paginated(url, params=data)

        return [
            Release.parse_response(self.allspice_client, response, self) for response in responses
        ]

    def get_latest_release(self) -> Release:
        """
        Get the latest release for this repository.

        See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
        """

        url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name)
        response = self.allspice_client.requests_get(url)
        release = Release.parse_response(self.allspice_client, response, self)
        return release

    def get_release_by_tag(self, tag: str) -> Release:
        """
        Get a release by its tag.

        See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
        """

        url = self.REPO_GET_RELEASE_BY_TAG.format(
            owner=self.owner.username, repo=self.name, tag=tag
        )
        response = self.allspice_client.requests_get(url)
        release = Release.parse_response(self.allspice_client, response, self)
        return release

    def get_commit_statuses(
        self,
        commit: Union[str, Commit],
        sort: Optional[CommitStatusSort] = None,
        state: Optional[CommitStatusState] = None,
    ) -> List[CommitStatus]:
        """
        Get a list of statuses for a commit.

        This is roughly equivalent to the Commit.get_statuses method, but this
        method allows you to sort and filter commits and is more convenient if
        you have a commit SHA and don't need to get the commit itself.

        See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

        :param commit: A commit SHA, or a Commit whose SHA is used.
        :param sort: Optional sort order for the statuses.
        :param state: Optional state to filter the statuses by.
        :return: A list of CommitStatus objects.
        """

        if isinstance(commit, Commit):
            commit = commit.sha

        params = {}
        if sort is not None:
            params["sort"] = sort.value
        if state is not None:
            params["state"] = state.value

        url = self.REPO_GET_COMMIT_STATUS.format(
            owner=self.owner.username, repo=self.name, sha=commit
        )
        response = self.allspice_client.requests_get_paginated(url, params=params)
        return [CommitStatus.parse_response(self.allspice_client, status) for status in response]

    def create_commit_status(
        self,
        commit: Union[str, Commit],
        context: Optional[str] = None,
        description: Optional[str] = None,
        state: Optional[CommitStatusState] = None,
        target_url: Optional[str] = None,
    ) -> CommitStatus:
        """
        Create a status on a commit.

        See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

        :param commit: A commit SHA, or a Commit whose SHA is used.
        :param context: Optional context of the status.
        :param description: Optional description of the status.
        :param state: Optional state of the status.
        :param target_url: Optional URL the status links to.
        :return: The created CommitStatus.
        """

        if isinstance(commit, Commit):
            commit = commit.sha

        data = {}
        if context is not None:
            data["context"] = context
        if description is not None:
            data["description"] = description
        if state is not None:
            data["state"] = state.value
        if target_url is not None:
            data["target_url"] = target_url

        url = self.REPO_GET_COMMIT_STATUS.format(
            owner=self.owner.username, repo=self.name, sha=commit
        )
        response = self.allspice_client.requests_post(url, data=data)
        return CommitStatus.parse_response(self.allspice_client, response)

    def delete(self):
        """Delete this repository on the hub and mark this object as deleted."""
        self.allspice_client.requests_delete(
            Repository.REPO_DELETE % (self.owner.username, self.name)
        )
        self.deleted = True
@classmethod
def search(
    cls,
    allspice_client,
    query: Optional[str] = None,
    topic: bool = False,
    include_description: bool = False,
    user: Optional[User] = None,
    owner_to_prioritize: Union[User, Organization, None] = None,
) -> list[Repository]:
    """
    Search the hub for repositories.

    See https://hub.allspice.io/api/swagger#/repository/repoSearch

    :param allspice_client: Client used to issue the paginated search request.
    :param query: The query string to search for.
    :param topic: If true, the query string is only matched against the
        repository's topic.
    :param include_description: If true, the query string is matched
        against the repository's description as well.
    :param user: If specified, only repositories that this user owns or
        contributes to are searched (sent as the user's id).
    :param owner_to_prioritize: If specified, repositories owned by the
        given entity are prioritized in the search (sent as its id).
    :returns: All repositories matching the query. If there are many
        matches, this may take some time.
    """

    # Only filters that were actually requested are sent to the API.
    filters = {} if query is None else {"q": query}
    if topic:
        filters.update(topic=topic)
    if include_description:
        filters.update(include_description=include_description)
    if user is not None:
        filters.update(user=user.id)
    if owner_to_prioritize is not None:
        filters.update(owner_to_prioritize=owner_to_prioritize.id)

    matches = []
    for raw_repo in allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=filters):
        matches.append(Repository.parse_response(allspice_client, raw_repo))
    return matches
Search for repositories.
See https://hub.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns
All repositories matching the query. If there are many repositories matching this query, this may take some time.
def get_branches(self) -> List["Branch"]:
    """Return all Branches of this Repository, using the paginated branches endpoint."""

    branches_url = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    branches = []
    for raw_branch in self.allspice_client.requests_get_paginated(branches_url):
        branches.append(Branch.parse_response(self.allspice_client, raw_branch))
    return branches
Get all the Branches of this Repository.
def get_branch(self, name: str) -> "Branch":
    """
    Look up a single Branch of this Repository by name.

    :param name: Name of the branch to fetch.
    :return: The Branch parsed from the API response.
    """
    branch_url = Repository.REPO_BRANCH.format(
        owner=self.owner.username,
        repo=self.name,
        branch=name,
    )
    raw_branch = self.allspice_client.requests_get(branch_url)
    return Branch.parse_response(self.allspice_client, raw_branch)
Get a specific Branch of this Repository.
def add_branch(self, create_from: Ref, newname: str) -> "Branch":
    """
    Create a new branch in this repository.

    :param create_from: The ref the new branch is created from.
    :param newname: Name of the branch to create.
    :raises ValueError: If no ref name can be derived from `create_from`.
    :return: The newly created Branch.
    """
    # Note: will only work with gitea 1.13 or higher!

    ref_params = Util.data_params_for_ref(create_from)
    if "ref" not in ref_params:
        raise ValueError("create_from must be a Branch, Commit or string")

    payload = {"new_branch_name": newname, "old_ref_name": ref_params["ref"]}
    branches_url = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    created = self.allspice_client.requests_post(branches_url, data=payload)
    return Branch.parse_response(self.allspice_client, created)
Add a branch to the repository
def get_issues(
    self,
    state: Literal["open", "closed", "all"] = "all",
    search_query: Optional[str] = None,
    labels: Optional[List[str]] = None,
    milestones: Optional[List[Union[Milestone, str]]] = None,
    assignee: Optional[Union[User, str]] = None,
    since: Optional[datetime] = None,
    before: Optional[datetime] = None,
) -> List["Issue"]:
    """
    List the Issues of this Repository, optionally filtered.

    https://hub.allspice.io/api/swagger#/repository/repoListIssues

    Apart from `state`, every parameter is an optional filter that is only
    sent to the API when a truthy value is given.

    :param state: Which Issues to return: "open", "closed" or "all" (the
        default).
    :param search_query: Filter issues by text. This is equivalent to
        searching for `search_query` in the Issues on the web interface.
    :param labels: Filter issues by labels; sent comma-separated.
    :param milestones: Filter issues by milestones, given as Milestone
        objects or milestone names; sent comma-separated.
    :param assignee: Filter issues by the assigned user (User or username).
    :param since: Lower time bound, formatted with Util.format_time and sent
        as the API's `since` filter.
    :param before: Upper time bound, formatted with Util.format_time and sent
        as the API's `before` filter.
    :return: A list of Issues, each bound to this repository.
    """

    query = {"state": state}
    if search_query:
        query["q"] = search_query
    if labels:
        query["labels"] = ",".join(labels)
    if milestones:
        milestone_names = []
        for entry in milestones:
            milestone_names.append(entry.name if isinstance(entry, Milestone) else entry)
        query["milestone"] = ",".join(milestone_names)
    if assignee:
        query["assignee"] = assignee.username if isinstance(assignee, User) else assignee
    if since:
        query["since"] = Util.format_time(since)
    if before:
        query["before"] = Util.format_time(before)

    issues_url = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(issues_url, params=query)

    def _bind_to_repository(raw_issue):
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # See Issue.request
        setattr(parsed, "_repository", self)
        # This is mostly for compatibility with an older implementation
        Issue._add_read_property("repo", self, parsed)
        return parsed

    return [_bind_to_repository(raw_issue) for raw_issue in raw_issues]
Get all Issues of this Repository (open and closed)
https://hub.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get: "open", "closed" or "all". Defaults to "all", which returns both open and closed Issues.
- search_query: Filter issues by text. This is equivalent to searching for `search_query` in the Issues on the web interface.
- labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
def get_design_reviews(
    self,
    state: Literal["open", "closed", "all"] = "all",
    milestone: Optional[Union[Milestone, str]] = None,
    labels: Optional[List[str]] = None,
) -> List["DesignReview"]:
    """
    List the Design Reviews of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

    :param state: Which Design Reviews to return: "open", "closed" or "all"
        (the default).
    :param milestone: Only return Design Reviews of this milestone, given as
        a Milestone object or a milestone name.
    :param labels: A list of label IDs to filter DRs by; sent
        comma-separated.
    :return: A list of Design Reviews.
    """

    query = {"state": state}
    if milestone:
        query["milestone"] = milestone.name if isinstance(milestone, Milestone) else milestone
    if labels:
        query["labels"] = ",".join(labels)

    reviews_url = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    reviews = []
    for raw_review in self.allspice_client.requests_get_paginated(reviews_url, params=query):
        reviews.append(DesignReview.parse_response(self.allspice_client, raw_review))
    return reviews
Get all Design Reviews of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get: "open", "closed" or "all". Defaults to "all", which returns all Design Reviews.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
def get_commits(
    self,
    sha: Optional[str] = None,
    path: Optional[str] = None,
    stat: bool = True,
) -> List["Commit"]:
    """
    List the Commits of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

    A ConflictException from the API is treated as "repository is empty":
    it is logged as a warning and an empty list is returned.

    :param sha: The SHA of the commit to start listing commits from.
    :param path: Path of a file or directory to restrict the listing to.
    :param stat: Include the number of additions and deletions in the
        response. Disable for speedup.
    :return: A list of Commits.
    """

    query = {}
    if sha:
        query["sha"] = sha
    if path:
        query["path"] = path
    if not stat:
        # The flag is only sent when stats are explicitly switched off.
        query["stat"] = False

    try:
        raw_commits = self.allspice_client.requests_get_paginated(
            Repository.REPO_COMMITS % (self.owner.username, self.name),
            params=query,
        )
    except ConflictException as err:
        logging.warning(err)
        logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name))
        return []

    commits = []
    for raw_commit in raw_commits:
        commits.append(Commit.parse_response(self.allspice_client, raw_commit))
    return commits
Get all the Commits of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
def get_issues_state(self, state) -> List["Issue"]:
    """
    DEPRECATED: Use get_issues() instead.

    Get the issues of this repository that are in the given state.

    :param state: Either Issue.OPENED or Issue.CLOSED.
    :return: A list of Issues, each bound to this repository and its owner.
    """

    assert state in [Issue.OPENED, Issue.CLOSED]
    issues_url = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(
        issues_url,
        params={"state": state},
    )

    def _bind_to_repository(raw_issue):
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # Attach data that is not contained in the issue response.
        # See Issue.request()
        setattr(parsed, "_repository", self)
        Issue._add_read_property("repo", self, parsed)
        Issue._add_read_property("owner", self.owner, parsed)
        return parsed

    return [_bind_to_repository(raw_issue) for raw_issue in raw_issues]
DEPRECATED: Use get_issues() instead.
Get the issues of a repository that are in state Issue.OPENED or Issue.CLOSED.
def get_user_time(self, username) -> float:
    """
    Sum up the time a user has tracked on this repository.

    :param username: A User object or a username.
    :return: The sum of the `time` fields of all entries returned by the
        API for that user.
    """
    login = username.username if isinstance(username, User) else username
    entries = self.allspice_client.requests_get(
        Repository.REPO_USER_TIME % (self.owner.username, self.name, login)
    )
    total = 0
    for entry in entries:
        total += entry["time"]
    return total
def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
    """
    Create a new Issue in this repository.

    :param title: Title of the Issue.
    :param assignees: Users to assign to the Issue. Any iterable of User
        objects or usernames.
    :param description: Body text of the Issue.
    :return: The created Issue, bound to this repository.
    """
    data = {
        # Normalise to a plain list of usernames, as create_design_review
        # does. A frozenset (the default) or a User object is not
        # JSON-serialisable, assuming requests_post JSON-encodes the payload.
        "assignees": [a.username if isinstance(a, User) else a for a in assignees],
        "body": description,
        "closed": False,
        "title": title,
    }
    result = self.allspice_client.requests_post(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        data=data,
    )

    issue = Issue.parse_response(self.allspice_client, result)
    # See Issue.request: the response does not carry the repository itself.
    setattr(issue, "_repository", self)
    Issue._add_read_property("repo", self, issue)
    return issue
def create_design_review(
    self,
    title: str,
    head: Union[Branch, str],
    base: Union[Branch, str],
    assignees: Optional[Set[Union[User, str]]] = None,
    body: Optional[str] = None,
    due_date: Optional[datetime] = None,
    milestone: Optional["Milestone"] = None,
) -> "DesignReview":
    """
    Open a new Design Review on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

    :param title: Title of the Design Review.
    :param head: Branch, or name of the branch, to merge into the base
        branch.
    :param base: Branch, or name of the branch, to merge into.
    :param assignees: Optional. Users to assign to this review, given as
        User objects or usernames.
    :param body: Optional description for the Design Review.
    :param due_date: Optional due date; formatted with Util.format_time.
    :param milestone: Optional Milestone; its id is sent to the API.
    :return: The created Design Review.
    """

    payload: dict[str, Any] = {
        "title": title,
        "head": head.name if isinstance(head, Branch) else head,
        "base": base.name if isinstance(base, Branch) else base,
    }

    # Optional fields are only sent when a truthy value was given.
    if assignees:
        usernames = []
        for person in assignees:
            usernames.append(person.username if isinstance(person, User) else person)
        payload["assignees"] = usernames
    if body:
        payload["body"] = body
    if due_date:
        payload["due_date"] = Util.format_time(due_date)
    if milestone:
        payload["milestone"] = milestone.id

    reviews_url = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(reviews_url, data=payload)
    return DesignReview.parse_response(self.allspice_client, created)
Create a new Design Review.
See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
def create_milestone(
    self,
    title: str,
    description: str,
    due_date: Optional[str] = None,
    state: str = "open",
) -> "Milestone":
    """
    Create a new Milestone in this repository.

    :param title: Title of the Milestone.
    :param description: Description of the Milestone.
    :param due_date: Optional due date; passed to the API unchanged and
        only sent when truthy.
    :param state: State of the new Milestone, "open" by default.
    :return: The created Milestone.
    """
    payload = {"title": title, "description": description, "state": state}
    if due_date:
        payload["due_date"] = due_date

    milestones_url = Repository.REPO_MILESTONES.format(
        owner=self.owner.username,
        repo=self.name,
    )
    created = self.allspice_client.requests_post(milestones_url, data=payload)
    return Milestone.parse_response(self.allspice_client, created)
def create_gitea_hook(self, hook_url: str, events: List[str]):
    """
    Register an active webhook of type "gitea" on this repository.

    :param hook_url: URL the hook delivers its JSON payloads to.
    :param events: Names of the events that trigger the hook.
    :return: Whatever the client's requests_post returns for the request.
    """
    delivery_config = {"content_type": "json", "url": hook_url}
    payload = {
        "type": "gitea",
        "config": delivery_config,
        "events": events,
        "active": True,
    }
    hooks_endpoint = f"/repos/{self.owner.username}/{self.name}/hooks"
    return self.allspice_client.requests_post(hooks_endpoint, data=payload)
def is_collaborator(self, username) -> bool:
    """
    Check whether a user is a collaborator on this repository.

    :param username: A User object or a username.
    :return: True if the collaborator request succeeds, False if it raises
        any exception.
    """
    login = username.username if isinstance(username, User) else username
    try:
        # returns 204 if its ok, 404 if its not
        self.allspice_client.requests_get(
            Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, login)
        )
    except Exception:
        # NOTE(review): every failure, not only a 404, is reported as
        # "not a collaborator".
        return False
    else:
        return True
def get_users_with_access(self) -> Sequence[User]:
    """
    Collect the users that have access to this repository.

    For a user-owned repository these are the collaborators plus the owner.
    For an organization-owned repository these are the collaborators plus
    the members of every team of the owner that lists a repository with
    this repository's name. Duplicates are not removed.
    """
    collaborators_url = f"/repos/{self.owner.username}/{self.name}/collaborators"
    users = []
    for raw_user in self.allspice_client.requests_get(collaborators_url):
        users.append(User.parse_response(self.allspice_client, raw_user))

    if isinstance(self.owner, User):
        return [*users, self.owner]

    # owner must be org
    for team in self.owner.get_teams():
        team_repo_names = [repo.name for repo in team.get_repos()]
        if self.name not in team_repo_names:
            continue
        users.extend(team.get_members())
    return users
def transfer_ownership(
    self,
    new_owner: Union[User, Organization],
    new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
):
    """
    Transfer this repository to another user or organization.

    :param new_owner: The User or Organization that becomes the owner.
    :param new_teams: Teams whose ids are sent along with the transfer.
        Only used when `new_owner` is an Organization; teams that are not
        among `new_owner.get_teams()` are dropped.
    """
    url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
    data: dict[str, Any] = {"new_owner": new_owner.username}
    if isinstance(new_owner, Organization):
        # Fetch the organization's teams once, and only when there is
        # something to check, instead of once per candidate team.
        owner_teams = list(new_owner.get_teams()) if new_teams else []
        data["team_ids"] = [team.id for team in new_teams if team in owner_teams]
    self.allspice_client.requests_post(url, data=data)
    # TODO: make sure this instance is either updated or discarded
def get_git_content(
    self,
    ref: Optional["Ref"] = None,
    commit: "Optional[Commit]" = None,
) -> List[Content]:
    """
    List the metadata of all files in the repository's root directory.

    https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

    :param ref: branch or commit to get content from
    :param commit: commit to get content from (deprecated); only used when
        `ref` is falsy
    """
    contents_url = f"/repos/{self.owner.username}/{self.name}/contents"
    ref_params = Util.data_params_for_ref(ref or commit)

    entries = []
    for raw_entry in self.allspice_client.requests_get(contents_url, ref_params):
        entries.append(Content.parse_response(self.allspice_client, raw_entry))
    return entries
Get the metadata for all files in the root directory.
https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
    """
    Get the git tree of this repository at a given ref.

    Only the top-level entries are returned unless `recursive` is True, in
    which case the entire tree is requested.

    :param ref: The ref to get the tree from. If not provided, the default
        branch is used.
    :param recursive: Whether to get the entire tree or just the top-level
        entries.
    """

    tree_ref = Util.data_params_for_ref(ref).get("ref", self.default_branch)
    tree_url = self.REPO_GET_TREE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=tree_ref,
    )
    raw_entries = self.allspice_client.requests_get_paginated(
        tree_url,
        params={"recursive": recursive},
    )
    entries = []
    for raw_entry in raw_entries:
        entries.append(GitEntry.parse_response(self.allspice_client, raw_entry))
    return entries
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want to get the entire tree, set `recursive` to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content(
    self,
    content: Content,
    ref: Optional[Ref] = None,
    commit: Optional[Commit] = None,
) -> Union[str, List["Content"]]:
    """
    Get what a Content entry points to.

    https://hub.allspice.io/api/swagger#/repository/repoGetContents

    :param content: The entry to resolve; its path is appended to the URL.
    :param ref: branch or commit to get content from
    :param commit: commit to get content from; only used when `ref` is
        falsy
    :return: The "content" field of the response when `content` is a file,
        otherwise the list of Content entries found at that path.
    """
    contents_url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
    ref_params = Util.data_params_for_ref(ref or commit)

    is_file = content.type == Content.FILE
    fetched = self.allspice_client.requests_get(contents_url, ref_params)
    if is_file:
        return fetched["content"]
    return [Content.parse_response(self.allspice_client, raw_entry) for raw_entry in fetched]
https://hub.allspice.io/api/swagger#/repository/repoGetContents
def get_raw_file(
    self,
    file_path: str,
    ref: Optional[Ref] = None,
) -> bytes:
    """
    Fetch the raw, binary data of a single file.

    Note 1: for a text file you will usually want to call .decode() on the
    result to get a string. For example:

        content = repo.get_raw_file("file.txt").decode("utf-8")

    Note 2: the entire file is held in memory. To download a large file,
    consider `download_to_file` instead.

    See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

    :param file_path: The path to the file to get.
    :param ref: The branch or commit to get the file from. If not provided,
        the default branch is used.
    """

    ref_params = Util.data_params_for_ref(ref)
    raw_url = self.REPO_GET_RAW_FILE.format(
        owner=self.owner.username,
        repo=self.name,
        path=file_path,
    )
    return self.allspice_client.requests_get_raw(raw_url, params=ref_params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want to download a large file, you might want to use `download_to_file` instead.
See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file(
    self,
    file_path: str,
    io: IO,
    ref: Optional[Ref] = None,
) -> None:
    """
    Download the binary data of a file to a file-like object.

    The response is streamed and written in 4096-byte chunks, so the whole
    file is never held in memory at once.

    Example:

        with open("schematic.DSN", "wb") as f:
            repo.download_to_file("Schematics/my_schematic.DSN", f)

    :param file_path: The path to the file in the repository from the root
        of the repository.
    :param io: The file-like object to write the data to.
    :param ref: The branch or commit to get the file from; converted to
        request parameters by Util.data_params_for_ref.
    """

    url = self.allspice_client._AllSpice__get_url(
        self.REPO_GET_RAW_FILE.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
    )
    params = Util.data_params_for_ref(ref)
    response = self.allspice_client.requests.get(
        url,
        params=params,
        headers=self.allspice_client.headers,
        stream=True,
    )

    # NOTE(review): the HTTP status is not checked here, so the body of an
    # error response would be written to `io` as well.
    try:
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
    finally:
        # A streamed response holds its connection until it is fully read
        # or closed; release it even if writing to `io` fails midway.
        response.close()
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
repo.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
    """
    Get the generated JSON blob for a CAD file.

    If no blob exists yet, the hub is documented to enqueue a new
    generation job and answer with a 503 status.

    WARNING: This is still experimental and not recommended for critical
    applications. The structure and content of the returned dictionary can
    change at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

    :param content: The CAD file, as a Content object or a path string.
    :param ref: Optional branch or commit to read the file from.
    """

    cad_path = content.path if isinstance(content, Content) else content
    json_url = self.REPO_GET_ALLSPICE_JSON.format(
        owner=self.owner.username,
        repo=self.name,
        content=cad_path,
    )
    ref_params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(json_url, ref_params)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
    """
    Get the generated SVG blob for a CAD file.

    If no blob exists yet, the hub is documented to enqueue a new
    generation job and answer with a 503 status.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: The CAD file, as a Content object or a path string.
    :param ref: Optional branch or commit to read the file from.
    """

    cad_path = content.path if isinstance(content, Content) else content
    svg_url = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username,
        repo=self.name,
        content=cad_path,
    )
    ref_params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(svg_url, ref_params)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
def get_generated_projectdata(
    self, content: Union[Content, str], ref: Optional[Ref] = None
) -> dict:
    """
    Get the JSON project data generated for the given CAD file.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned dictionary can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject

    :param content: The CAD file, as a Content object or a path string.
    :param ref: Optional branch or commit to read the file from.
    """
    cad_path = content.path if isinstance(content, Content) else content
    project_url = self.REPO_GET_ALLSPICE_PROJECT.format(
        owner=self.owner.username,
        repo=self.name,
        content=cad_path,
    )
    ref_params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(project_url, ref_params)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the new file, relative to the repository root.
    :param content: The file content, sent as the "content" field of the
        request.
    :param data: Optional extra fields for the request body. The dict is
        copied, never modified.
    :return: Whatever the client's requests_post returns for the request.
    """
    # Copy so that the caller's dict is not mutated by adding "content".
    payload = dict(data) if data else {}
    payload["content"] = content
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_post(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoCreateFile
def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Update an existing file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file, relative to the repository root.
    :param file_sha: Sent as the "sha" field of the request.
    :param content: The new file content, sent as the "content" field.
    :param data: Optional extra fields for the request body. The dict is
        copied, never modified.
    :return: Whatever the client's requests_put returns for the request.
    """
    # Copy so that the caller's dict is not mutated by the added fields.
    payload = dict(data) if data else {}
    payload.update({"sha": file_sha, "content": content})
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_put(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoUpdateFile
def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from this repository.

    https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file, relative to the repository root.
    :param file_sha: Sent as the "sha" field of the request.
    :param data: Optional extra fields for the request body. The dict is
        copied, never modified.
    :return: Whatever the client's requests_delete returns for the request.
    """
    # Copy so that the caller's dict is not mutated by adding "sha".
    payload = dict(data) if data else {}
    payload["sha"] = file_sha
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_delete(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoDeleteFile
def get_archive(
    self,
    ref: Ref = "main",
    archive_format: ArchiveFormat = ArchiveFormat.ZIP,
) -> bytes:
    """
    Download every file at a specific ref of this repository as a single
    zip or tarball archive.

    https://hub.allspice.io/api/swagger#/repository/repoGetArchive

    :param ref: branch or commit to get content from, defaults to the "main" branch
    :param archive_format: zip or tar, defaults to zip
    """

    ref_name = Util.data_params_for_ref(ref)["ref"]
    archive_url = self.REPO_GET_ARCHIVE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_name,
        format=archive_format.value,
    )
    archive_bytes = self.allspice_client.requests_get_raw(archive_url)
    return archive_bytes
Download all the files in a specific ref of a repository as a zip or tarball archive.
https://hub.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
    """
    Return the topics set on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics

    :return: The "topics" field of the API response.
    """

    topics_url = self.REPO_GET_TOPICS.format(owner=self.owner.username, repo=self.name)
    payload = self.allspice_client.requests_get(topics_url)
    return payload["topics"]
Gets the list of topics on this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListTopics
def add_topic(self, topic: str):
    """
    Add a single topic to this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """

    topic_url = self.REPO_ADD_TOPIC.format(
        owner=self.owner.username,
        repo=self.name,
        topic=topic,
    )
    self.allspice_client.requests_put(topic_url)
Adds a topic to the repository.
See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release(
    self,
    tag_name: str,
    name: Optional[str] = None,
    body: Optional[str] = None,
    draft: bool = False,
):
    """
    Create a release for this repository, attached to the tag called
    ``tag_name``. If no tag with that name exists yet, create the tag
    first.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

    :param tag_name: Name of the tag the release is created for.
    :param name: Optional release title; left out of the request when None.
    :param body: Optional release text; left out of the request when None.
    :param draft: Sent as the "draft" flag of the request.
    :returns: The Release parsed from the API response.
    """

    payload = {"tag_name": tag_name, "draft": draft}
    # Optional fields are only sent when the caller supplied them.
    for key, value in (("name", name), ("body", body)):
        if value is not None:
            payload[key] = value

    # Creating a release POSTs to the same path that lists releases.
    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, payload)
    return Release.parse_response(self.allspice_client, created, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    List the releases of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: When not None, sent as the "draft" query parameter.
    :param pre_release: When not None, sent as the "pre-release" query
        parameter.
    :returns: One Release per entry of the paginated API response.
    """

    # Only filters the caller actually set are forwarded to the API.
    requested = (("draft", draft), ("pre-release", pre_release))
    query = {key: value for key, value in requested if value is not None}

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)

    releases = []
    for entry in entries:
        releases.append(Release.parse_response(self.allspice_client, entry, self))
    return releases
Get the list of releases for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListReleases
def get_latest_release(self) -> Release:
    """
    Fetch the latest release of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease

    :returns: The Release parsed from the API response.
    """

    endpoint = self.REPO_GET_LATEST_RELEASE.format(
        owner=self.owner.username,
        repo=self.name,
    )
    return Release.parse_response(
        self.allspice_client,
        self.allspice_client.requests_get(endpoint),
        self,
    )
Get the latest release for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
def get_release_by_tag(self, tag: str) -> Release:
    """
    Fetch the release associated with a tag.

    See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

    :param tag: Tag name inserted into the request path.
    :returns: The Release parsed from the API response.
    """

    path_args = {"owner": self.owner.username, "repo": self.name, "tag": tag}
    endpoint = self.REPO_GET_RELEASE_BY_TAG.format(**path_args)
    return Release.parse_response(
        self.allspice_client,
        self.allspice_client.requests_get(endpoint),
        self,
    )
Get a release by its tag.
See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    List the statuses attached to a commit.

    Roughly equivalent to Commit.get_statuses, but this method can sort
    and filter the statuses and is more convenient when only a commit SHA
    is at hand and the commit itself is not needed.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object or a commit SHA string.
    :param sort: When not None, its value is sent as the "sort" query
        parameter.
    :param state: When not None, its value is sent as the "state" query
        parameter.
    :returns: One CommitStatus per entry of the paginated API response.
    """

    # Accept either a Commit object or a bare SHA.
    sha = commit.sha if isinstance(commit, Commit) else commit

    query = {
        key: choice.value
        for key, choice in (("sort", sort), ("state", state))
        if choice is not None
    }

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
def create_commit_status(
    self,
    commit: Union[str, Commit],
    context: Optional[str] = None,
    description: Optional[str] = None,
    state: Optional[CommitStatusState] = None,
    target_url: Optional[str] = None,
) -> CommitStatus:
    """
    Attach a new status to a commit.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

    :param commit: A Commit object or a commit SHA string.
    :param context: Optional "context" field of the status.
    :param description: Optional "description" field of the status.
    :param state: Optional state; its value is sent as the "state" field.
    :param target_url: Optional "target_url" field of the status.
    :returns: The CommitStatus parsed from the API response.
    """

    # Accept either a Commit object or a bare SHA.
    sha = commit.sha if isinstance(commit, Commit) else commit

    # Fields left as None are omitted from the request body entirely.
    payload = {}
    for field, value in (("context", context), ("description", description)):
        if value is not None:
            payload[field] = value
    if state is not None:
        payload["state"] = state.value
    if target_url is not None:
        payload["target_url"] = target_url

    # Statuses are created by POSTing to the same path that lists them.
    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username, repo=self.name, sha=sha
    )
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return CommitStatus.parse_response(self.allspice_client, created)
Create a status on a commit.
See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
class ArchiveFormat(Enum):
    """
    Archive types that can be requested through Repository.get_archive.

    The value of each member is the string that get_archive places in the
    format slot of the archive URL.
    """

    # Gzip-compressed tarball.
    TAR = "tar.gz"
    # Zip archive.
    ZIP = "zip"
Archive formats for Repository.get_archive
class CommitStatusSort(Enum):
    """
    Sort orders accepted by Repository.get_commit_statuses.

    The value of each member is the string sent as the "sort" query
    parameter.
    """

    OLDEST = "oldest"
    RECENT_UPDATE = "recentupdate"
    LEAST_UPDATE = "leastupdate"
    LEAST_INDEX = "leastindex"
    HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_statuses
class Team(ApiObject):
    """
    Wrapper object for a team, with helpers to manage its members and
    repositories.
    """

    can_create_org_repo: bool
    description: str
    id: int
    includes_all_repositories: bool
    name: str
    organization: Optional["Organization"]
    permission: str
    units: List[str]
    units_map: Dict[str, str]

    API_OBJECT = """/teams/{id}"""  # <id>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        """Teams are equal when both their organization and their id match."""
        if not isinstance(other, Team):
            return False
        return self.organization == other.organization and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.organization) ^ hash(self.id)

    # The "organization" field of a response is parsed into an Organization.
    _fields_to_parsers: ClassVar[dict] = {
        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
    }

    _patchable_fields: ClassVar[set[str]] = {
        "can_create_org_repo",
        "description",
        "includes_all_repositories",
        "name",
        "permission",
        "units",
        "units_map",
    }

    @classmethod
    def request(cls, allspice_client, id: int):
        """Fetch the team with the given id."""
        return cls._request(allspice_client, {"id": id})

    def commit(self):
        """Send the changed fields of this team to the server."""
        args = {"id": self.id}
        self._commit(args)

    def add_user(self, user: User):
        """
        Add a user to this team.

        See https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
        """
        url = f"/teams/{self.id}/members/{user.login}"
        self.allspice_client.requests_put(url)

    def add_repo(self, org: Organization, repo: Union[Repository, str]):
        """
        Add a repository to this team.

        :param org: Organization whose username is used in the request path.
        :param repo: A Repository object or the repository name.
        """
        if isinstance(repo, Repository):
            repo_name = repo.name
        else:
            repo_name = repo
        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))

    def get_members(self):
        """Get all users assigned to the team."""
        results = self.allspice_client.requests_get_paginated(
            Team.GET_MEMBERS % self.id,
        )
        return [User.parse_response(self.allspice_client, result) for result in results]

    def get_repos(self):
        """Get all repos of this Team."""
        # Paginated like get_members, so teams with more repositories than
        # fit on one page are returned completely.
        results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def delete(self):
        """Delete this team on the server and mark this object as deleted."""
        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
        self.deleted = True

    def remove_team_member(self, user_name: str):
        """Remove the user called ``user_name`` from this team."""
        url = f"/teams/{self.id}/members/{user_name}"
        self.allspice_client.requests_delete(url)
def add_user(self, user: User):
    """
    Add a user to this team.

    See https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember

    :param user: User whose login is used in the request path.
    """
    member_path = "/teams/{}/members/{}".format(self.id, user.login)
    self.allspice_client.requests_put(member_path)
https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
def get_members(self):
    """Return every user assigned to this team, following pagination."""
    members_path = Team.GET_MEMBERS % self.id
    entries = self.allspice_client.requests_get_paginated(members_path)

    members = []
    for entry in entries:
        members.append(User.parse_response(self.allspice_client, entry))
    return members
Get all users assigned to the team.
def get_repos(self):
    """
    Get all repos of this Team.

    :returns: One Repository per entry of the paginated API response.
    """
    # Use the paginated request (as get_members does) so that teams with
    # more repositories than fit on one page are returned completely.
    results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
class User(ApiObject):
    """
    Wrapper object for a user account, with helpers for admin edits,
    repository creation and listing related entities.
    """

    active: bool
    admin: Any
    allow_create_organization: Any
    allow_git_hook: Any
    allow_import_local: Any
    avatar_url: str
    created: str
    description: str
    email: str
    emails: List[Any]
    followers_count: int
    following_count: int
    full_name: str
    html_url: str
    id: int
    is_admin: bool
    language: str
    last_login: str
    location: str
    login: str
    login_name: str
    max_repo_creation: Any
    must_change_password: Any
    password: Any
    prohibit_login: bool
    restricted: bool
    source_id: int
    starred_repos_count: int
    username: str
    visibility: str
    website: str

    API_OBJECT = """/users/{name}"""  # <name>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)
        self._emails = []

    def __eq__(self, other):
        """Users are equal when they share the same client and id."""
        if not isinstance(other, User):
            return False
        return self.allspice_client == other.allspice_client and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.allspice_client) ^ hash(self.id)

    @property
    def emails(self):
        """Email addresses of this user, re-fetched from the server on every access."""
        self.__request_emails()
        return self._emails

    @classmethod
    def request(cls, allspice_client, name: str) -> "User":
        """Fetch the user with the given name."""
        api_object = cls._request(allspice_client, {"name": name})
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "active",
        "admin",
        "allow_create_organization",
        "allow_git_hook",
        "allow_import_local",
        "email",
        "full_name",
        "location",
        "login_name",
        "max_repo_creation",
        "must_change_password",
        "password",
        "prohibit_login",
        "website",
    }

    def commit(self, login_name: str, source_id: int = 0):
        """
        Unfortunately it is necessary to require the login name
        as well as the login source (that is not supplied when getting a user) for
        changing a user.
        Usually source_id is 0 and the login_name is equal to the username.
        """
        values = self.get_dirty_fields()
        values.update(
            # api-doc says that the "source_id" is necessary; works without though
            {"login_name": login_name, "source_id": source_id}
        )
        args = {"username": self.username}
        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
        self._dirty_fields = {}

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a user Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            "/user/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response carrying an "id" is treated as a successful creation.
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories owned by this User."""
        url = f"/users/{self.username}/repos"
        results = self.allspice_client.requests_get_paginated(url)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_orgs(self) -> List[Organization]:
        """Get all Organizations this user is a member of."""
        url = f"/users/{self.username}/orgs"
        results = self.allspice_client.requests_get_paginated(url)
        return [Organization.parse_response(self.allspice_client, result) for result in results]

    def get_teams(self) -> List["Team"]:
        """Get all Teams returned by /user/teams when requested as this user."""
        url = "/user/teams"
        results = self.allspice_client.requests_get_paginated(url, sudo=self)
        return [Team.parse_response(self.allspice_client, result) for result in results]

    def get_accessible_repos(self) -> List["Repository"]:
        """Get all Repositories accessible by the logged in User."""
        # Paginated (with sudo, as in get_teams) so the listing is not cut
        # off after the first page of results.
        results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def __request_emails(self):
        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
        # Build a fresh list on every request: appending to the existing one
        # would duplicate every address each time `emails` is read.
        emails = []
        for mail in result:
            emails.append(mail["email"])
            if mail["primary"]:
                self._email = mail["email"]
        self._emails = emails

    def delete(self):
        """Deletes this User. Also deletes all Repositories he owns."""
        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Return (datetime, contributions) pairs from this user's heatmap."""
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
def commit(self, login_name: str, source_id: int = 0):
    """
    Send the changed fields of this user to the server.

    The API needs the login name and the login source to edit a user, and
    neither is supplied when a user is fetched, so both are passed in
    here. Usually ``source_id`` is 0 and ``login_name`` equals the
    username.

    :param login_name: Sent as the "login_name" field of the request.
    :param source_id: Sent as the "source_id" field of the request.
    """
    changes = self.get_dirty_fields()
    changes["login_name"] = login_name
    # api-doc says that the "source_id" is necessary; works without though
    changes["source_id"] = source_id

    edit_path = User.ADMIN_EDIT_USER.format(username=self.username)
    self.allspice_client.requests_patch(edit_path, data=changes)
    # Everything has been sent, so no field counts as changed any more.
    self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a repository through the /user/repos endpoint.

    All arguments are forwarded as fields of the request body.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If the response carries no "id", i.e. something else
            went wrong.
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    created = self.allspice_client.requests_post("/user/repos", data=payload)

    log = self.allspice_client.logger
    # A response without an "id" is treated as a failed creation.
    if "id" not in created:
        log.error(created["message"])
        raise Exception("Repository not created... (gitea: %s)" % created["message"])

    log.info("Successfully created Repository %s " % created["name"])
    return Repository.parse_response(self.allspice_client, created)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_repositories(self) -> List["Repository"]:
    """Return every Repository owned by this User, following pagination."""
    repos_path = "/users/{}/repos".format(self.username)
    entries = self.allspice_client.requests_get_paginated(repos_path)

    repositories = []
    for entry in entries:
        repositories.append(Repository.parse_response(self.allspice_client, entry))
    return repositories
Get all Repositories owned by this User.
def get_orgs(self) -> List[Organization]:
    """Return every Organization this user is a member of, following pagination."""
    orgs_path = "/users/{}/orgs".format(self.username)
    entries = self.allspice_client.requests_get_paginated(orgs_path)

    organizations = []
    for entry in entries:
        organizations.append(Organization.parse_response(self.allspice_client, entry))
    return organizations
Get all Organizations this user is a member of.
def get_accessible_repos(self) -> List["Repository"]:
    """
    Get all Repositories accessible by the logged in User.

    The request is made with ``sudo=self``.

    :returns: One Repository per entry of the paginated API response.
    """
    # Paginated (with sudo, as in get_teams) so the listing is not cut off
    # after the first page of results.
    results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
def delete(self):
    """
    Delete this User through the admin endpoint and mark this object as
    deleted. Per the original documentation, this also deletes all
    Repositories the user owns.
    """
    admin_path = User.ADMIN_DELETE_USER % self.username
    self.allspice_client.requests_delete(admin_path)
    self.deleted = True
Deletes this User. Also deletes all Repositories he owns.