allspice
A very simple API client for AllSpice Hub
Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client
object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice version or authentication user can be requested directly from the allspice_client
object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request
method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during divelopment. Refer to the AllSpice API documentation for the fields names.
Fields that can not be altered via allspice-api, are read only. After altering a field, the .commit
method of the according object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects do have methods to execute some of the requests possible though the AllSpice api:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
1""" 2.. include:: ../README.md 3 :start-line: 1 4 :end-before: Installation 5""" 6 7from .allspice import ( 8 AllSpice, 9) 10from .apiobject import ( 11 Branch, 12 Comment, 13 Commit, 14 Content, 15 DesignReview, 16 Issue, 17 Milestone, 18 Organization, 19 Release, 20 Repository, 21 Team, 22 User, 23) 24from .exceptions import AlreadyExistsException, NotFoundException 25 26__version__ = "3.7.0" 27 28__all__ = [ 29 "AllSpice", 30 "AlreadyExistsException", 31 "Branch", 32 "Comment", 33 "Commit", 34 "Content", 35 "DesignReview", 36 "Issue", 37 "Milestone", 38 "NotFoundException", 39 "Organization", 40 "Release", 41 "Repository", 42 "Team", 43 "User", 44]
20class AllSpice: 21 """Object to establish a session with AllSpice Hub.""" 22 23 ADMIN_CREATE_USER = """/admin/users""" 24 GET_USERS_ADMIN = """/admin/users""" 25 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 26 ALLSPICE_HUB_VERSION = """/version""" 27 GET_USER = """/user""" 28 GET_REPOSITORY = """/repos/{owner}/{name}""" 29 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 30 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 31 32 def __init__( 33 self, 34 allspice_hub_url="https://hub.allspice.io", 35 token_text=None, 36 auth=None, 37 verify=True, 38 log_level="INFO", 39 ratelimiting=(100, 60), 40 ): 41 """Initializing an instance of the AllSpice Hub Client 42 43 Args: 44 allspice_hub_url (str): The URL for the AllSpice Hub instance. 45 Defaults to `https://hub.allspice.io`. 46 47 token_text (str, None): The access token, by default None. 48 49 auth (tuple, None): The user credentials 50 `(username, password)`, by default None. 51 52 verify (bool): If True, allow insecure server connections 53 when using SSL. 54 55 log_level (str): The log level, by default `INFO`. 56 57 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 58 If None, no rate limiting is applied. By default, 100 calls 59 per minute are allowed. 60 """ 61 62 self.logger = logging.getLogger(__name__) 63 self.logger.setLevel(log_level) 64 self.headers = { 65 "Content-type": "application/json", 66 } 67 self.url = allspice_hub_url 68 69 if ratelimiting is None: 70 self.requests = requests.Session() 71 else: 72 (max_calls, period) = ratelimiting 73 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 74 75 # Manage authentification 76 if not token_text and not auth: 77 raise ValueError("Please provide auth or token_text, but not both") 78 if token_text: 79 self.headers["Authorization"] = "token " + token_text 80 if auth: 81 self.logger.warning( 82 "Using basic auth is not recommended. Prefer using a token instead." 83 ) 84 self.requests.auth = auth 85 86 # Manage SSL certification verification 87 self.requests.verify = verify 88 if not verify: 89 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 90 91 def __get_url(self, endpoint): 92 url = self.url + "/api/v1" + endpoint 93 self.logger.debug("Url: %s" % url) 94 return url 95 96 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 97 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 98 if request.status_code not in [200, 201]: 99 message = f"Received status code: {request.status_code} ({request.url})" 100 if request.status_code in [404]: 101 raise NotFoundException(message) 102 if request.status_code in [403]: 103 raise Exception( 104 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 105 ) 106 if request.status_code in [409]: 107 raise ConflictException(message) 108 if request.status_code in [503]: 109 raise NotYetGeneratedException(message) 110 raise Exception(message) 111 return request 112 113 @staticmethod 114 def parse_result(result) -> Dict: 115 """Parses the result-JSON to a dict.""" 116 if result.text and len(result.text) > 3: 117 return json.loads(result.text) 118 return {} 119 120 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 121 combined_params = {} 122 combined_params.update(params) 123 if sudo: 124 combined_params["sudo"] = sudo.username 125 return self.parse_result(self.__get(endpoint, combined_params)) 126 127 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 128 combined_params = {} 129 combined_params.update(params) 130 if sudo: 131 combined_params["sudo"] = sudo.username 132 return self.__get(endpoint, combined_params).content 133 134 def requests_get_paginated( 135 self, 136 endpoint: str, 137 params=frozendict(), 138 sudo=None, 139 page_key: str = "page", 140 first_page: int = 1, 141 ): 142 page = first_page 143 combined_params = {} 144 combined_params.update(params) 145 aggregated_result = [] 146 while True: 147 combined_params[page_key] = page 148 result = self.requests_get(endpoint, combined_params, sudo) 149 150 if not result: 151 return aggregated_result 152 153 if isinstance(result, dict): 154 if "data" in result: 155 data = result["data"] 156 if len(data) == 0: 157 return aggregated_result 158 aggregated_result.extend(data) 159 elif "tree" in result: 160 data = result["tree"] 161 if data is None or len(data) == 0: 162 return aggregated_result 163 aggregated_result.extend(data) 164 else: 165 raise NotImplementedError( 166 "requests_get_paginated does not know how to handle responses of this type." 167 ) 168 else: 169 aggregated_result.extend(result) 170 171 page += 1 172 173 def requests_put(self, endpoint: str, data: Optional[dict] = None): 174 if not data: 175 data = {} 176 request = self.requests.put( 177 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 178 ) 179 if request.status_code not in [200, 204]: 180 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 181 self.logger.error(message) 182 raise Exception(message) 183 184 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 185 request = self.requests.delete( 186 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 187 ) 188 if request.status_code not in [200, 204]: 189 message = f"Received status code: {request.status_code} ({request.url})" 190 self.logger.error(message) 191 raise Exception(message) 192 193 def requests_post( 194 self, 195 endpoint: str, 196 data: Optional[dict] = None, 197 params: Optional[dict] = None, 198 files: Optional[dict] = None, 199 ): 200 """ 201 Make a POST call to the endpoint. 202 203 :param endpoint: The path to the endpoint 204 :param data: A dictionary for JSON data 205 :param params: A dictionary of query params 206 :param files: A dictionary of files, see requests.post. Using both files and data 207 can lead to unexpected results! 208 :return: The JSON response parsed as a dict 209 """ 210 211 # This should ideally be a TypedDict of the type of arguments taken by 212 # `requests.post`. 213 args: dict[str, Any] = { 214 "headers": self.headers.copy(), 215 } 216 if data is not None: 217 args["data"] = json.dumps(data) 218 if params is not None: 219 args["params"] = params 220 if files is not None: 221 args["headers"].pop("Content-type") 222 args["files"] = files 223 224 request = self.requests.post(self.__get_url(endpoint), **args) 225 226 if request.status_code not in [200, 201, 202]: 227 if "already exists" in request.text or "e-mail already in use" in request.text: 228 self.logger.warning(request.text) 229 raise AlreadyExistsException() 230 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 231 self.logger.error(f"With info: {data} ({self.headers})") 232 self.logger.error(f"Answer: {request.text}") 233 raise Exception( 234 f"Received status code: {request.status_code} ({request.url}), {request.text}" 235 ) 236 return self.parse_result(request) 237 238 def requests_patch(self, endpoint: str, data: dict): 239 request = self.requests.patch( 240 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 241 ) 242 if request.status_code not in [200, 201]: 243 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 244 self.logger.error(error_message) 245 raise Exception(error_message) 246 return self.parse_result(request) 247 248 def get_orgs_public_members_all(self, orgname): 249 path = "/orgs/" + orgname + "/public_members" 250 return self.requests_get(path) 251 252 def get_orgs(self): 253 path = "/admin/orgs" 254 results = self.requests_get(path) 255 return [Organization.parse_response(self, result) for result in results] 256 257 def get_user(self): 258 result = self.requests_get(AllSpice.GET_USER) 259 return User.parse_response(self, result) 260 261 def get_version(self) -> str: 262 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 263 return result["version"] 264 265 def get_users(self) -> List[User]: 266 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 267 return [User.parse_response(self, result) for result in results] 268 269 def get_user_by_email(self, email: str) -> Optional[User]: 270 users = self.get_users() 271 for user in users: 272 if user.email == email or email in user.emails: 273 return user 274 return None 275 276 def get_user_by_name(self, username: str) -> Optional[User]: 277 users = self.get_users() 278 for user in users: 279 if user.username == username: 280 return user 281 return None 282 283 def get_repository(self, owner: str, name: str) -> Repository: 284 path = self.GET_REPOSITORY.format(owner=owner, name=name) 285 result = self.requests_get(path) 286 return Repository.parse_response(self, result) 287 288 def create_user( 289 self, 290 user_name: str, 291 email: str, 292 password: str, 293 full_name: Optional[str] = None, 294 login_name: Optional[str] = None, 295 change_pw=True, 296 send_notify=True, 297 source_id=0, 298 ): 299 """Create User. 300 Throws: 301 AlreadyExistsException, if the User exists already 302 Exception, if something else went wrong. 303 """ 304 if not login_name: 305 login_name = user_name 306 if not full_name: 307 full_name = user_name 308 request_data = { 309 "source_id": source_id, 310 "login_name": login_name, 311 "full_name": full_name, 312 "username": user_name, 313 "email": email, 314 "password": password, 315 "send_notify": send_notify, 316 "must_change_password": change_pw, 317 } 318 319 self.logger.debug("Gitea post payload: %s", request_data) 320 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 321 if "id" in result: 322 self.logger.info( 323 "Successfully created User %s <%s> (id %s)", 324 result["login"], 325 result["email"], 326 result["id"], 327 ) 328 self.logger.debug("Gitea response: %s", result) 329 else: 330 self.logger.error(result["message"]) 331 raise Exception("User not created... (gitea: %s)" % result["message"]) 332 user = User.parse_response(self, result) 333 return user 334 335 def create_repo( 336 self, 337 repoOwner: Union[User, Organization], 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a Repository as the administrator 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 354 Note: 355 Non-admin users can not use this method. Please use instead 356 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 357 """ 358 # although this only says user in the api, this also works for 359 # organizations 360 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 361 result = self.requests_post( 362 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 363 data={ 364 "name": repoName, 365 "description": description, 366 "private": private, 367 "auto_init": autoInit, 368 "gitignores": gitignores, 369 "license": license, 370 "issue_labels": issue_labels, 371 "readme": readme, 372 "default_branch": default_branch, 373 }, 374 ) 375 if "id" in result: 376 self.logger.info("Successfully created Repository %s " % result["name"]) 377 else: 378 self.logger.error(result["message"]) 379 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 380 return Repository.parse_response(self, result) 381 382 def create_org( 383 self, 384 owner: User, 385 orgName: str, 386 description: str, 387 location="", 388 website="", 389 full_name="", 390 ): 391 assert isinstance(owner, User) 392 result = self.requests_post( 393 AllSpice.CREATE_ORG % owner.username, 394 data={ 395 "username": orgName, 396 "description": description, 397 "location": location, 398 "website": website, 399 "full_name": full_name, 400 }, 401 ) 402 if "id" in result: 403 self.logger.info("Successfully created Organization %s" % result["username"]) 404 else: 405 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 406 self.logger.error(result["message"]) 407 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 408 return Organization.parse_response(self, result) 409 410 def create_team( 411 self, 412 org: Organization, 413 name: str, 414 description: str = "", 415 permission: str = "read", 416 can_create_org_repo: bool = False, 417 includes_all_repositories: bool = False, 418 units=( 419 "repo.code", 420 "repo.issues", 421 "repo.ext_issues", 422 "repo.wiki", 423 "repo.pulls", 424 "repo.releases", 425 "repo.ext_wiki", 426 ), 427 units_map={}, 428 ): 429 """Creates a Team. 430 431 Args: 432 org (Organization): Organization the Team will be part of. 433 name (str): The Name of the Team to be created. 434 description (str): Optional, None, short description of the new Team. 435 permission (str): Optional, 'read', What permissions the members 436 units_map (dict): Optional, {}, a mapping of units to their 437 permissions. If None or empty, the `permission` permission will 438 be applied to all units. Note: When both `units` and `units_map` 439 are given, `units_map` will be preferred. 440 """ 441 442 result = self.requests_post( 443 AllSpice.CREATE_TEAM % org.username, 444 data={ 445 "name": name, 446 "description": description, 447 "permission": permission, 448 "can_create_org_repo": can_create_org_repo, 449 "includes_all_repositories": includes_all_repositories, 450 "units": units, 451 "units_map": units_map, 452 }, 453 ) 454 455 if "id" in result: 456 self.logger.info("Successfully created Team %s" % result["name"]) 457 else: 458 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 459 self.logger.error(result["message"]) 460 raise Exception("Team not created... (gitea: %s)" % result["message"]) 461 api_object = Team.parse_response(self, result) 462 setattr( 463 api_object, "_organization", org 464 ) # fixes strange behaviour of gitea not returning a valid organization here. 465 return api_object
Object to establish a session with AllSpice Hub.
32 def __init__( 33 self, 34 allspice_hub_url="https://hub.allspice.io", 35 token_text=None, 36 auth=None, 37 verify=True, 38 log_level="INFO", 39 ratelimiting=(100, 60), 40 ): 41 """Initializing an instance of the AllSpice Hub Client 42 43 Args: 44 allspice_hub_url (str): The URL for the AllSpice Hub instance. 45 Defaults to `https://hub.allspice.io`. 46 47 token_text (str, None): The access token, by default None. 48 49 auth (tuple, None): The user credentials 50 `(username, password)`, by default None. 51 52 verify (bool): If True, allow insecure server connections 53 when using SSL. 54 55 log_level (str): The log level, by default `INFO`. 56 57 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 58 If None, no rate limiting is applied. By default, 100 calls 59 per minute are allowed. 60 """ 61 62 self.logger = logging.getLogger(__name__) 63 self.logger.setLevel(log_level) 64 self.headers = { 65 "Content-type": "application/json", 66 } 67 self.url = allspice_hub_url 68 69 if ratelimiting is None: 70 self.requests = requests.Session() 71 else: 72 (max_calls, period) = ratelimiting 73 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 74 75 # Manage authentification 76 if not token_text and not auth: 77 raise ValueError("Please provide auth or token_text, but not both") 78 if token_text: 79 self.headers["Authorization"] = "token " + token_text 80 if auth: 81 self.logger.warning( 82 "Using basic auth is not recommended. Prefer using a token instead." 83 ) 84 self.requests.auth = auth 85 86 # Manage SSL certification verification 87 self.requests.verify = verify 88 if not verify: 89 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, allow insecure server connections
when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
113 @staticmethod 114 def parse_result(result) -> Dict: 115 """Parses the result-JSON to a dict.""" 116 if result.text and len(result.text) > 3: 117 return json.loads(result.text) 118 return {}
Parses the result-JSON to a dict.
134 def requests_get_paginated( 135 self, 136 endpoint: str, 137 params=frozendict(), 138 sudo=None, 139 page_key: str = "page", 140 first_page: int = 1, 141 ): 142 page = first_page 143 combined_params = {} 144 combined_params.update(params) 145 aggregated_result = [] 146 while True: 147 combined_params[page_key] = page 148 result = self.requests_get(endpoint, combined_params, sudo) 149 150 if not result: 151 return aggregated_result 152 153 if isinstance(result, dict): 154 if "data" in result: 155 data = result["data"] 156 if len(data) == 0: 157 return aggregated_result 158 aggregated_result.extend(data) 159 elif "tree" in result: 160 data = result["tree"] 161 if data is None or len(data) == 0: 162 return aggregated_result 163 aggregated_result.extend(data) 164 else: 165 raise NotImplementedError( 166 "requests_get_paginated does not know how to handle responses of this type." 167 ) 168 else: 169 aggregated_result.extend(result) 170 171 page += 1
173 def requests_put(self, endpoint: str, data: Optional[dict] = None): 174 if not data: 175 data = {} 176 request = self.requests.put( 177 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 178 ) 179 if request.status_code not in [200, 204]: 180 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 181 self.logger.error(message) 182 raise Exception(message)
184 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 185 request = self.requests.delete( 186 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 187 ) 188 if request.status_code not in [200, 204]: 189 message = f"Received status code: {request.status_code} ({request.url})" 190 self.logger.error(message) 191 raise Exception(message)
193 def requests_post( 194 self, 195 endpoint: str, 196 data: Optional[dict] = None, 197 params: Optional[dict] = None, 198 files: Optional[dict] = None, 199 ): 200 """ 201 Make a POST call to the endpoint. 202 203 :param endpoint: The path to the endpoint 204 :param data: A dictionary for JSON data 205 :param params: A dictionary of query params 206 :param files: A dictionary of files, see requests.post. Using both files and data 207 can lead to unexpected results! 208 :return: The JSON response parsed as a dict 209 """ 210 211 # This should ideally be a TypedDict of the type of arguments taken by 212 # `requests.post`. 213 args: dict[str, Any] = { 214 "headers": self.headers.copy(), 215 } 216 if data is not None: 217 args["data"] = json.dumps(data) 218 if params is not None: 219 args["params"] = params 220 if files is not None: 221 args["headers"].pop("Content-type") 222 args["files"] = files 223 224 request = self.requests.post(self.__get_url(endpoint), **args) 225 226 if request.status_code not in [200, 201, 202]: 227 if "already exists" in request.text or "e-mail already in use" in request.text: 228 self.logger.warning(request.text) 229 raise AlreadyExistsException() 230 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 231 self.logger.error(f"With info: {data} ({self.headers})") 232 self.logger.error(f"Answer: {request.text}") 233 raise Exception( 234 f"Received status code: {request.status_code} ({request.url}), {request.text}" 235 ) 236 return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
238 def requests_patch(self, endpoint: str, data: dict): 239 request = self.requests.patch( 240 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 241 ) 242 if request.status_code not in [200, 201]: 243 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 244 self.logger.error(error_message) 245 raise Exception(error_message) 246 return self.parse_result(request)
288 def create_user( 289 self, 290 user_name: str, 291 email: str, 292 password: str, 293 full_name: Optional[str] = None, 294 login_name: Optional[str] = None, 295 change_pw=True, 296 send_notify=True, 297 source_id=0, 298 ): 299 """Create User. 300 Throws: 301 AlreadyExistsException, if the User exists already 302 Exception, if something else went wrong. 303 """ 304 if not login_name: 305 login_name = user_name 306 if not full_name: 307 full_name = user_name 308 request_data = { 309 "source_id": source_id, 310 "login_name": login_name, 311 "full_name": full_name, 312 "username": user_name, 313 "email": email, 314 "password": password, 315 "send_notify": send_notify, 316 "must_change_password": change_pw, 317 } 318 319 self.logger.debug("Gitea post payload: %s", request_data) 320 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 321 if "id" in result: 322 self.logger.info( 323 "Successfully created User %s <%s> (id %s)", 324 result["login"], 325 result["email"], 326 result["id"], 327 ) 328 self.logger.debug("Gitea response: %s", result) 329 else: 330 self.logger.error(result["message"]) 331 raise Exception("User not created... (gitea: %s)" % result["message"]) 332 user = User.parse_response(self, result) 333 return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
335 def create_repo( 336 self, 337 repoOwner: Union[User, Organization], 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a Repository as the administrator 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 354 Note: 355 Non-admin users can not use this method. Please use instead 356 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 357 """ 358 # although this only says user in the api, this also works for 359 # organizations 360 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 361 result = self.requests_post( 362 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 363 data={ 364 "name": repoName, 365 "description": description, 366 "private": private, 367 "auto_init": autoInit, 368 "gitignores": gitignores, 369 "license": license, 370 "issue_labels": issue_labels, 371 "readme": readme, 372 "default_branch": default_branch, 373 }, 374 ) 375 if "id" in result: 376 self.logger.info("Successfully created Repository %s " % result["name"]) 377 else: 378 self.logger.error(result["message"]) 379 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 380 return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
382 def create_org( 383 self, 384 owner: User, 385 orgName: str, 386 description: str, 387 location="", 388 website="", 389 full_name="", 390 ): 391 assert isinstance(owner, User) 392 result = self.requests_post( 393 AllSpice.CREATE_ORG % owner.username, 394 data={ 395 "username": orgName, 396 "description": description, 397 "location": location, 398 "website": website, 399 "full_name": full_name, 400 }, 401 ) 402 if "id" in result: 403 self.logger.info("Successfully created Organization %s" % result["username"]) 404 else: 405 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 406 self.logger.error(result["message"]) 407 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 408 return Organization.parse_response(self, result)
410 def create_team( 411 self, 412 org: Organization, 413 name: str, 414 description: str = "", 415 permission: str = "read", 416 can_create_org_repo: bool = False, 417 includes_all_repositories: bool = False, 418 units=( 419 "repo.code", 420 "repo.issues", 421 "repo.ext_issues", 422 "repo.wiki", 423 "repo.pulls", 424 "repo.releases", 425 "repo.ext_wiki", 426 ), 427 units_map={}, 428 ): 429 """Creates a Team. 430 431 Args: 432 org (Organization): Organization the Team will be part of. 433 name (str): The Name of the Team to be created. 434 description (str): Optional, None, short description of the new Team. 435 permission (str): Optional, 'read', What permissions the members 436 units_map (dict): Optional, {}, a mapping of units to their 437 permissions. If None or empty, the `permission` permission will 438 be applied to all units. Note: When both `units` and `units_map` 439 are given, `units_map` will be preferred. 440 """ 441 442 result = self.requests_post( 443 AllSpice.CREATE_TEAM % org.username, 444 data={ 445 "name": name, 446 "description": description, 447 "permission": permission, 448 "can_create_org_repo": can_create_org_repo, 449 "includes_all_repositories": includes_all_repositories, 450 "units": units, 451 "units_map": units_map, 452 }, 453 ) 454 455 if "id" in result: 456 self.logger.info("Successfully created Team %s" % result["name"]) 457 else: 458 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 459 self.logger.error(result["message"]) 460 raise Exception("Team not created... (gitea: %s)" % result["message"]) 461 api_object = Team.parse_response(self, result) 462 setattr( 463 api_object, "_organization", org 464 ) # fixes strange behaviour of gitea not returning a valid organization here. 465 return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.
Common base class for all non-exit exceptions.
419class Branch(ReadonlyApiObject): 420 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 421 effective_branch_protection_name: str 422 enable_status_check: bool 423 name: str 424 protected: bool 425 required_approvals: int 426 status_check_contexts: List[Any] 427 user_can_merge: bool 428 user_can_push: bool 429 430 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 431 432 def __init__(self, allspice_client): 433 super().__init__(allspice_client) 434 435 def __eq__(self, other): 436 if not isinstance(other, Branch): 437 return False 438 return self.commit == other.commit and self.name == other.name 439 440 def __hash__(self): 441 return hash(self.commit["id"]) ^ hash(self.name) 442 443 _fields_to_parsers: ClassVar[dict] = { 444 # This is not a commit object 445 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 446 } 447 448 @classmethod 449 def request(cls, allspice_client, owner: str, repo: str, branch: str): 450 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
1565class Comment(ApiObject): 1566 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1567 body: str 1568 created_at: datetime 1569 html_url: str 1570 id: int 1571 issue_url: str 1572 original_author: str 1573 original_author_id: int 1574 pull_request_url: str 1575 updated_at: datetime 1576 user: User 1577 1578 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1579 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1580 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1581 1582 def __init__(self, allspice_client): 1583 super().__init__(allspice_client) 1584 1585 def __eq__(self, other): 1586 if not isinstance(other, Comment): 1587 return False 1588 return self.repository == other.repository and self.id == other.id 1589 1590 def __hash__(self): 1591 return hash(self.repository) ^ hash(self.id) 1592 1593 @classmethod 1594 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1595 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1596 1597 _fields_to_parsers: ClassVar[dict] = { 1598 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1599 "created_at": lambda _, t: Util.convert_time(t), 1600 "updated_at": lambda _, t: Util.convert_time(t), 1601 } 1602 1603 _patchable_fields: ClassVar[set[str]] = {"body"} 1604 1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url 1613 1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name) 1620 1621 def __fields_for_path(self): 1622 return { 1623 "owner": self.repository.owner.username, 1624 "repo": self.repository.name, 1625 "id": self.id, 1626 } 1627 1628 def commit(self): 1629 self._commit(self.__fields_for_path()) 1630 1631 def delete(self): 1632 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1633 self.deleted = True 1634 1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1647 1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result) 1671 1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result) 1693 1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True
1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1705class Commit(ReadonlyApiObject): 1706 author: User 1707 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1708 committer: Dict[str, Union[int, str, bool]] 1709 created: str 1710 files: List[Dict[str, str]] 1711 html_url: str 1712 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1713 parents: List[Union[Dict[str, str], Any]] 1714 sha: str 1715 stats: Dict[str, int] 1716 url: str 1717 1718 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1719 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1720 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1721 1722 # Regex to extract owner and repo names from the url property 1723 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1724 1725 def __init__(self, allspice_client): 1726 super().__init__(allspice_client) 1727 1728 _fields_to_parsers: ClassVar[dict] = { 1729 # NOTE: api may return None for commiters that are no allspice users 1730 "author": lambda allspice_client, u: ( 1731 User.parse_response(allspice_client, u) if u else None 1732 ) 1733 } 1734 1735 def __eq__(self, other): 1736 if not isinstance(other, Commit): 1737 return False 1738 return self.sha == other.sha 1739 1740 def __hash__(self): 1741 return hash(self.sha) 1742 1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object 1751 1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1766 1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1778 1779 @cached_property 1780 def _fields_for_path(self) -> dict[str, str]: 1781 matches = self.URL_REGEXP.search(self.url) 1782 if not matches: 1783 raise ValueError(f"Invalid commit URL: {self.url}") 1784 1785 return { 1786 "owner": matches.group(1), 1787 "repo": matches.group(2), 1788 "sha": self.sha, 1789 }
1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object
1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
2527class Content(ReadonlyApiObject): 2528 content: Any 2529 download_url: str 2530 encoding: Any 2531 git_url: str 2532 html_url: str 2533 last_commit_sha: str 2534 name: str 2535 path: str 2536 sha: str 2537 size: int 2538 submodule_git_url: Any 2539 target: Any 2540 type: str 2541 url: str 2542 2543 FILE = "file" 2544 2545 def __init__(self, allspice_client): 2546 super().__init__(allspice_client) 2547 2548 def __eq__(self, other): 2549 if not isinstance(other, Content): 2550 return False 2551 2552 return self.sha == other.sha and self.name == other.name 2553 2554 def __hash__(self): 2555 return hash(self.sha) ^ hash(self.name)
Inherited Members
2026class DesignReview(ApiObject): 2027 """ 2028 A Design Review. See 2029 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2030 2031 Note: The base and head fields are not `Branch` objects - they are plain strings 2032 referring to the branch names. This is because DRs can exist for branches that have 2033 been deleted, which don't have an associated `Branch` object from the API. You can use 2034 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2035 it exists. 2036 """ 2037 2038 additions: int 2039 allow_maintainer_edit: bool 2040 allow_maintainer_edits: Any 2041 assignee: User 2042 assignees: List["User"] 2043 base: str 2044 body: str 2045 changed_files: int 2046 closed_at: Any 2047 comments: int 2048 created_at: str 2049 deletions: int 2050 diff_url: str 2051 draft: bool 2052 due_date: Optional[str] 2053 head: str 2054 html_url: str 2055 id: int 2056 is_locked: bool 2057 labels: List[Any] 2058 merge_base: str 2059 merge_commit_sha: Any 2060 mergeable: bool 2061 merged: bool 2062 merged_at: Any 2063 merged_by: Any 2064 milestone: Any 2065 number: int 2066 patch_url: str 2067 pin_order: int 2068 repository: Optional["Repository"] 2069 requested_reviewers: Any 2070 review_comments: int 2071 state: str 2072 title: str 2073 updated_at: str 2074 url: str 2075 user: User 2076 2077 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2078 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2079 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2080 2081 OPEN = "open" 2082 CLOSED = "closed" 2083 2084 class MergeType(Enum): 2085 MERGE = "merge" 2086 REBASE = "rebase" 2087 REBASE_MERGE = "rebase-merge" 2088 SQUASH = "squash" 2089 MANUALLY_MERGED = "manually-merged" 2090 2091 def __init__(self, allspice_client): 2092 super().__init__(allspice_client) 2093 2094 def __eq__(self, other): 2095 if not isinstance(other, DesignReview): 2096 return False 2097 return self.repository == other.repository and self.id == other.id 2098 2099 def __hash__(self): 2100 return hash(self.repository) ^ hash(self.id) 2101 2102 @classmethod 2103 def parse_response(cls, allspice_client, result) -> "DesignReview": 2104 api_object = super().parse_response(allspice_client, result) 2105 cls._add_read_property( 2106 "repository", 2107 Repository.parse_response(allspice_client, result["base"]["repo"]), 2108 api_object, 2109 ) 2110 2111 return api_object 2112 2113 @classmethod 2114 def request(cls, allspice_client, owner: str, repo: str, number: str): 2115 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2116 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2117 2118 _fields_to_parsers: ClassVar[dict] = { 2119 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2120 "assignees": lambda allspice_client, us: [ 2121 User.parse_response(allspice_client, u) for u in us 2122 ], 2123 "base": lambda _, b: b["ref"], 2124 "head": lambda _, h: h["ref"], 2125 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2126 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2127 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2128 } 2129 2130 _patchable_fields: ClassVar[set[str]] = { 2131 "allow_maintainer_edits", 2132 "assignee", 2133 "assignees", 2134 "base", 2135 "body", 2136 "due_date", 2137 "milestone", 2138 "state", 2139 "title", 2140 } 2141 2142 _parsers_to_fields: ClassVar[dict] = { 2143 "assignee": lambda u: u.username, 2144 "assignees": lambda us: [u.username for u in us], 2145 "base": lambda b: b.name if isinstance(b, Branch) else b, 2146 "milestone": lambda m: m.id, 2147 } 2148 2149 def commit(self): 2150 data = self.get_dirty_fields() 2151 if "due_date" in data and data["due_date"] is None: 2152 data["unset_due_date"] = True 2153 2154 args = { 2155 "owner": self.repository.owner.username, 2156 "repo": self.repository.name, 2157 "index": self.number, 2158 } 2159 self._commit(args, data) 2160 2161 def merge(self, merge_type: MergeType): 2162 """ 2163 Merge the pull request. See 2164 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2165 2166 :param merge_type: The type of merge to perform. See the MergeType enum. 2167 """ 2168 2169 self.allspice_client.requests_put( 2170 self.MERGE_DESIGN_REVIEW.format( 2171 owner=self.repository.owner.username, 2172 repo=self.repository.name, 2173 index=self.number, 2174 ), 2175 data={"Do": merge_type.value}, 2176 ) 2177 2178 def get_comments(self) -> List[Comment]: 2179 """ 2180 Get the comments on this pull request, but not specifically on a review. 2181 2182 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2183 2184 :return: A list of comments on this pull request. 2185 """ 2186 2187 results = self.allspice_client.requests_get( 2188 self.GET_COMMENTS.format( 2189 owner=self.repository.owner.username, 2190 repo=self.repository.name, 2191 index=self.number, 2192 ) 2193 ) 2194 return [Comment.parse_response(self.allspice_client, result) for result in results] 2195 2196 def create_comment(self, body: str): 2197 """ 2198 Create a comment on this pull request. This uses the same endpoint as the 2199 comments on issues, and will not be associated with any reviews. 2200 2201 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2202 2203 :param body: The body of the comment. 2204 :return: The comment that was created. 2205 """ 2206 2207 result = self.allspice_client.requests_post( 2208 self.GET_COMMENTS.format( 2209 owner=self.repository.owner.username, 2210 repo=self.repository.name, 2211 index=self.number, 2212 ), 2213 data={"body": body}, 2214 ) 2215 return Comment.parse_response(self.allspice_client, result)
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2102 @classmethod 2103 def parse_response(cls, allspice_client, result) -> "DesignReview": 2104 api_object = super().parse_response(allspice_client, result) 2105 cls._add_read_property( 2106 "repository", 2107 Repository.parse_response(allspice_client, result["base"]["repo"]), 2108 api_object, 2109 ) 2110 2111 return api_object
2113 @classmethod 2114 def request(cls, allspice_client, owner: str, repo: str, number: str): 2115 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2116 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2149 def commit(self): 2150 data = self.get_dirty_fields() 2151 if "due_date" in data and data["due_date"] is None: 2152 data["unset_due_date"] = True 2153 2154 args = { 2155 "owner": self.repository.owner.username, 2156 "repo": self.repository.name, 2157 "index": self.number, 2158 } 2159 self._commit(args, data)
2161 def merge(self, merge_type: MergeType): 2162 """ 2163 Merge the pull request. See 2164 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2165 2166 :param merge_type: The type of merge to perform. See the MergeType enum. 2167 """ 2168 2169 self.allspice_client.requests_put( 2170 self.MERGE_DESIGN_REVIEW.format( 2171 owner=self.repository.owner.username, 2172 repo=self.repository.name, 2173 index=self.number, 2174 ), 2175 data={"Do": merge_type.value}, 2176 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2178 def get_comments(self) -> List[Comment]: 2179 """ 2180 Get the comments on this pull request, but not specifically on a review. 2181 2182 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2183 2184 :return: A list of comments on this pull request. 2185 """ 2186 2187 results = self.allspice_client.requests_get( 2188 self.GET_COMMENTS.format( 2189 owner=self.repository.owner.username, 2190 repo=self.repository.name, 2191 index=self.number, 2192 ) 2193 ) 2194 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2196 def create_comment(self, body: str): 2197 """ 2198 Create a comment on this pull request. This uses the same endpoint as the 2199 comments on issues, and will not be associated with any reviews. 2200 2201 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2202 2203 :param body: The body of the comment. 2204 :return: The comment that was created. 2205 """ 2206 2207 result = self.allspice_client.requests_post( 2208 self.GET_COMMENTS.format( 2209 owner=self.repository.owner.username, 2210 repo=self.repository.name, 2211 index=self.number, 2212 ), 2213 data={"body": body}, 2214 ) 2215 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
1880class Issue(ApiObject): 1881 assets: List[Any] 1882 assignee: Any 1883 assignees: Any 1884 body: str 1885 closed_at: Any 1886 comments: int 1887 created_at: str 1888 due_date: Any 1889 html_url: str 1890 id: int 1891 is_locked: bool 1892 labels: List[Any] 1893 milestone: Optional["Milestone"] 1894 number: int 1895 original_author: str 1896 original_author_id: int 1897 pin_order: int 1898 pull_request: Any 1899 ref: str 1900 repository: Dict[str, Union[int, str]] 1901 state: str 1902 title: str 1903 updated_at: str 1904 url: str 1905 user: User 1906 1907 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1908 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1909 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1910 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1911 1912 OPENED = "open" 1913 CLOSED = "closed" 1914 1915 def __init__(self, allspice_client): 1916 super().__init__(allspice_client) 1917 1918 def __eq__(self, other): 1919 if not isinstance(other, Issue): 1920 return False 1921 return self.repository == other.repository and self.id == other.id 1922 1923 def __hash__(self): 1924 return hash(self.repository) ^ hash(self.id) 1925 1926 _fields_to_parsers: ClassVar[dict] = { 1927 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1928 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1929 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1930 "assignees": lambda allspice_client, us: [ 1931 User.parse_response(allspice_client, u) for u in us 1932 ], 1933 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1934 } 1935 1936 _parsers_to_fields: ClassVar[dict] = { 1937 "milestone": lambda m: m.id, 1938 } 1939 1940 _patchable_fields: ClassVar[set[str]] = { 1941 "assignee", 1942 "assignees", 1943 "body", 1944 "due_date", 1945 "milestone", 1946 "state", 1947 "title", 1948 } 1949 1950 def commit(self): 1951 args = { 1952 "owner": self.repository.owner.username, 1953 "repo": self.repository.name, 1954 "index": self.number, 1955 } 1956 self._commit(args) 1957 1958 @classmethod 1959 def request(cls, allspice_client, owner: str, repo: str, number: str): 1960 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1961 # The repository in the response is a RepositoryMeta object, so request 1962 # the full repository object and add it to the issue object. 1963 repository = Repository.request(allspice_client, owner, repo) 1964 setattr(api_object, "_repository", repository) 1965 # For legacy reasons 1966 cls._add_read_property("repo", repository, api_object) 1967 return api_object 1968 1969 @classmethod 1970 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1971 args = {"owner": repo.owner.username, "repo": repo.name} 1972 data = {"title": title, "body": body} 1973 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1974 issue = Issue.parse_response(allspice_client, result) 1975 setattr(issue, "_repository", repo) 1976 cls._add_read_property("repo", repo, issue) 1977 return issue 1978 1979 @property 1980 def owner(self) -> Organization | User: 1981 return self.repository.owner 1982 1983 def get_time_sum(self, user: User) -> int: 1984 results = self.allspice_client.requests_get( 1985 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1986 ) 1987 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 1988 1989 def get_times(self) -> Optional[Dict]: 1990 return self.allspice_client.requests_get( 1991 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1992 ) 1993 1994 def delete_time(self, time_id: str): 1995 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 1996 self.allspice_client.requests_delete(path) 1997 1998 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1999 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2000 self.allspice_client.requests_post( 2001 path, data={"created": created, "time": int(time), "user_name": user_name} 2002 ) 2003 2004 def get_comments(self) -> List[Comment]: 2005 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2006 2007 results = self.allspice_client.requests_get( 2008 self.GET_COMMENTS.format( 2009 owner=self.owner.username, repo=self.repository.name, index=self.number 2010 ) 2011 ) 2012 2013 return [Comment.parse_response(self.allspice_client, result) for result in results] 2014 2015 def create_comment(self, body: str) -> Comment: 2016 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2017 2018 path = self.GET_COMMENTS.format( 2019 owner=self.owner.username, repo=self.repository.name, index=self.number 2020 ) 2021 2022 response = self.allspice_client.requests_post(path, data={"body": body}) 2023 return Comment.parse_response(self.allspice_client, response)
1958 @classmethod 1959 def request(cls, allspice_client, owner: str, repo: str, number: str): 1960 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1961 # The repository in the response is a RepositoryMeta object, so request 1962 # the full repository object and add it to the issue object. 1963 repository = Repository.request(allspice_client, owner, repo) 1964 setattr(api_object, "_repository", repository) 1965 # For legacy reasons 1966 cls._add_read_property("repo", repository, api_object) 1967 return api_object
1969 @classmethod 1970 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1971 args = {"owner": repo.owner.username, "repo": repo.name} 1972 data = {"title": title, "body": body} 1973 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1974 issue = Issue.parse_response(allspice_client, result) 1975 setattr(issue, "_repository", repo) 1976 cls._add_read_property("repo", repo, issue) 1977 return issue
1998 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1999 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2000 self.allspice_client.requests_post( 2001 path, data={"created": created, "time": int(time), "user_name": user_name} 2002 )
2004 def get_comments(self) -> List[Comment]: 2005 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2006 2007 results = self.allspice_client.requests_get( 2008 self.GET_COMMENTS.format( 2009 owner=self.owner.username, repo=self.repository.name, index=self.number 2010 ) 2011 ) 2012 2013 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2015 def create_comment(self, body: str) -> Comment: 2016 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2017 2018 path = self.GET_COMMENTS.format( 2019 owner=self.owner.username, repo=self.repository.name, index=self.number 2020 ) 2021 2022 response = self.allspice_client.requests_post(path, data={"body": body}) 2023 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
1449class Milestone(ApiObject): 1450 allow_merge_commits: Any 1451 allow_rebase: Any 1452 allow_rebase_explicit: Any 1453 allow_squash_merge: Any 1454 archived: Any 1455 closed_at: Any 1456 closed_issues: int 1457 created_at: str 1458 default_branch: Any 1459 description: str 1460 due_on: Any 1461 has_issues: Any 1462 has_pull_requests: Any 1463 has_wiki: Any 1464 id: int 1465 ignore_whitespace_conflicts: Any 1466 name: Any 1467 open_issues: int 1468 private: Any 1469 state: str 1470 title: str 1471 updated_at: str 1472 website: Any 1473 1474 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1475 1476 def __init__(self, allspice_client): 1477 super().__init__(allspice_client) 1478 1479 def __eq__(self, other): 1480 if not isinstance(other, Milestone): 1481 return False 1482 return self.allspice_client == other.allspice_client and self.id == other.id 1483 1484 def __hash__(self): 1485 return hash(self.allspice_client) ^ hash(self.id) 1486 1487 _fields_to_parsers: ClassVar[dict] = { 1488 "closed_at": lambda _, t: Util.convert_time(t), 1489 "due_on": lambda _, t: Util.convert_time(t), 1490 } 1491 1492 _patchable_fields: ClassVar[set[str]] = { 1493 "allow_merge_commits", 1494 "allow_rebase", 1495 "allow_rebase_explicit", 1496 "allow_squash_merge", 1497 "archived", 1498 "default_branch", 1499 "description", 1500 "has_issues", 1501 "has_pull_requests", 1502 "has_wiki", 1503 "ignore_whitespace_conflicts", 1504 "name", 1505 "private", 1506 "website", 1507 } 1508 1509 @classmethod 1510 def request(cls, allspice_client, owner: str, repo: str, number: str): 1511 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Common base class for all non-exit exceptions.
33class Organization(ApiObject): 34 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 35 36 active: Optional[bool] 37 avatar_url: str 38 created: Optional[str] 39 description: str 40 email: str 41 followers_count: Optional[int] 42 following_count: Optional[int] 43 full_name: str 44 html_url: Optional[str] 45 id: int 46 is_admin: Optional[bool] 47 language: Optional[str] 48 last_login: Optional[str] 49 location: str 50 login: Optional[str] 51 login_name: Optional[str] 52 name: Optional[str] 53 prohibit_login: Optional[bool] 54 repo_admin_change_team_access: Optional[bool] 55 restricted: Optional[bool] 56 source_id: Optional[int] 57 starred_repos_count: Optional[int] 58 username: str 59 visibility: str 60 website: str 61 62 API_OBJECT = """/orgs/{name}""" # <org> 63 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 64 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 65 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 66 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 67 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 68 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 69 70 def __init__(self, allspice_client): 71 super().__init__(allspice_client) 72 73 def __eq__(self, other): 74 if not isinstance(other, Organization): 75 return False 76 return self.allspice_client == other.allspice_client and self.name == other.name 77 78 def __hash__(self): 79 return hash(self.allspice_client) ^ hash(self.name) 80 81 @classmethod 82 def request(cls, allspice_client, name: str) -> Self: 83 return cls._request(allspice_client, {"name": name}) 84 85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object 93 94 _patchable_fields: ClassVar[set[str]] = { 95 "description", 96 "full_name", 97 "location", 98 "visibility", 99 "website", 100 } 101 102 def commit(self): 103 args = {"name": self.name} 104 self._commit(args) 105 106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result) 144 145 def get_repositories(self) -> List["Repository"]: 146 results = self.allspice_client.requests_get_paginated( 147 Organization.ORG_REPOS_REQUEST % self.username 148 ) 149 return [Repository.parse_response(self.allspice_client, result) for result in results] 150 151 def get_repository(self, name) -> "Repository": 152 repos = self.get_repositories() 153 for repo in repos: 154 if repo.name == name: 155 return repo 156 raise NotFoundException("Repository %s not existent in organization." % name) 157 158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams 165 166 def get_team(self, name) -> "Team": 167 teams = self.get_teams() 168 for team in teams: 169 if team.name == name: 170 return team 171 raise NotFoundException("Team not existent in organization.") 172 173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 ) 204 205 def get_members(self) -> List["User"]: 206 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 207 return [User.parse_response(self.allspice_client, result) for result in results] 208 209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False 220 221 def remove_member(self, user: "User"): 222 path = f"/orgs/{self.username}/members/{user.username}" 223 self.allspice_client.requests_delete(path) 224 225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True 231 232 def get_heatmap(self) -> List[Tuple[datetime, int]]: 233 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 234 results = [ 235 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 236 for result in results 237 ] 238 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object
106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams
173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 )
Alias for AllSpice#create_team
209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False
225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
2301class Release(ApiObject): 2302 """ 2303 A release on a repo. 2304 """ 2305 2306 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2307 author: User 2308 body: str 2309 created_at: str 2310 draft: bool 2311 html_url: str 2312 id: int 2313 name: str 2314 prerelease: bool 2315 published_at: str 2316 repo: Optional["Repository"] 2317 repository: Optional["Repository"] 2318 tag_name: str 2319 tarball_url: str 2320 target_commitish: str 2321 upload_url: str 2322 url: str 2323 zipball_url: str 2324 2325 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2326 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2327 # Note that we don't strictly need the get_assets route, as the release 2328 # object already contains the assets. 2329 2330 def __init__(self, allspice_client): 2331 super().__init__(allspice_client) 2332 2333 def __eq__(self, other): 2334 if not isinstance(other, Release): 2335 return False 2336 return self.repo == other.repo and self.id == other.id 2337 2338 def __hash__(self): 2339 return hash(self.repo) ^ hash(self.id) 2340 2341 _fields_to_parsers: ClassVar[dict] = { 2342 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2343 } 2344 _patchable_fields: ClassVar[set[str]] = { 2345 "body", 2346 "draft", 2347 "name", 2348 "prerelease", 2349 "tag_name", 2350 "target_commitish", 2351 } 2352 2353 @classmethod 2354 def parse_response(cls, allspice_client, result, repo) -> Release: 2355 release = super().parse_response(allspice_client, result) 2356 Release._add_read_property("repository", repo, release) 2357 # For legacy reasons 2358 Release._add_read_property("repo", repo, release) 2359 setattr( 2360 release, 2361 "_assets", 2362 [ 2363 ReleaseAsset.parse_response(allspice_client, asset, release) 2364 for asset in result["assets"] 2365 ], 2366 ) 2367 return release 2368 2369 @classmethod 2370 def request( 2371 cls, 2372 allspice_client, 2373 owner: str, 2374 repo: str, 2375 id: Optional[int] = None, 2376 ) -> Release: 2377 args = {"owner": owner, "repo": repo, "id": id} 2378 release_response = cls._get_gitea_api_object(allspice_client, args) 2379 repository = Repository.request(allspice_client, owner, repo) 2380 release = cls.parse_response(allspice_client, release_response, repository) 2381 return release 2382 2383 def commit(self): 2384 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2385 self._commit(args) 2386 2387 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2388 """ 2389 Create an asset for this release. 2390 2391 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2392 2393 :param file: The file to upload. This should be a file-like object. 2394 :param name: The name of the file. 2395 :return: The created asset. 2396 """ 2397 2398 args: dict[str, Any] = {"files": {"attachment": file}} 2399 if name is not None: 2400 args["params"] = {"name": name} 2401 2402 result = self.allspice_client.requests_post( 2403 self.RELEASE_CREATE_ASSET.format( 2404 owner=self.repo.owner.username, 2405 repo=self.repo.name, 2406 id=self.id, 2407 ), 2408 **args, 2409 ) 2410 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2411 2412 def delete(self): 2413 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2414 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2415 self.deleted = True
A release on a repo.
2353 @classmethod 2354 def parse_response(cls, allspice_client, result, repo) -> Release: 2355 release = super().parse_response(allspice_client, result) 2356 Release._add_read_property("repository", repo, release) 2357 # For legacy reasons 2358 Release._add_read_property("repo", repo, release) 2359 setattr( 2360 release, 2361 "_assets", 2362 [ 2363 ReleaseAsset.parse_response(allspice_client, asset, release) 2364 for asset in result["assets"] 2365 ], 2366 ) 2367 return release
2369 @classmethod 2370 def request( 2371 cls, 2372 allspice_client, 2373 owner: str, 2374 repo: str, 2375 id: Optional[int] = None, 2376 ) -> Release: 2377 args = {"owner": owner, "repo": repo, "id": id} 2378 release_response = cls._get_gitea_api_object(allspice_client, args) 2379 repository = Repository.request(allspice_client, owner, repo) 2380 release = cls.parse_response(allspice_client, release_response, repository) 2381 return release
2387 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2388 """ 2389 Create an asset for this release. 2390 2391 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2392 2393 :param file: The file to upload. This should be a file-like object. 2394 :param name: The name of the file. 2395 :return: The created asset. 2396 """ 2397 2398 args: dict[str, Any] = {"files": {"attachment": file}} 2399 if name is not None: 2400 args["params"] = {"name": name} 2401 2402 result = self.allspice_client.requests_post( 2403 self.RELEASE_CREATE_ASSET.format( 2404 owner=self.repo.owner.username, 2405 repo=self.repo.name, 2406 id=self.id, 2407 ), 2408 **args, 2409 ) 2410 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 559 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 560 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 561 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 562 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 563 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 564 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 565 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 566 567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip" 574 575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex" 585 586 def __init__(self, allspice_client): 587 super().__init__(allspice_client) 588 589 def __eq__(self, other): 590 if not isinstance(other, Repository): 591 return False 592 return self.owner == other.owner and self.name == other.name 593 594 def __hash__(self): 595 return hash(self.owner) ^ hash(self.name) 596 597 _fields_to_parsers: ClassVar[dict] = { 598 # dont know how to tell apart user and org as owner except form email being empty. 599 "owner": lambda allspice_client, r: ( 600 Organization.parse_response(allspice_client, r) 601 if r["email"] == "" 602 else User.parse_response(allspice_client, r) 603 ), 604 "updated_at": lambda _, t: Util.convert_time(t), 605 } 606 607 @classmethod 608 def request( 609 cls, 610 allspice_client, 611 owner: str, 612 name: str, 613 ) -> Repository: 614 return cls._request(allspice_client, {"owner": owner, "name": name}) 615 616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses] 660 661 _patchable_fields: ClassVar[set[str]] = { 662 "allow_manual_merge", 663 "allow_merge_commits", 664 "allow_rebase", 665 "allow_rebase_explicit", 666 "allow_rebase_update", 667 "allow_squash_merge", 668 "archived", 669 "autodetect_manual_merge", 670 "default_branch", 671 "default_delete_branch_after_merge", 672 "default_merge_style", 673 "description", 674 "enable_prune", 675 "external_tracker", 676 "external_wiki", 677 "has_issues", 678 "has_projects", 679 "has_pull_requests", 680 "has_wiki", 681 "ignore_whitespace_conflicts", 682 "internal_tracker", 683 "mirror_interval", 684 "name", 685 "private", 686 "template", 687 "website", 688 } 689 690 def commit(self): 691 args = {"owner": self.owner.username, "name": self.name} 692 self._commit(args) 693 694 def get_branches(self) -> List["Branch"]: 695 """Get all the Branches of this Repository.""" 696 697 results = self.allspice_client.requests_get_paginated( 698 Repository.REPO_BRANCHES % (self.owner.username, self.name) 699 ) 700 return [Branch.parse_response(self.allspice_client, result) for result in results] 701 702 def get_branch(self, name: str) -> "Branch": 703 """Get a specific Branch of this Repository.""" 704 result = self.allspice_client.requests_get( 705 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 706 ) 707 return Branch.parse_response(self.allspice_client, result) 708 709 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 710 """Add a branch to the repository""" 711 # Note: will only work with gitea 1.13 or higher! 712 713 ref_name = Util.data_params_for_ref(create_from) 714 if "ref" not in ref_name: 715 raise ValueError("create_from must be a Branch, Commit or string") 716 ref_name = ref_name["ref"] 717 718 data = {"new_branch_name": newname, "old_ref_name": ref_name} 719 result = self.allspice_client.requests_post( 720 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 721 ) 722 return Branch.parse_response(self.allspice_client, result) 723 724 def get_issues( 725 self, 726 state: Literal["open", "closed", "all"] = "all", 727 search_query: Optional[str] = None, 728 labels: Optional[List[str]] = None, 729 milestones: Optional[List[Union[Milestone, str]]] = None, 730 assignee: Optional[Union[User, str]] = None, 731 since: Optional[datetime] = None, 732 before: Optional[datetime] = None, 733 ) -> List["Issue"]: 734 """ 735 Get all Issues of this Repository (open and closed) 736 737 https://hub.allspice.io/api/swagger#/repository/repoListIssues 738 739 All params of this method are optional filters. If you don't specify a filter, it 740 will not be applied. 741 742 :param state: The state of the Issues to get. If None, all Issues are returned. 743 :param search_query: Filter issues by text. This is equivalent to searching for 744 `search_query` in the Issues on the web interface. 745 :param labels: Filter issues by labels. 746 :param milestones: Filter issues by milestones. 747 :param assignee: Filter issues by the assigned user. 748 :param since: Filter issues by the date they were created. 749 :param before: Filter issues by the date they were created. 750 :return: A list of Issues. 751 """ 752 753 data = { 754 "state": state, 755 } 756 if search_query: 757 data["q"] = search_query 758 if labels: 759 data["labels"] = ",".join(labels) 760 if milestones: 761 data["milestone"] = ",".join( 762 [ 763 milestone.name if isinstance(milestone, Milestone) else milestone 764 for milestone in milestones 765 ] 766 ) 767 if assignee: 768 if isinstance(assignee, User): 769 data["assignee"] = assignee.username 770 else: 771 data["assignee"] = assignee 772 if since: 773 data["since"] = Util.format_time(since) 774 if before: 775 data["before"] = Util.format_time(before) 776 777 results = self.allspice_client.requests_get_paginated( 778 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 779 params=data, 780 ) 781 782 issues = [] 783 for result in results: 784 issue = Issue.parse_response(self.allspice_client, result) 785 # See Issue.request 786 setattr(issue, "_repository", self) 787 # This is mostly for compatibility with an older implementation 788 Issue._add_read_property("repo", self, issue) 789 issues.append(issue) 790 791 return issues 792 793 def get_design_reviews( 794 self, 795 state: Literal["open", "closed", "all"] = "all", 796 milestone: Optional[Union[Milestone, str]] = None, 797 labels: Optional[List[str]] = None, 798 ) -> List["DesignReview"]: 799 """ 800 Get all Design Reviews of this Repository. 801 802 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 803 804 :param state: The state of the Design Reviews to get. If None, all Design Reviews 805 are returned. 806 :param milestone: The milestone of the Design Reviews to get. 807 :param labels: A list of label IDs to filter DRs by. 808 :return: A list of Design Reviews. 809 """ 810 811 params = { 812 "state": state, 813 } 814 if milestone: 815 if isinstance(milestone, Milestone): 816 params["milestone"] = milestone.name 817 else: 818 params["milestone"] = milestone 819 if labels: 820 params["labels"] = ",".join(labels) 821 822 results = self.allspice_client.requests_get_paginated( 823 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 824 params=params, 825 ) 826 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 827 828 def get_commits( 829 self, 830 sha: Optional[str] = None, 831 path: Optional[str] = None, 832 stat: bool = True, 833 ) -> List["Commit"]: 834 """ 835 Get all the Commits of this Repository. 836 837 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 838 839 :param sha: The SHA of the commit to start listing commits from. 840 :param path: filepath of a file/dir. 841 :param stat: Include the number of additions and deletions in the response. 842 Disable for speedup. 843 :return: A list of Commits. 844 """ 845 846 data = {} 847 if sha: 848 data["sha"] = sha 849 if path: 850 data["path"] = path 851 if not stat: 852 data["stat"] = False 853 854 try: 855 results = self.allspice_client.requests_get_paginated( 856 Repository.REPO_COMMITS % (self.owner.username, self.name), 857 params=data, 858 ) 859 except ConflictException as err: 860 logging.warning(err) 861 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 862 results = [] 863 return [Commit.parse_response(self.allspice_client, result) for result in results] 864 865 def get_issues_state(self, state) -> List["Issue"]: 866 """ 867 DEPRECATED: Use get_issues() instead. 868 869 Get issues of state Issue.open or Issue.closed of a repository. 870 """ 871 872 assert state in [Issue.OPENED, Issue.CLOSED] 873 issues = [] 874 data = {"state": state} 875 results = self.allspice_client.requests_get_paginated( 876 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 877 params=data, 878 ) 879 for result in results: 880 issue = Issue.parse_response(self.allspice_client, result) 881 # adding data not contained in the issue response 882 # See Issue.request() 883 setattr(issue, "_repository", self) 884 Issue._add_read_property("repo", self, issue) 885 Issue._add_read_property("owner", self.owner, issue) 886 issues.append(issue) 887 return issues 888 889 def get_times(self): 890 results = self.allspice_client.requests_get( 891 Repository.REPO_TIMES % (self.owner.username, self.name) 892 ) 893 return results 894 895 def get_user_time(self, username) -> float: 896 if isinstance(username, User): 897 username = username.username 898 results = self.allspice_client.requests_get( 899 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 900 ) 901 time = sum(r["time"] for r in results) 902 return time 903 904 def get_full_name(self) -> str: 905 return self.owner.username + "/" + self.name 906 907 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 908 data = { 909 "assignees": assignees, 910 "body": description, 911 "closed": False, 912 "title": title, 913 } 914 result = self.allspice_client.requests_post( 915 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 916 data=data, 917 ) 918 919 issue = Issue.parse_response(self.allspice_client, result) 920 setattr(issue, "_repository", self) 921 Issue._add_read_property("repo", self, issue) 922 return issue 923 924 def create_design_review( 925 self, 926 title: str, 927 head: Union[Branch, str], 928 base: Union[Branch, str], 929 assignees: Optional[Set[Union[User, str]]] = None, 930 body: Optional[str] = None, 931 due_date: Optional[datetime] = None, 932 milestone: Optional["Milestone"] = None, 933 ) -> "DesignReview": 934 """ 935 Create a new Design Review. 936 937 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 938 939 :param title: Title of the Design Review 940 :param head: Branch or name of the branch to merge into the base branch 941 :param base: Branch or name of the branch to merge into 942 :param assignees: Optional. A list of users to assign this review. List can be of 943 User objects or of usernames. 944 :param body: An Optional Description for the Design Review. 945 :param due_date: An Optional Due date for the Design Review. 946 :param milestone: An Optional Milestone for the Design Review 947 :return: The created Design Review 948 """ 949 950 data: dict[str, Any] = { 951 "title": title, 952 } 953 954 if isinstance(head, Branch): 955 data["head"] = head.name 956 else: 957 data["head"] = head 958 if isinstance(base, Branch): 959 data["base"] = base.name 960 else: 961 data["base"] = base 962 if assignees: 963 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 964 if body: 965 data["body"] = body 966 if due_date: 967 data["due_date"] = Util.format_time(due_date) 968 if milestone: 969 data["milestone"] = milestone.id 970 971 result = self.allspice_client.requests_post( 972 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 973 data=data, 974 ) 975 976 return DesignReview.parse_response(self.allspice_client, result) 977 978 def create_milestone( 979 self, 980 title: str, 981 description: str, 982 due_date: Optional[str] = None, 983 state: str = "open", 984 ) -> "Milestone": 985 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 986 data = {"title": title, "description": description, "state": state} 987 if due_date: 988 data["due_date"] = due_date 989 result = self.allspice_client.requests_post(url, data=data) 990 return Milestone.parse_response(self.allspice_client, result) 991 992 def create_gitea_hook(self, hook_url: str, events: List[str]): 993 url = f"/repos/{self.owner.username}/{self.name}/hooks" 994 data = { 995 "type": "gitea", 996 "config": {"content_type": "json", "url": hook_url}, 997 "events": events, 998 "active": True, 999 } 1000 return self.allspice_client.requests_post(url, data=data) 1001 1002 def list_hooks(self): 1003 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1004 return self.allspice_client.requests_get(url) 1005 1006 def delete_hook(self, id: str): 1007 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1008 self.allspice_client.requests_delete(url) 1009 1010 def is_collaborator(self, username) -> bool: 1011 if isinstance(username, User): 1012 username = username.username 1013 try: 1014 # returns 204 if its ok, 404 if its not 1015 self.allspice_client.requests_get( 1016 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1017 ) 1018 return True 1019 except Exception: 1020 return False 1021 1022 def get_users_with_access(self) -> Sequence[User]: 1023 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1024 response = self.allspice_client.requests_get(url) 1025 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1026 if isinstance(self.owner, User): 1027 return [*collabs, self.owner] 1028 else: 1029 # owner must be org 1030 teams = self.owner.get_teams() 1031 for team in teams: 1032 team_repos = team.get_repos() 1033 if self.name in [n.name for n in team_repos]: 1034 collabs += team.get_members() 1035 return collabs 1036 1037 def remove_collaborator(self, user_name: str): 1038 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1039 self.allspice_client.requests_delete(url) 1040 1041 def transfer_ownership( 1042 self, 1043 new_owner: Union[User, Organization], 1044 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1045 ): 1046 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1047 data: dict[str, Any] = {"new_owner": new_owner.username} 1048 if isinstance(new_owner, Organization): 1049 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1050 data["team_ids"] = new_team_ids 1051 self.allspice_client.requests_post(url, data=data) 1052 # TODO: make sure this instance is either updated or discarded 1053 1054 def get_git_content( 1055 self, 1056 ref: Optional["Ref"] = None, 1057 commit: "Optional[Commit]" = None, 1058 ) -> List[Content]: 1059 """ 1060 Get the metadata for all files in the root directory. 1061 1062 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1063 1064 :param ref: branch or commit to get content from 1065 :param commit: commit to get content from (deprecated) 1066 """ 1067 url = f"/repos/{self.owner.username}/{self.name}/contents" 1068 data = Util.data_params_for_ref(ref or commit) 1069 1070 result = [ 1071 Content.parse_response(self.allspice_client, f) 1072 for f in self.allspice_client.requests_get(url, data) 1073 ] 1074 return result 1075 1076 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1077 """ 1078 Get the repository's tree on a given ref. 1079 1080 By default, this will only return the top-level entries in the tree. If you want 1081 to get the entire tree, set `recursive` to True. 1082 1083 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1084 :param recursive: Whether to get the entire tree or just the top-level entries. 1085 """ 1086 1087 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1088 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1089 params = {"recursive": recursive} 1090 results = self.allspice_client.requests_get_paginated(url, params=params) 1091 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1092 1093 def get_file_content( 1094 self, 1095 content: Content, 1096 ref: Optional[Ref] = None, 1097 commit: Optional[Commit] = None, 1098 ) -> Union[str, List["Content"]]: 1099 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1100 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1101 data = Util.data_params_for_ref(ref or commit) 1102 1103 if content.type == Content.FILE: 1104 return self.allspice_client.requests_get(url, data)["content"] 1105 else: 1106 return [ 1107 Content.parse_response(self.allspice_client, f) 1108 for f in self.allspice_client.requests_get(url, data) 1109 ] 1110 1111 def get_raw_file( 1112 self, 1113 file_path: str, 1114 ref: Optional[Ref] = None, 1115 ) -> bytes: 1116 """ 1117 Get the raw, binary data of a single file. 1118 1119 Note 1: if the file you are requesting is a text file, you might want to 1120 use .decode() on the result to get a string. For example: 1121 1122 content = repo.get_raw_file("file.txt").decode("utf-8") 1123 1124 Note 2: this method will store the entire file in memory. If you want 1125 to download a large file, you might want to use `download_to_file` 1126 instead. 1127 1128 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1129 1130 :param file_path: The path to the file to get. 1131 :param ref: The branch or commit to get the file from. If not provided, 1132 the default branch is used. 1133 """ 1134 1135 url = self.REPO_GET_RAW_FILE.format( 1136 owner=self.owner.username, 1137 repo=self.name, 1138 path=file_path, 1139 ) 1140 params = Util.data_params_for_ref(ref) 1141 return self.allspice_client.requests_get_raw(url, params=params) 1142 1143 def download_to_file( 1144 self, 1145 file_path: str, 1146 io: IO, 1147 ref: Optional[Ref] = None, 1148 ) -> None: 1149 """ 1150 Download the binary data of a file to a file-like object. 1151 1152 Example: 1153 1154 with open("schematic.DSN", "wb") as f: 1155 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1156 1157 :param file_path: The path to the file in the repository from the root 1158 of the repository. 1159 :param io: The file-like object to write the data to. 1160 """ 1161 1162 url = self.allspice_client._AllSpice__get_url( 1163 self.REPO_GET_RAW_FILE.format( 1164 owner=self.owner.username, 1165 repo=self.name, 1166 path=file_path, 1167 ) 1168 ) 1169 params = Util.data_params_for_ref(ref) 1170 response = self.allspice_client.requests.get( 1171 url, 1172 params=params, 1173 headers=self.allspice_client.headers, 1174 stream=True, 1175 ) 1176 1177 for chunk in response.iter_content(chunk_size=4096): 1178 if chunk: 1179 io.write(chunk) 1180 1181 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1182 """ 1183 Get the json blob for a cad file if it exists, otherwise enqueue 1184 a new job and return a 503 status. 1185 1186 WARNING: This is still experimental and not recommended for critical 1187 applications. The structure and content of the returned dictionary can 1188 change at any time. 1189 1190 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1191 """ 1192 1193 if isinstance(content, Content): 1194 content = content.path 1195 1196 url = self.REPO_GET_ALLSPICE_JSON.format( 1197 owner=self.owner.username, 1198 repo=self.name, 1199 content=content, 1200 ) 1201 data = Util.data_params_for_ref(ref) 1202 return self.allspice_client.requests_get(url, data) 1203 1204 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1205 """ 1206 Get the svg blob for a cad file if it exists, otherwise enqueue 1207 a new job and return a 503 status. 1208 1209 WARNING: This is still experimental and not yet recommended for 1210 critical applications. The content of the returned svg can change 1211 at any time. 1212 1213 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1214 """ 1215 1216 if isinstance(content, Content): 1217 content = content.path 1218 1219 url = self.REPO_GET_ALLSPICE_SVG.format( 1220 owner=self.owner.username, 1221 repo=self.name, 1222 content=content, 1223 ) 1224 data = Util.data_params_for_ref(ref) 1225 return self.allspice_client.requests_get_raw(url, data) 1226 1227 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1228 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1229 if not data: 1230 data = {} 1231 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1232 data.update({"content": content}) 1233 return self.allspice_client.requests_post(url, data) 1234 1235 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1236 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1237 if not data: 1238 data = {} 1239 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1240 data.update({"sha": file_sha, "content": content}) 1241 return self.allspice_client.requests_put(url, data) 1242 1243 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1244 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1245 if not data: 1246 data = {} 1247 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1248 data.update({"sha": file_sha}) 1249 return self.allspice_client.requests_delete(url, data) 1250 1251 def get_archive( 1252 self, 1253 ref: Ref = "main", 1254 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1255 ) -> bytes: 1256 """ 1257 Download all the files in a specific ref of a repository as a zip or tarball 1258 archive. 1259 1260 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1261 1262 :param ref: branch or commit to get content from, defaults to the "main" branch 1263 :param archive_format: zip or tar, defaults to zip 1264 """ 1265 1266 ref_string = Util.data_params_for_ref(ref)["ref"] 1267 url = self.REPO_GET_ARCHIVE.format( 1268 owner=self.owner.username, 1269 repo=self.name, 1270 ref=ref_string, 1271 format=archive_format.value, 1272 ) 1273 return self.allspice_client.requests_get_raw(url) 1274 1275 def get_topics(self) -> list[str]: 1276 """ 1277 Gets the list of topics on this repository. 1278 1279 See http://localhost:3000/api/swagger#/repository/repoListTopics 1280 """ 1281 1282 url = self.REPO_GET_TOPICS.format( 1283 owner=self.owner.username, 1284 repo=self.name, 1285 ) 1286 return self.allspice_client.requests_get(url)["topics"] 1287 1288 def add_topic(self, topic: str): 1289 """ 1290 Adds a topic to the repository. 1291 1292 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1293 1294 :param topic: The topic to add. Topic names must consist only of 1295 lowercase letters, numnbers and dashes (-), and cannot start with 1296 dashes. Topic names also must be under 35 characters long. 1297 """ 1298 1299 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1300 self.allspice_client.requests_put(url) 1301 1302 def create_release( 1303 self, 1304 tag_name: str, 1305 name: Optional[str] = None, 1306 body: Optional[str] = None, 1307 draft: bool = False, 1308 ): 1309 """ 1310 Create a release for this repository. The release will be created for 1311 the tag with the given name. If there is no tag with this name, create 1312 the tag first. 1313 1314 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1315 """ 1316 1317 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1318 data = { 1319 "tag_name": tag_name, 1320 "draft": draft, 1321 } 1322 if name is not None: 1323 data["name"] = name 1324 if body is not None: 1325 data["body"] = body 1326 response = self.allspice_client.requests_post(url, data) 1327 return Release.parse_response(self.allspice_client, response, self) 1328 1329 def get_releases( 1330 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1331 ) -> List[Release]: 1332 """ 1333 Get the list of releases for this repository. 1334 1335 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1336 """ 1337 1338 data = {} 1339 1340 if draft is not None: 1341 data["draft"] = draft 1342 if pre_release is not None: 1343 data["pre-release"] = pre_release 1344 1345 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1346 responses = self.allspice_client.requests_get_paginated(url, params=data) 1347 1348 return [ 1349 Release.parse_response(self.allspice_client, response, self) for response in responses 1350 ] 1351 1352 def get_latest_release(self) -> Release: 1353 """ 1354 Get the latest release for this repository. 1355 1356 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1357 """ 1358 1359 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1360 response = self.allspice_client.requests_get(url) 1361 release = Release.parse_response(self.allspice_client, response, self) 1362 return release 1363 1364 def get_release_by_tag(self, tag: str) -> Release: 1365 """ 1366 Get a release by its tag. 1367 1368 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1369 """ 1370 1371 url = self.REPO_GET_RELEASE_BY_TAG.format( 1372 owner=self.owner.username, repo=self.name, tag=tag 1373 ) 1374 response = self.allspice_client.requests_get(url) 1375 release = Release.parse_response(self.allspice_client, response, self) 1376 return release 1377 1378 def get_commit_statuses( 1379 self, 1380 commit: Union[str, Commit], 1381 sort: Optional[CommitStatusSort] = None, 1382 state: Optional[CommitStatusState] = None, 1383 ) -> List[CommitStatus]: 1384 """ 1385 Get a list of statuses for a commit. 1386 1387 This is roughly equivalent to the Commit.get_statuses method, but this 1388 method allows you to sort and filter commits and is more convenient if 1389 you have a commit SHA and don't need to get the commit itself. 1390 1391 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1392 """ 1393 1394 if isinstance(commit, Commit): 1395 commit = commit.sha 1396 1397 params = {} 1398 if sort is not None: 1399 params["sort"] = sort.value 1400 if state is not None: 1401 params["state"] = state.value 1402 1403 url = self.REPO_GET_COMMIT_STATUS.format( 1404 owner=self.owner.username, repo=self.name, sha=commit 1405 ) 1406 response = self.allspice_client.requests_get_paginated(url, params=params) 1407 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1408 1409 def create_commit_status( 1410 self, 1411 commit: Union[str, Commit], 1412 context: Optional[str] = None, 1413 description: Optional[str] = None, 1414 state: Optional[CommitStatusState] = None, 1415 target_url: Optional[str] = None, 1416 ) -> CommitStatus: 1417 """ 1418 Create a status on a commit. 1419 1420 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1421 """ 1422 1423 if isinstance(commit, Commit): 1424 commit = commit.sha 1425 1426 data = {} 1427 if context is not None: 1428 data["context"] = context 1429 if description is not None: 1430 data["description"] = description 1431 if state is not None: 1432 data["state"] = state.value 1433 if target_url is not None: 1434 data["target_url"] = target_url 1435 1436 url = self.REPO_GET_COMMIT_STATUS.format( 1437 owner=self.owner.username, repo=self.name, sha=commit 1438 ) 1439 response = self.allspice_client.requests_post(url, data=data) 1440 return CommitStatus.parse_response(self.allspice_client, response) 1441 1442 def delete(self): 1443 self.allspice_client.requests_delete( 1444 Repository.REPO_DELETE % (self.owner.username, self.name) 1445 ) 1446 self.deleted = True
616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
694 def get_branches(self) -> List["Branch"]: 695 """Get all the Branches of this Repository.""" 696 697 results = self.allspice_client.requests_get_paginated( 698 Repository.REPO_BRANCHES % (self.owner.username, self.name) 699 ) 700 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
702 def get_branch(self, name: str) -> "Branch": 703 """Get a specific Branch of this Repository.""" 704 result = self.allspice_client.requests_get( 705 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 706 ) 707 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
709 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 710 """Add a branch to the repository""" 711 # Note: will only work with gitea 1.13 or higher! 712 713 ref_name = Util.data_params_for_ref(create_from) 714 if "ref" not in ref_name: 715 raise ValueError("create_from must be a Branch, Commit or string") 716 ref_name = ref_name["ref"] 717 718 data = {"new_branch_name": newname, "old_ref_name": ref_name} 719 result = self.allspice_client.requests_post( 720 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 721 ) 722 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
724 def get_issues( 725 self, 726 state: Literal["open", "closed", "all"] = "all", 727 search_query: Optional[str] = None, 728 labels: Optional[List[str]] = None, 729 milestones: Optional[List[Union[Milestone, str]]] = None, 730 assignee: Optional[Union[User, str]] = None, 731 since: Optional[datetime] = None, 732 before: Optional[datetime] = None, 733 ) -> List["Issue"]: 734 """ 735 Get all Issues of this Repository (open and closed) 736 737 https://hub.allspice.io/api/swagger#/repository/repoListIssues 738 739 All params of this method are optional filters. If you don't specify a filter, it 740 will not be applied. 741 742 :param state: The state of the Issues to get. If None, all Issues are returned. 743 :param search_query: Filter issues by text. This is equivalent to searching for 744 `search_query` in the Issues on the web interface. 745 :param labels: Filter issues by labels. 746 :param milestones: Filter issues by milestones. 747 :param assignee: Filter issues by the assigned user. 748 :param since: Filter issues by the date they were created. 749 :param before: Filter issues by the date they were created. 750 :return: A list of Issues. 751 """ 752 753 data = { 754 "state": state, 755 } 756 if search_query: 757 data["q"] = search_query 758 if labels: 759 data["labels"] = ",".join(labels) 760 if milestones: 761 data["milestone"] = ",".join( 762 [ 763 milestone.name if isinstance(milestone, Milestone) else milestone 764 for milestone in milestones 765 ] 766 ) 767 if assignee: 768 if isinstance(assignee, User): 769 data["assignee"] = assignee.username 770 else: 771 data["assignee"] = assignee 772 if since: 773 data["since"] = Util.format_time(since) 774 if before: 775 data["before"] = Util.format_time(before) 776 777 results = self.allspice_client.requests_get_paginated( 778 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 779 params=data, 780 ) 781 782 issues = [] 783 for result in results: 784 issue = Issue.parse_response(self.allspice_client, result) 785 # See Issue.request 786 setattr(issue, "_repository", self) 787 # This is mostly for compatibility with an older implementation 788 Issue._add_read_property("repo", self, issue) 789 issues.append(issue) 790 791 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_query
in the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
793 def get_design_reviews( 794 self, 795 state: Literal["open", "closed", "all"] = "all", 796 milestone: Optional[Union[Milestone, str]] = None, 797 labels: Optional[List[str]] = None, 798 ) -> List["DesignReview"]: 799 """ 800 Get all Design Reviews of this Repository. 801 802 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 803 804 :param state: The state of the Design Reviews to get. If None, all Design Reviews 805 are returned. 806 :param milestone: The milestone of the Design Reviews to get. 807 :param labels: A list of label IDs to filter DRs by. 808 :return: A list of Design Reviews. 809 """ 810 811 params = { 812 "state": state, 813 } 814 if milestone: 815 if isinstance(milestone, Milestone): 816 params["milestone"] = milestone.name 817 else: 818 params["milestone"] = milestone 819 if labels: 820 params["labels"] = ",".join(labels) 821 822 results = self.allspice_client.requests_get_paginated( 823 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 824 params=params, 825 ) 826 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
828 def get_commits( 829 self, 830 sha: Optional[str] = None, 831 path: Optional[str] = None, 832 stat: bool = True, 833 ) -> List["Commit"]: 834 """ 835 Get all the Commits of this Repository. 836 837 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 838 839 :param sha: The SHA of the commit to start listing commits from. 840 :param path: filepath of a file/dir. 841 :param stat: Include the number of additions and deletions in the response. 842 Disable for speedup. 843 :return: A list of Commits. 844 """ 845 846 data = {} 847 if sha: 848 data["sha"] = sha 849 if path: 850 data["path"] = path 851 if not stat: 852 data["stat"] = False 853 854 try: 855 results = self.allspice_client.requests_get_paginated( 856 Repository.REPO_COMMITS % (self.owner.username, self.name), 857 params=data, 858 ) 859 except ConflictException as err: 860 logging.warning(err) 861 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 862 results = [] 863 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
865 def get_issues_state(self, state) -> List["Issue"]: 866 """ 867 DEPRECATED: Use get_issues() instead. 868 869 Get issues of state Issue.open or Issue.closed of a repository. 870 """ 871 872 assert state in [Issue.OPENED, Issue.CLOSED] 873 issues = [] 874 data = {"state": state} 875 results = self.allspice_client.requests_get_paginated( 876 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 877 params=data, 878 ) 879 for result in results: 880 issue = Issue.parse_response(self.allspice_client, result) 881 # adding data not contained in the issue response 882 # See Issue.request() 883 setattr(issue, "_repository", self) 884 Issue._add_read_property("repo", self, issue) 885 Issue._add_read_property("owner", self.owner, issue) 886 issues.append(issue) 887 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
895 def get_user_time(self, username) -> float: 896 if isinstance(username, User): 897 username = username.username 898 results = self.allspice_client.requests_get( 899 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 900 ) 901 time = sum(r["time"] for r in results) 902 return time
907 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 908 data = { 909 "assignees": assignees, 910 "body": description, 911 "closed": False, 912 "title": title, 913 } 914 result = self.allspice_client.requests_post( 915 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 916 data=data, 917 ) 918 919 issue = Issue.parse_response(self.allspice_client, result) 920 setattr(issue, "_repository", self) 921 Issue._add_read_property("repo", self, issue) 922 return issue
924 def create_design_review( 925 self, 926 title: str, 927 head: Union[Branch, str], 928 base: Union[Branch, str], 929 assignees: Optional[Set[Union[User, str]]] = None, 930 body: Optional[str] = None, 931 due_date: Optional[datetime] = None, 932 milestone: Optional["Milestone"] = None, 933 ) -> "DesignReview": 934 """ 935 Create a new Design Review. 936 937 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 938 939 :param title: Title of the Design Review 940 :param head: Branch or name of the branch to merge into the base branch 941 :param base: Branch or name of the branch to merge into 942 :param assignees: Optional. A list of users to assign this review. List can be of 943 User objects or of usernames. 944 :param body: An Optional Description for the Design Review. 945 :param due_date: An Optional Due date for the Design Review. 946 :param milestone: An Optional Milestone for the Design Review 947 :return: The created Design Review 948 """ 949 950 data: dict[str, Any] = { 951 "title": title, 952 } 953 954 if isinstance(head, Branch): 955 data["head"] = head.name 956 else: 957 data["head"] = head 958 if isinstance(base, Branch): 959 data["base"] = base.name 960 else: 961 data["base"] = base 962 if assignees: 963 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 964 if body: 965 data["body"] = body 966 if due_date: 967 data["due_date"] = Util.format_time(due_date) 968 if milestone: 969 data["milestone"] = milestone.id 970 971 result = self.allspice_client.requests_post( 972 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 973 data=data, 974 ) 975 976 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
978 def create_milestone( 979 self, 980 title: str, 981 description: str, 982 due_date: Optional[str] = None, 983 state: str = "open", 984 ) -> "Milestone": 985 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 986 data = {"title": title, "description": description, "state": state} 987 if due_date: 988 data["due_date"] = due_date 989 result = self.allspice_client.requests_post(url, data=data) 990 return Milestone.parse_response(self.allspice_client, result)
992 def create_gitea_hook(self, hook_url: str, events: List[str]): 993 url = f"/repos/{self.owner.username}/{self.name}/hooks" 994 data = { 995 "type": "gitea", 996 "config": {"content_type": "json", "url": hook_url}, 997 "events": events, 998 "active": True, 999 } 1000 return self.allspice_client.requests_post(url, data=data)
1010 def is_collaborator(self, username) -> bool: 1011 if isinstance(username, User): 1012 username = username.username 1013 try: 1014 # returns 204 if its ok, 404 if its not 1015 self.allspice_client.requests_get( 1016 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1017 ) 1018 return True 1019 except Exception: 1020 return False
1022 def get_users_with_access(self) -> Sequence[User]: 1023 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1024 response = self.allspice_client.requests_get(url) 1025 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1026 if isinstance(self.owner, User): 1027 return [*collabs, self.owner] 1028 else: 1029 # owner must be org 1030 teams = self.owner.get_teams() 1031 for team in teams: 1032 team_repos = team.get_repos() 1033 if self.name in [n.name for n in team_repos]: 1034 collabs += team.get_members() 1035 return collabs
1041 def transfer_ownership( 1042 self, 1043 new_owner: Union[User, Organization], 1044 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1045 ): 1046 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1047 data: dict[str, Any] = {"new_owner": new_owner.username} 1048 if isinstance(new_owner, Organization): 1049 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1050 data["team_ids"] = new_team_ids 1051 self.allspice_client.requests_post(url, data=data) 1052 # TODO: make sure this instance is either updated or discarded
1054 def get_git_content( 1055 self, 1056 ref: Optional["Ref"] = None, 1057 commit: "Optional[Commit]" = None, 1058 ) -> List[Content]: 1059 """ 1060 Get the metadata for all files in the root directory. 1061 1062 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1063 1064 :param ref: branch or commit to get content from 1065 :param commit: commit to get content from (deprecated) 1066 """ 1067 url = f"/repos/{self.owner.username}/{self.name}/contents" 1068 data = Util.data_params_for_ref(ref or commit) 1069 1070 result = [ 1071 Content.parse_response(self.allspice_client, f) 1072 for f in self.allspice_client.requests_get(url, data) 1073 ] 1074 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1076 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1077 """ 1078 Get the repository's tree on a given ref. 1079 1080 By default, this will only return the top-level entries in the tree. If you want 1081 to get the entire tree, set `recursive` to True. 1082 1083 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1084 :param recursive: Whether to get the entire tree or just the top-level entries. 1085 """ 1086 1087 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1088 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1089 params = {"recursive": recursive} 1090 results = self.allspice_client.requests_get_paginated(url, params=params) 1091 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1093 def get_file_content( 1094 self, 1095 content: Content, 1096 ref: Optional[Ref] = None, 1097 commit: Optional[Commit] = None, 1098 ) -> Union[str, List["Content"]]: 1099 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1100 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1101 data = Util.data_params_for_ref(ref or commit) 1102 1103 if content.type == Content.FILE: 1104 return self.allspice_client.requests_get(url, data)["content"] 1105 else: 1106 return [ 1107 Content.parse_response(self.allspice_client, f) 1108 for f in self.allspice_client.requests_get(url, data) 1109 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1111 def get_raw_file( 1112 self, 1113 file_path: str, 1114 ref: Optional[Ref] = None, 1115 ) -> bytes: 1116 """ 1117 Get the raw, binary data of a single file. 1118 1119 Note 1: if the file you are requesting is a text file, you might want to 1120 use .decode() on the result to get a string. For example: 1121 1122 content = repo.get_raw_file("file.txt").decode("utf-8") 1123 1124 Note 2: this method will store the entire file in memory. If you want 1125 to download a large file, you might want to use `download_to_file` 1126 instead. 1127 1128 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1129 1130 :param file_path: The path to the file to get. 1131 :param ref: The branch or commit to get the file from. If not provided, 1132 the default branch is used. 1133 """ 1134 1135 url = self.REPO_GET_RAW_FILE.format( 1136 owner=self.owner.username, 1137 repo=self.name, 1138 path=file_path, 1139 ) 1140 params = Util.data_params_for_ref(ref) 1141 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1143 def download_to_file( 1144 self, 1145 file_path: str, 1146 io: IO, 1147 ref: Optional[Ref] = None, 1148 ) -> None: 1149 """ 1150 Download the binary data of a file to a file-like object. 1151 1152 Example: 1153 1154 with open("schematic.DSN", "wb") as f: 1155 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1156 1157 :param file_path: The path to the file in the repository from the root 1158 of the repository. 1159 :param io: The file-like object to write the data to. 1160 """ 1161 1162 url = self.allspice_client._AllSpice__get_url( 1163 self.REPO_GET_RAW_FILE.format( 1164 owner=self.owner.username, 1165 repo=self.name, 1166 path=file_path, 1167 ) 1168 ) 1169 params = Util.data_params_for_ref(ref) 1170 response = self.allspice_client.requests.get( 1171 url, 1172 params=params, 1173 headers=self.allspice_client.headers, 1174 stream=True, 1175 ) 1176 1177 for chunk in response.iter_content(chunk_size=4096): 1178 if chunk: 1179 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1181 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1182 """ 1183 Get the json blob for a cad file if it exists, otherwise enqueue 1184 a new job and return a 503 status. 1185 1186 WARNING: This is still experimental and not recommended for critical 1187 applications. The structure and content of the returned dictionary can 1188 change at any time. 1189 1190 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1191 """ 1192 1193 if isinstance(content, Content): 1194 content = content.path 1195 1196 url = self.REPO_GET_ALLSPICE_JSON.format( 1197 owner=self.owner.username, 1198 repo=self.name, 1199 content=content, 1200 ) 1201 data = Util.data_params_for_ref(ref) 1202 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1204 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1205 """ 1206 Get the svg blob for a cad file if it exists, otherwise enqueue 1207 a new job and return a 503 status. 1208 1209 WARNING: This is still experimental and not yet recommended for 1210 critical applications. The content of the returned svg can change 1211 at any time. 1212 1213 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1214 """ 1215 1216 if isinstance(content, Content): 1217 content = content.path 1218 1219 url = self.REPO_GET_ALLSPICE_SVG.format( 1220 owner=self.owner.username, 1221 repo=self.name, 1222 content=content, 1223 ) 1224 data = Util.data_params_for_ref(ref) 1225 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1227 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1228 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1229 if not data: 1230 data = {} 1231 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1232 data.update({"content": content}) 1233 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1235 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1236 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1237 if not data: 1238 data = {} 1239 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1240 data.update({"sha": file_sha, "content": content}) 1241 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1243 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1244 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1245 if not data: 1246 data = {} 1247 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1248 data.update({"sha": file_sha}) 1249 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1251 def get_archive( 1252 self, 1253 ref: Ref = "main", 1254 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1255 ) -> bytes: 1256 """ 1257 Download all the files in a specific ref of a repository as a zip or tarball 1258 archive. 1259 1260 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1261 1262 :param ref: branch or commit to get content from, defaults to the "main" branch 1263 :param archive_format: zip or tar, defaults to zip 1264 """ 1265 1266 ref_string = Util.data_params_for_ref(ref)["ref"] 1267 url = self.REPO_GET_ARCHIVE.format( 1268 owner=self.owner.username, 1269 repo=self.name, 1270 ref=ref_string, 1271 format=archive_format.value, 1272 ) 1273 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1275 def get_topics(self) -> list[str]: 1276 """ 1277 Gets the list of topics on this repository. 1278 1279 See http://localhost:3000/api/swagger#/repository/repoListTopics 1280 """ 1281 1282 url = self.REPO_GET_TOPICS.format( 1283 owner=self.owner.username, 1284 repo=self.name, 1285 ) 1286 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1288 def add_topic(self, topic: str): 1289 """ 1290 Adds a topic to the repository. 1291 1292 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1293 1294 :param topic: The topic to add. Topic names must consist only of 1295 lowercase letters, numnbers and dashes (-), and cannot start with 1296 dashes. Topic names also must be under 35 characters long. 1297 """ 1298 1299 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1300 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1302 def create_release( 1303 self, 1304 tag_name: str, 1305 name: Optional[str] = None, 1306 body: Optional[str] = None, 1307 draft: bool = False, 1308 ): 1309 """ 1310 Create a release for this repository. The release will be created for 1311 the tag with the given name. If there is no tag with this name, create 1312 the tag first. 1313 1314 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1315 """ 1316 1317 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1318 data = { 1319 "tag_name": tag_name, 1320 "draft": draft, 1321 } 1322 if name is not None: 1323 data["name"] = name 1324 if body is not None: 1325 data["body"] = body 1326 response = self.allspice_client.requests_post(url, data) 1327 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1329 def get_releases( 1330 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1331 ) -> List[Release]: 1332 """ 1333 Get the list of releases for this repository. 1334 1335 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1336 """ 1337 1338 data = {} 1339 1340 if draft is not None: 1341 data["draft"] = draft 1342 if pre_release is not None: 1343 data["pre-release"] = pre_release 1344 1345 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1346 responses = self.allspice_client.requests_get_paginated(url, params=data) 1347 1348 return [ 1349 Release.parse_response(self.allspice_client, response, self) for response in responses 1350 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1352 def get_latest_release(self) -> Release: 1353 """ 1354 Get the latest release for this repository. 1355 1356 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1357 """ 1358 1359 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1360 response = self.allspice_client.requests_get(url) 1361 release = Release.parse_response(self.allspice_client, response, self) 1362 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1364 def get_release_by_tag(self, tag: str) -> Release: 1365 """ 1366 Get a release by its tag. 1367 1368 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1369 """ 1370 1371 url = self.REPO_GET_RELEASE_BY_TAG.format( 1372 owner=self.owner.username, repo=self.name, tag=tag 1373 ) 1374 response = self.allspice_client.requests_get(url) 1375 release = Release.parse_response(self.allspice_client, response, self) 1376 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1378 def get_commit_statuses( 1379 self, 1380 commit: Union[str, Commit], 1381 sort: Optional[CommitStatusSort] = None, 1382 state: Optional[CommitStatusState] = None, 1383 ) -> List[CommitStatus]: 1384 """ 1385 Get a list of statuses for a commit. 1386 1387 This is roughly equivalent to the Commit.get_statuses method, but this 1388 method allows you to sort and filter commits and is more convenient if 1389 you have a commit SHA and don't need to get the commit itself. 1390 1391 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1392 """ 1393 1394 if isinstance(commit, Commit): 1395 commit = commit.sha 1396 1397 params = {} 1398 if sort is not None: 1399 params["sort"] = sort.value 1400 if state is not None: 1401 params["state"] = state.value 1402 1403 url = self.REPO_GET_COMMIT_STATUS.format( 1404 owner=self.owner.username, repo=self.name, sha=commit 1405 ) 1406 response = self.allspice_client.requests_get_paginated(url, params=params) 1407 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1409 def create_commit_status( 1410 self, 1411 commit: Union[str, Commit], 1412 context: Optional[str] = None, 1413 description: Optional[str] = None, 1414 state: Optional[CommitStatusState] = None, 1415 target_url: Optional[str] = None, 1416 ) -> CommitStatus: 1417 """ 1418 Create a status on a commit. 1419 1420 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1421 """ 1422 1423 if isinstance(commit, Commit): 1424 commit = commit.sha 1425 1426 data = {} 1427 if context is not None: 1428 data["context"] = context 1429 if description is not None: 1430 data["description"] = description 1431 if state is not None: 1432 data["state"] = state.value 1433 if target_url is not None: 1434 data["target_url"] = target_url 1435 1436 url = self.REPO_GET_COMMIT_STATUS.format( 1437 owner=self.owner.username, repo=self.name, sha=commit 1438 ) 1439 response = self.allspice_client.requests_post(url, data=data) 1440 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip"
Archive formats for Repository.get_archive
575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
2218class Team(ApiObject): 2219 can_create_org_repo: bool 2220 description: str 2221 id: int 2222 includes_all_repositories: bool 2223 name: str 2224 organization: Optional["Organization"] 2225 permission: str 2226 units: List[str] 2227 units_map: Dict[str, str] 2228 2229 API_OBJECT = """/teams/{id}""" # <id> 2230 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2231 TEAM_DELETE = """/teams/%s""" # <id> 2232 GET_MEMBERS = """/teams/%s/members""" # <id> 2233 GET_REPOS = """/teams/%s/repos""" # <id> 2234 2235 def __init__(self, allspice_client): 2236 super().__init__(allspice_client) 2237 2238 def __eq__(self, other): 2239 if not isinstance(other, Team): 2240 return False 2241 return self.organization == other.organization and self.id == other.id 2242 2243 def __hash__(self): 2244 return hash(self.organization) ^ hash(self.id) 2245 2246 _fields_to_parsers: ClassVar[dict] = { 2247 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2248 } 2249 2250 _patchable_fields: ClassVar[set[str]] = { 2251 "can_create_org_repo", 2252 "description", 2253 "includes_all_repositories", 2254 "name", 2255 "permission", 2256 "units", 2257 "units_map", 2258 } 2259 2260 @classmethod 2261 def request(cls, allspice_client, id: int): 2262 return cls._request(allspice_client, {"id": id}) 2263 2264 def commit(self): 2265 args = {"id": self.id} 2266 self._commit(args) 2267 2268 def add_user(self, user: User): 2269 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2270 url = f"/teams/{self.id}/members/{user.login}" 2271 self.allspice_client.requests_put(url) 2272 2273 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2274 if isinstance(repo, Repository): 2275 repo_name = repo.name 2276 else: 2277 repo_name = repo 2278 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2279 2280 def get_members(self): 2281 """Get all users assigned to the team.""" 2282 results = self.allspice_client.requests_get_paginated( 2283 Team.GET_MEMBERS % self.id, 2284 ) 2285 return [User.parse_response(self.allspice_client, result) for result in results] 2286 2287 def get_repos(self): 2288 """Get all repos of this Team.""" 2289 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2290 return [Repository.parse_response(self.allspice_client, result) for result in results] 2291 2292 def delete(self): 2293 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2294 self.deleted = True 2295 2296 def remove_team_member(self, user_name: str): 2297 url = f"/teams/{self.id}/members/{user_name}" 2298 self.allspice_client.requests_delete(url)
2268 def add_user(self, user: User): 2269 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2270 url = f"/teams/{self.id}/members/{user.login}" 2271 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2280 def get_members(self): 2281 """Get all users assigned to the team.""" 2282 results = self.allspice_client.requests_get_paginated( 2283 Team.GET_MEMBERS % self.id, 2284 ) 2285 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2287 def get_repos(self): 2288 """Get all repos of this Team.""" 2289 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2290 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
241class User(ApiObject): 242 active: bool 243 admin: Any 244 allow_create_organization: Any 245 allow_git_hook: Any 246 allow_import_local: Any 247 avatar_url: str 248 created: str 249 description: str 250 email: str 251 emails: List[Any] 252 followers_count: int 253 following_count: int 254 full_name: str 255 html_url: str 256 id: int 257 is_admin: bool 258 language: str 259 last_login: str 260 location: str 261 login: str 262 login_name: str 263 max_repo_creation: Any 264 must_change_password: Any 265 password: Any 266 prohibit_login: bool 267 restricted: bool 268 source_id: int 269 starred_repos_count: int 270 username: str 271 visibility: str 272 website: str 273 274 API_OBJECT = """/users/{name}""" # <org> 275 USER_MAIL = """/user/emails?sudo=%s""" # <name> 276 USER_PATCH = """/admin/users/%s""" # <username> 277 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 278 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 279 USER_HEATMAP = """/users/%s/heatmap""" # <username> 280 281 def __init__(self, allspice_client): 282 super().__init__(allspice_client) 283 self._emails = [] 284 285 def __eq__(self, other): 286 if not isinstance(other, User): 287 return False 288 return self.allspice_client == other.allspice_client and self.id == other.id 289 290 def __hash__(self): 291 return hash(self.allspice_client) ^ hash(self.id) 292 293 @property 294 def emails(self): 295 self.__request_emails() 296 return self._emails 297 298 @classmethod 299 def request(cls, allspice_client, name: str) -> "User": 300 api_object = cls._request(allspice_client, {"name": name}) 301 return api_object 302 303 _patchable_fields: ClassVar[set[str]] = { 304 "active", 305 "admin", 306 "allow_create_organization", 307 "allow_git_hook", 308 "allow_import_local", 309 "email", 310 "full_name", 311 "location", 312 "login_name", 313 "max_repo_creation", 314 "must_change_password", 315 "password", 316 "prohibit_login", 317 "website", 318 } 319 320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {} 335 336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result) 374 375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results] 380 381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results] 386 387 def get_teams(self) -> List["Team"]: 388 url = "/user/teams" 389 results = self.allspice_client.requests_get_paginated(url, sudo=self) 390 return [Team.parse_response(self.allspice_client, result) for result in results] 391 392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results] 396 397 def __request_emails(self): 398 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 399 # report if the adress changed by this 400 for mail in result: 401 self._emails.append(mail["email"]) 402 if mail["primary"]: 403 self._email = mail["email"] 404 405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True 409 410 def get_heatmap(self) -> List[Tuple[datetime, int]]: 411 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 412 results = [ 413 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 414 for result in results 415 ] 416 return results
320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.