allspice
A very simple API client for AllSpice Hub
Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client
object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice Hub version or the authenticated user are performed directly on the allspice_client
object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request
method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during development. Refer to the AllSpice API documentation for the field names.
Fields that can not be altered via allspice-api, are read only. After altering a field, the .commit
method of the according object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects have methods to execute some of the requests possible through the AllSpice API:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
1""" 2.. include:: ../README.md 3 :start-line: 1 4 :end-before: Installation 5""" 6 7from .allspice import ( 8 AllSpice, 9) 10from .apiobject import ( 11 Branch, 12 Comment, 13 Commit, 14 Content, 15 DesignReview, 16 Issue, 17 Milestone, 18 Organization, 19 Release, 20 Repository, 21 Team, 22 User, 23) 24from .exceptions import AlreadyExistsException, NotFoundException 25 26__version__ = "3.9.0" 27 28__all__ = [ 29 "AllSpice", 30 "AlreadyExistsException", 31 "Branch", 32 "Comment", 33 "Commit", 34 "Content", 35 "DesignReview", 36 "Issue", 37 "Milestone", 38 "NotFoundException", 39 "Organization", 40 "Release", 41 "Repository", 42 "Team", 43 "User", 44]
class AllSpice:
    """Object to establish a session with AllSpice Hub."""

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
    ):
        """Initialize an instance of the AllSpice Hub client.

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): If True (the default), verify the server's TLS
                certificate. Set to False to allow insecure connections;
                urllib3's insecure-request warnings are then silenced.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

        Raises:
            ValueError: If neither `token_text` nor `auth` is provided.
        """

        self.logger = logging.getLogger(__name__)
        # The module logger is shared by every client instance; attach the
        # stderr handler only once so each message is not emitted N times.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            )
            self.logger.addHandler(handler)
        self.logger.setLevel(log_level)
        self.headers = {
            "Content-type": "application/json",
        }
        self.url = allspice_hub_url

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication
        if not token_text and not auth:
            raise ValueError("Please provide either token_text or auth")
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certification verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def __get_url(self, endpoint):
        """Return the absolute API v1 URL for `endpoint`."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s", url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """Issue a GET request and map error status codes onto exceptions."""
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code in [404]:
                raise NotFoundException(message)
            if request.status_code in [403]:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
                )
            if request.status_code in [409]:
                raise ConflictException(message)
            if request.status_code in [503]:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parse the JSON body of a response.

        Empty bodies and empty JSON containers yield `{}`. Short bodies are
        parsed as well, so a valid short document such as `[1]` is returned
        instead of being dropped; a short body that is not JSON yields `{}`.
        """
        text = result.text
        if not text or not text.strip():
            return {}
        try:
            parsed = json.loads(text)
        except ValueError:
            # Bodies of up to three characters were never parsed before;
            # keep treating unparsable ones as "no content".
            if len(text) > 3:
                raise
            return {}
        if isinstance(parsed, (dict, list)) and not parsed:
            return {}
        return parsed

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body."""
        combined_params = dict(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.parse_result(self.__get(endpoint, combined_params))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the raw response body."""
        combined_params = dict(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.__get(endpoint, combined_params).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """Fetch every page of `endpoint` and return the concatenated items.

        Pages are requested starting at `first_page` until an empty page is
        returned. Dict responses must carry their items under `"data"` or
        `"tree"`; a missing (None) or empty item list ends the pagination.

        Raises:
            NotImplementedError: If a dict response has neither key.
        """
        page = first_page
        combined_params = dict(params)
        aggregated_result = []
        while True:
            combined_params[page_key] = page
            result = self.requests_get(endpoint, combined_params, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    items = result["data"]
                elif "tree" in result:
                    items = result["tree"]
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
                # None is treated like an empty page for both keys.
                if not items:
                    return aggregated_result
            else:
                items = result

            aggregated_result.extend(items)
            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` (an empty object when falsy) as JSON to `endpoint`.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        target = self.__get_url(endpoint)
        payload = json.dumps(data if data else {})
        response = self.requests.put(target, headers=self.headers, data=payload)
        if response.status_code in (200, 204):
            return
        message = f"Received status code: {response.status_code} ({response.url}) {response.text}"
        self.logger.error(message)
        raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE to `endpoint` with `data` serialized as JSON.

        A `data` of None is serialized as JSON `null`.

        Raises:
            Exception: If the status code is neither 200 nor 204.
        """
        target = self.__get_url(endpoint)
        response = self.requests.delete(target, headers=self.headers, data=json.dumps(data))
        if response.status_code in (200, 204):
            return
        message = f"Received status code: {response.status_code} ({response.url})"
        self.logger.error(message)
        raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
            can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If the server reports a duplicate entity
        :raises Exception: For any other status code outside 200, 201, 202
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # Drop the JSON content type so the multipart one can be set.
            args["headers"].pop("Content-type", None)
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            # Never write credentials to the log: mask the token and passwords.
            safe_headers = {
                key: ("<redacted>" if key.lower() == "authorization" else value)
                for key, value in self.headers.items()
            }
            safe_data = data
            if isinstance(data, dict) and "password" in data:
                safe_data = {**data, "password": "<redacted>"}
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            self.logger.error(f"With info: {safe_data} ({safe_headers})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` as JSON to `endpoint` and return the parsed response.

        Raises:
            Exception: If the status code is neither 200 nor 201.
        """
        target = self.__get_url(endpoint)
        response = self.requests.patch(target, headers=self.headers, data=json.dumps(data))
        if response.status_code in (200, 201):
            return self.parse_result(response)
        error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
        self.logger.error(error_message)
        raise Exception(error_message)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed response of `/orgs/<orgname>/public_members`."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return the organizations listed by the `/admin/orgs` endpoint."""
        path = "/admin/orgs"
        results = self.requests_get(path)
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the user returned by the `/user` endpoint."""
        result = self.requests_get(AllSpice.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the `version` field of the `/version` endpoint."""
        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return the users listed by the `/admin/users` endpoint."""
        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first user whose `email` or `emails` matches, else None."""
        users = self.get_users()
        for user in users:
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the first user with the given `username`, else None."""
        users = self.get_users()
        for user in users:
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Return the repository `owner`/`name`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` fall back to `user_name` when not given.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # Log a copy with the password masked so it never reaches the logs.
        self.logger.debug(
            "Gitea post payload: %s", {**request_data, "password": "<redacted>"}
        )
        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            # The response may carry no "message"; report whatever came back.
            reason = result.get("message", result) if isinstance(result, dict) else result
            self.logger.error(reason)
            raise Exception("User not created... (gitea: %s)" % reason)
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, (User, Organization))
        result = self.requests_post(
            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            # The response may carry no "message"; report whatever came back.
            reason = result.get("message", result) if isinstance(result, dict) else result
            self.logger.error(reason)
            raise Exception("Repository not created... (gitea: %s)" % reason)
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization owned by `owner`.

        Throws:
            AlreadyExistsException: If the Organization exists already.
            Exception: If something else went wrong.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            AllSpice.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            # The response may carry no "message"; report whatever came back.
            reason = result.get("message", result) if isinstance(result, dict) else result
            failure = "Organization not created... (gitea: %s)" % reason
            self.logger.error(failure)
            raise Exception(failure)
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, "", short description of the new Team.
            permission (str): Optional, 'read', What permissions the members
                of the Team have.
            can_create_org_repo (bool): Optional, False, sent to the API as
                `can_create_org_repo`.
            includes_all_repositories (bool): Optional, False, sent to the
                API as `includes_all_repositories`.
            units (tuple): Optional, the repository units sent to the API
                as `units`.
            units_map (dict): Optional, None, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.

        Throws:
            AlreadyExistsException: If the Team exists already.
            Exception: If something else went wrong.
        """

        result = self.requests_post(
            AllSpice.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                # None is sent as an empty mapping, like the former default.
                "units_map": {} if units_map is None else units_map,
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            # The response may carry no "message"; report whatever came back.
            reason = result.get("message", result) if isinstance(result, dict) else result
            failure = "Team not created... (gitea: %s)" % reason
            self.logger.error(failure)
            raise Exception(failure)
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object
Object to establish a session with AllSpice Hub.
def __init__(
    self,
    allspice_hub_url="https://hub.allspice.io",
    token_text=None,
    auth=None,
    verify=True,
    log_level="INFO",
    ratelimiting=(100, 60),
):
    """Initialize an instance of the AllSpice Hub client.

    Args:
        allspice_hub_url (str): The URL for the AllSpice Hub instance.
            Defaults to `https://hub.allspice.io`.

        token_text (str, None): The access token, by default None.

        auth (tuple, None): The user credentials
            `(username, password)`, by default None.

        verify (bool): If True (the default), verify the server's TLS
            certificate. Set to False to allow insecure connections;
            urllib3's insecure-request warnings are then silenced.

        log_level (str): The log level, by default `INFO`.

        ratelimiting (tuple[int, int], None): `(max_calls, period)`,
            If None, no rate limiting is applied. By default, 100 calls
            per minute are allowed.

    Raises:
        ValueError: If neither `token_text` nor `auth` is provided.
    """

    self.logger = logging.getLogger(__name__)
    # The module logger is shared by every client instance; attach the
    # stderr handler only once so each message is not emitted N times.
    if not self.logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        self.logger.addHandler(handler)
    self.logger.setLevel(log_level)
    self.headers = {
        "Content-type": "application/json",
    }
    self.url = allspice_hub_url

    if ratelimiting is None:
        self.requests = requests.Session()
    else:
        (max_calls, period) = ratelimiting
        self.requests = RateLimitedSession(max_calls=max_calls, period=period)

    # Manage authentication
    if not token_text and not auth:
        raise ValueError("Please provide either token_text or auth")
    if token_text:
        self.headers["Authorization"] = "token " + token_text
    if auth:
        self.logger.warning(
            "Using basic auth is not recommended. Prefer using a token instead."
        )
        self.requests.auth = auth

    # Manage SSL certification verification
    self.requests.verify = verify
    if not verify:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True (the default), verify the server's TLS certificate.
Set to False to allow insecure server connections when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {}
Parses the result-JSON to a dict.
def requests_get_paginated(
    self,
    endpoint: str,
    params=frozendict(),
    sudo=None,
    page_key: str = "page",
    first_page: int = 1,
):
    """Fetch every page of `endpoint` and return the concatenated items.

    Pages are requested starting at `first_page` until an empty page is
    returned. Dict responses must carry their items under `"data"` or
    `"tree"`; a missing (None) or empty item list ends the pagination.

    Raises:
        NotImplementedError: If a dict response has neither key.
    """
    page = first_page
    combined_params = dict(params)
    aggregated_result = []
    while True:
        combined_params[page_key] = page
        result = self.requests_get(endpoint, combined_params, sudo)

        if not result:
            return aggregated_result

        if isinstance(result, dict):
            if "data" in result:
                items = result["data"]
            elif "tree" in result:
                items = result["tree"]
            else:
                raise NotImplementedError(
                    "requests_get_paginated does not know how to handle responses of this type."
                )
            # None is treated like an empty page for both keys.
            if not items:
                return aggregated_result
        else:
            items = result

        aggregated_result.extend(items)
        page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
    """PUT `data` (an empty object when falsy) as JSON to `endpoint`.

    Raises:
        Exception: If the status code is neither 200 nor 204.
    """
    target = self.__get_url(endpoint)
    payload = json.dumps(data if data else {})
    response = self.requests.put(target, headers=self.headers, data=payload)
    if response.status_code in (200, 204):
        return
    message = f"Received status code: {response.status_code} ({response.url}) {response.text}"
    self.logger.error(message)
    raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
    """Send a DELETE to `endpoint` with `data` serialized as JSON.

    A `data` of None is serialized as JSON `null`.

    Raises:
        Exception: If the status code is neither 200 nor 204.
    """
    target = self.__get_url(endpoint)
    response = self.requests.delete(target, headers=self.headers, data=json.dumps(data))
    if response.status_code in (200, 204):
        return
    message = f"Received status code: {response.status_code} ({response.url})"
    self.logger.error(message)
    raise Exception(message)
def requests_post(
    self,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
    files: Optional[dict] = None,
):
    """
    Make a POST call to the endpoint.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :param params: A dictionary of query params
    :param files: A dictionary of files, see requests.post. Using both files and data
        can lead to unexpected results!
    :return: The JSON response parsed as a dict
    :raises AlreadyExistsException: If the server reports a duplicate entity
    :raises Exception: For any other status code outside 200, 201, 202
    """

    # This should ideally be a TypedDict of the type of arguments taken by
    # `requests.post`.
    args: dict[str, Any] = {
        "headers": self.headers.copy(),
    }
    if data is not None:
        args["data"] = json.dumps(data)
    if params is not None:
        args["params"] = params
    if files is not None:
        # Drop the JSON content type so the multipart one can be set.
        args["headers"].pop("Content-type", None)
        args["files"] = files

    request = self.requests.post(self.__get_url(endpoint), **args)

    if request.status_code not in [200, 201, 202]:
        if "already exists" in request.text or "e-mail already in use" in request.text:
            self.logger.warning(request.text)
            raise AlreadyExistsException()
        # Never write credentials to the log: mask the token and passwords.
        safe_headers = {
            key: ("<redacted>" if key.lower() == "authorization" else value)
            for key, value in self.headers.items()
        }
        safe_data = data
        if isinstance(data, dict) and "password" in data:
            safe_data = {**data, "password": "<redacted>"}
        self.logger.error(f"Received status code: {request.status_code} ({request.url})")
        self.logger.error(f"With info: {safe_data} ({safe_headers})")
        self.logger.error(f"Answer: {request.text}")
        raise Exception(
            f"Received status code: {request.status_code} ({request.url}), {request.text}"
        )
    return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
def requests_patch(self, endpoint: str, data: dict):
    """PATCH `data` as JSON to `endpoint` and return the parsed response.

    Raises:
        Exception: If the status code is neither 200 nor 201.
    """
    target = self.__get_url(endpoint)
    response = self.requests.patch(target, headers=self.headers, data=json.dumps(data))
    if response.status_code in (200, 201):
        return self.parse_result(response)
    error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
    self.logger.error(error_message)
    raise Exception(error_message)
def create_user(
    self,
    user_name: str,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    login_name: Optional[str] = None,
    change_pw=True,
    send_notify=True,
    source_id=0,
):
    """Create User.

    `login_name` and `full_name` fall back to `user_name` when not given.

    Throws:
        AlreadyExistsException, if the User exists already
        Exception, if something else went wrong.
    """
    if not login_name:
        login_name = user_name
    if not full_name:
        full_name = user_name
    request_data = {
        "source_id": source_id,
        "login_name": login_name,
        "full_name": full_name,
        "username": user_name,
        "email": email,
        "password": password,
        "send_notify": send_notify,
        "must_change_password": change_pw,
    }

    # Log a copy with the password masked so it never reaches the logs.
    self.logger.debug(
        "Gitea post payload: %s", {**request_data, "password": "<redacted>"}
    )
    result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
    if "id" in result:
        self.logger.info(
            "Successfully created User %s <%s> (id %s)",
            result["login"],
            result["email"],
            result["id"],
        )
        self.logger.debug("Gitea response: %s", result)
    else:
        # The response may carry no "message"; report whatever came back.
        reason = result.get("message", result) if isinstance(result, dict) else result
        self.logger.error(reason)
        raise Exception("User not created... (gitea: %s)" % reason)
    user = User.parse_response(self, result)
    return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository as the administrator

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # although this only says user in the api, this also works for
    # organizations
    assert isinstance(repoOwner, (User, Organization))
    result = self.requests_post(
        AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
        data={
            "name": repoName,
            "description": description,
            "private": private,
            "auto_init": autoInit,
            "gitignores": gitignores,
            "license": license,
            "issue_labels": issue_labels,
            "readme": readme,
            "default_branch": default_branch,
        },
    )
    if "id" in result:
        self.logger.info("Successfully created Repository %s " % result["name"])
    else:
        # The response may carry no "message"; report whatever came back.
        reason = result.get("message", result) if isinstance(result, dict) else result
        self.logger.error(reason)
        raise Exception("Repository not created... (gitea: %s)" % reason)
    return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner`.

    Throws:
        AlreadyExistsException: If the Organization exists already.
        Exception: If something else went wrong.
    """
    assert isinstance(owner, User)
    result = self.requests_post(
        AllSpice.CREATE_ORG % owner.username,
        data={
            "username": orgName,
            "description": description,
            "location": location,
            "website": website,
            "full_name": full_name,
        },
    )
    if "id" in result:
        self.logger.info("Successfully created Organization %s" % result["username"])
    else:
        # The response may carry no "message"; report whatever came back.
        reason = result.get("message", result) if isinstance(result, dict) else result
        failure = "Organization not created... (gitea: %s)" % reason
        self.logger.error(failure)
        raise Exception(failure)
    return Organization.parse_response(self, result)
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, "", short description of the new Team.
        permission (str): Optional, 'read', What permissions the members
            of the Team have.
        can_create_org_repo (bool): Optional, False, sent to the API as
            `can_create_org_repo`.
        includes_all_repositories (bool): Optional, False, sent to the
            API as `includes_all_repositories`.
        units (tuple): Optional, the repository units sent to the API
            as `units`.
        units_map (dict): Optional, None, a mapping of units to their
            permissions. If None or empty, the `permission` permission will
            be applied to all units. Note: When both `units` and `units_map`
            are given, `units_map` will be preferred.

    Throws:
        AlreadyExistsException: If the Team exists already.
        Exception: If something else went wrong.
    """

    result = self.requests_post(
        AllSpice.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            # None is sent as an empty mapping, like the former default.
            "units_map": {} if units_map is None else units_map,
        },
    )

    if "id" in result:
        self.logger.info("Successfully created Team %s" % result["name"])
    else:
        # The response may carry no "message"; report whatever came back.
        reason = result.get("message", result) if isinstance(result, dict) else result
        failure = "Team not created... (gitea: %s)" % reason
        self.logger.error(failure)
        raise Exception(failure)
    api_object = Team.parse_response(self, result)
    setattr(
        api_object, "_organization", org
    )  # fixes strange behaviour of gitea not returning a valid organization here.
    return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, "" (empty string), short description of the new Team.
permission (str): Optional, 'read', What permissions the members of the Team have.
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.
Common base class for all non-exit exceptions.
class Branch(ReadonlyApiObject):
    """A repository branch, addressed by owner, repository and branch name."""

    commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]]
    effective_branch_protection_name: str
    enable_status_check: bool
    name: str
    protected: bool
    required_approvals: int
    status_check_contexts: List[Any]
    user_can_merge: bool
    user_can_push: bool

    API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}"""

    _fields_to_parsers: ClassVar[dict] = {
        # This is not a commit object
        # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c)
    }

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Two branches are equal when both the commit data and the name match.
        if isinstance(other, Branch):
            return self.commit == other.commit and self.name == other.name
        return False

    def __hash__(self):
        # Combine the commit id and the branch name.
        commit_hash = hash(self.commit["id"])
        name_hash = hash(self.name)
        return commit_hash ^ name_hash

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, branch: str):
        """Request the branch `branch` of repository `owner`/`repo`."""
        path_fields = {"owner": owner, "repo": repo, "branch": branch}
        return cls._request(allspice_client, path_fields)
Inherited Members
class Comment(ApiObject):
    """A comment posted on an issue or a pull request of a repository."""

    assets: List[Union[Any, Dict[str, Union[int, str]]]]
    body: str
    created_at: datetime
    html_url: str
    id: int
    issue_url: str
    original_author: str
    original_author_id: int
    pull_request_url: str
    updated_at: datetime
    user: User

    API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}"""
    GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets"""
    ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}"""

    _fields_to_parsers: ClassVar[dict] = {
        "user": lambda allspice_client, r: User.parse_response(allspice_client, r),
        "created_at": lambda _, t: Util.convert_time(t),
        "updated_at": lambda _, t: Util.convert_time(t),
    }

    _patchable_fields: ClassVar[set[str]] = {"body"}

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Comments are equal when they share the repository and the id.
        if isinstance(other, Comment):
            return self.repository == other.repository and self.id == other.id
        return False

    def __hash__(self):
        repository_hash = hash(self.repository)
        return repository_hash ^ hash(self.id)

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment":
        """Request the comment `id` of repository `owner`/`repo`."""
        path_fields = {"owner": owner, "repo": repo, "id": id}
        return cls._request(allspice_client, path_fields)

    @property
    def parent_url(self) -> str:
        """URL of the parent of this comment (the issue or the pull request)"""

        # Fall back to the pull request URL when no issue URL is set.
        if self.issue_url is None or self.issue_url == "":
            return self.pull_request_url
        return self.issue_url

    @cached_property
    def repository(self) -> Repository:
        """The repository this comment was posted on."""

        # Owner and repository name are taken from the parent URL segments.
        segments = self.parent_url.split("/")
        owner_name, repo_name = segments[-4:-2]
        return Repository.request(self.allspice_client, owner_name, repo_name)

    def __fields_for_path(self):
        # Values for the placeholders shared by all comment endpoint paths.
        repository = self.repository
        return dict(
            owner=repository.owner.username,
            repo=repository.name,
            id=self.id,
        )

    def commit(self):
        """Commit the changed fields of this comment via `_commit`."""
        path_fields = self.__fields_for_path()
        self._commit(path_fields)

    def delete(self):
        """Delete this comment and mark the object as deleted."""
        path = self.API_OBJECT.format(**self.__fields_for_path())
        self.allspice_client.requests_delete(path)
        self.deleted = True

    def get_attachments(self) -> List[Attachment]:
        """
        Get all attachments on this comment. This returns Attachment objects, which
        contain a link to download the attachment.

        https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
        """

        path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        results = self.allspice_client.requests_get(path)
        client = self.allspice_client
        return [Attachment.parse_response(client, entry) for entry in results]

    def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
        """
        Create an attachment on this comment.

        https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment

        :param file: The file to attach. This should be a file-like object.
        :param name: The name of the file. If not provided, the name of the file will be
            used.
        :return: The created attachment.
        """

        # Query params are only passed when a name was given.
        extra: dict[str, Any] = {} if name is None else {"params": {"name": name}}
        path = self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path())
        result = self.allspice_client.requests_post(
            path,
            files={"attachment": file},
            **extra,
        )
        return Attachment.parse_response(self.allspice_client, result)

    def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment:
        """
        Edit an attachment.

        The list of params that can be edited is available at
        https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment

        :param attachment: The attachment to be edited
        :param data: The data parameter should be a dictionary of the fields to edit.
        :return: The edited attachment
        """

        path_fields = dict(self.__fields_for_path(), attachment_id=attachment.id)
        path = self.ATTACHMENT_PATH.format(**path_fields)
        result = self.allspice_client.requests_patch(path, data=data)
        return Attachment.parse_response(self.allspice_client, result)

    def delete_attachment(self, attachment: Attachment):
        """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment"""

        path_fields = dict(self.__fields_for_path(), attachment_id=attachment.id)
        path = self.ATTACHMENT_PATH.format(**path_fields)
        self.allspice_client.requests_delete(path)
        attachment.deleted = True
1630 @property 1631 def parent_url(self) -> str: 1632 """URL of the parent of this comment (the issue or the pull request)""" 1633 1634 if self.issue_url is not None and self.issue_url != "": 1635 return self.issue_url 1636 else: 1637 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1639 @cached_property 1640 def repository(self) -> Repository: 1641 """The repository this comment was posted on.""" 1642 1643 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1644 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1660 def get_attachments(self) -> List[Attachment]: 1661 """ 1662 Get all attachments on this comment. This returns Attachment objects, which 1663 contain a link to download the attachment. 1664 1665 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1666 """ 1667 1668 results = self.allspice_client.requests_get( 1669 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1670 ) 1671 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1673 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1674 """ 1675 Create an attachment on this comment. 1676 1677 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1678 1679 :param file: The file to attach. This should be a file-like object. 1680 :param name: The name of the file. If not provided, the name of the file will be 1681 used. 1682 :return: The created attachment. 1683 """ 1684 1685 args: dict[str, Any] = { 1686 "files": {"attachment": file}, 1687 } 1688 if name is not None: 1689 args["params"] = {"name": name} 1690 1691 result = self.allspice_client.requests_post( 1692 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1693 **args, 1694 ) 1695 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1697 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1698 """ 1699 Edit an attachment. 1700 1701 The list of params that can be edited is available at 1702 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1703 1704 :param attachment: The attachment to be edited 1705 :param data: The data parameter should be a dictionary of the fields to edit. 1706 :return: The edited attachment 1707 """ 1708 1709 args = { 1710 **self.__fields_for_path(), 1711 "attachment_id": attachment.id, 1712 } 1713 result = self.allspice_client.requests_patch( 1714 self.ATTACHMENT_PATH.format(**args), 1715 data=data, 1716 ) 1717 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1719 def delete_attachment(self, attachment: Attachment): 1720 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1721 1722 args = { 1723 **self.__fields_for_path(), 1724 "attachment_id": attachment.id, 1725 } 1726 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1727 attachment.deleted = True
https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1730class Commit(ReadonlyApiObject): 1731 author: User 1732 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1733 committer: Dict[str, Union[int, str, bool]] 1734 created: str 1735 files: List[Dict[str, str]] 1736 html_url: str 1737 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1738 parents: List[Union[Dict[str, str], Any]] 1739 sha: str 1740 stats: Dict[str, int] 1741 url: str 1742 1743 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1744 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1745 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1746 1747 # Regex to extract owner and repo names from the url property 1748 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1749 1750 def __init__(self, allspice_client): 1751 super().__init__(allspice_client) 1752 1753 _fields_to_parsers: ClassVar[dict] = { 1754 # NOTE: api may return None for commiters that are no allspice users 1755 "author": lambda allspice_client, u: ( 1756 User.parse_response(allspice_client, u) if u else None 1757 ) 1758 } 1759 1760 def __eq__(self, other): 1761 if not isinstance(other, Commit): 1762 return False 1763 return self.sha == other.sha 1764 1765 def __hash__(self): 1766 return hash(self.sha) 1767 1768 @classmethod 1769 def parse_response(cls, allspice_client, result) -> "Commit": 1770 commit_cache = result["commit"] 1771 api_object = cls(allspice_client) 1772 cls._initialize(allspice_client, api_object, result) 1773 # inner_commit for legacy reasons 1774 Commit._add_read_property("inner_commit", commit_cache, api_object) 1775 return api_object 1776 1777 def get_status(self) -> CommitCombinedStatus: 1778 """ 1779 Get a combined status consisting of all statues on this commit. 1780 1781 Note that the returned object is a CommitCombinedStatus object, which 1782 also contains a list of all statuses on the commit. 
1783 1784 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1785 """ 1786 1787 result = self.allspice_client.requests_get( 1788 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1789 ) 1790 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1791 1792 def get_statuses(self) -> List[CommitStatus]: 1793 """ 1794 Get a list of all statuses on this commit. 1795 1796 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1797 """ 1798 1799 results = self.allspice_client.requests_get( 1800 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1801 ) 1802 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1803 1804 @cached_property 1805 def _fields_for_path(self) -> dict[str, str]: 1806 matches = self.URL_REGEXP.search(self.url) 1807 if not matches: 1808 raise ValueError(f"Invalid commit URL: {self.url}") 1809 1810 return { 1811 "owner": matches.group(1), 1812 "repo": matches.group(2), 1813 "sha": self.sha, 1814 }
1768 @classmethod 1769 def parse_response(cls, allspice_client, result) -> "Commit": 1770 commit_cache = result["commit"] 1771 api_object = cls(allspice_client) 1772 cls._initialize(allspice_client, api_object, result) 1773 # inner_commit for legacy reasons 1774 Commit._add_read_property("inner_commit", commit_cache, api_object) 1775 return api_object
1777 def get_status(self) -> CommitCombinedStatus: 1778 """ 1779 Get a combined status consisting of all statues on this commit. 1780 1781 Note that the returned object is a CommitCombinedStatus object, which 1782 also contains a list of all statuses on the commit. 1783 1784 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1785 """ 1786 1787 result = self.allspice_client.requests_get( 1788 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1789 ) 1790 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statuses on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus
1792 def get_statuses(self) -> List[CommitStatus]: 1793 """ 1794 Get a list of all statuses on this commit. 1795 1796 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1797 """ 1798 1799 results = self.allspice_client.requests_get( 1800 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1801 ) 1802 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
2624class Content(ReadonlyApiObject): 2625 content: Any 2626 download_url: str 2627 encoding: Any 2628 git_url: str 2629 html_url: str 2630 last_commit_sha: str 2631 name: str 2632 path: str 2633 sha: str 2634 size: int 2635 submodule_git_url: Any 2636 target: Any 2637 type: str 2638 url: str 2639 2640 FILE = "file" 2641 2642 def __init__(self, allspice_client): 2643 super().__init__(allspice_client) 2644 2645 def __eq__(self, other): 2646 if not isinstance(other, Content): 2647 return False 2648 2649 return self.sha == other.sha and self.name == other.name 2650 2651 def __hash__(self): 2652 return hash(self.sha) ^ hash(self.name)
Inherited Members
2108class DesignReview(ApiObject): 2109 """ 2110 A Design Review. See 2111 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2112 2113 Note: The base and head fields are not `Branch` objects - they are plain strings 2114 referring to the branch names. This is because DRs can exist for branches that have 2115 been deleted, which don't have an associated `Branch` object from the API. You can use 2116 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2117 it exists. 2118 """ 2119 2120 additions: int 2121 allow_maintainer_edit: bool 2122 allow_maintainer_edits: Any 2123 assignee: User 2124 assignees: List["User"] 2125 base: str 2126 body: str 2127 changed_files: int 2128 closed_at: Optional[str] 2129 comments: int 2130 created_at: str 2131 deletions: int 2132 diff_url: str 2133 draft: bool 2134 due_date: Optional[str] 2135 head: str 2136 html_url: str 2137 id: int 2138 is_locked: bool 2139 labels: List[Any] 2140 merge_base: str 2141 merge_commit_sha: Optional[str] 2142 mergeable: bool 2143 merged: bool 2144 merged_at: Optional[str] 2145 merged_by: Optional["User"] 2146 milestone: Any 2147 number: int 2148 patch_url: str 2149 pin_order: int 2150 repository: Optional["Repository"] 2151 requested_reviewers: Any 2152 review_comments: int 2153 state: str 2154 title: str 2155 updated_at: str 2156 url: str 2157 user: User 2158 2159 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2160 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2161 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2162 2163 OPEN = "open" 2164 CLOSED = "closed" 2165 2166 class MergeType(Enum): 2167 MERGE = "merge" 2168 REBASE = "rebase" 2169 REBASE_MERGE = "rebase-merge" 2170 SQUASH = "squash" 2171 MANUALLY_MERGED = "manually-merged" 2172 2173 def __init__(self, allspice_client): 2174 super().__init__(allspice_client) 2175 2176 def __eq__(self, other): 2177 if not isinstance(other, DesignReview): 2178 return False 2179 
return self.repository == other.repository and self.id == other.id 2180 2181 def __hash__(self): 2182 return hash(self.repository) ^ hash(self.id) 2183 2184 @classmethod 2185 def parse_response(cls, allspice_client, result) -> "DesignReview": 2186 api_object = super().parse_response(allspice_client, result) 2187 cls._add_read_property( 2188 "repository", 2189 Repository.parse_response(allspice_client, result["base"]["repo"]), 2190 api_object, 2191 ) 2192 2193 return api_object 2194 2195 @classmethod 2196 def request(cls, allspice_client, owner: str, repo: str, number: str): 2197 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2198 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2199 2200 _fields_to_parsers: ClassVar[dict] = { 2201 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2202 "assignees": lambda allspice_client, us: [ 2203 User.parse_response(allspice_client, u) for u in us 2204 ], 2205 "base": lambda _, b: b["ref"], 2206 "head": lambda _, h: h["ref"], 2207 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2208 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2209 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2210 } 2211 2212 _patchable_fields: ClassVar[set[str]] = { 2213 "allow_maintainer_edits", 2214 "assignee", 2215 "assignees", 2216 "base", 2217 "body", 2218 "due_date", 2219 "milestone", 2220 "state", 2221 "title", 2222 } 2223 2224 _parsers_to_fields: ClassVar[dict] = { 2225 "assignee": lambda u: u.username, 2226 "assignees": lambda us: [u.username for u in us], 2227 "base": lambda b: b.name if isinstance(b, Branch) else b, 2228 "milestone": lambda m: m.id, 2229 } 2230 2231 def commit(self): 2232 data = self.get_dirty_fields() 2233 if "due_date" in data and data["due_date"] is None: 2234 data["unset_due_date"] = True 2235 2236 args = { 2237 "owner": 
self.repository.owner.username, 2238 "repo": self.repository.name, 2239 "index": self.number, 2240 } 2241 self._commit(args, data) 2242 2243 def merge(self, merge_type: MergeType): 2244 """ 2245 Merge the pull request. See 2246 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2247 2248 :param merge_type: The type of merge to perform. See the MergeType enum. 2249 """ 2250 2251 self.allspice_client.requests_post( 2252 self.MERGE_DESIGN_REVIEW.format( 2253 owner=self.repository.owner.username, 2254 repo=self.repository.name, 2255 index=self.number, 2256 ), 2257 data={"Do": merge_type.value}, 2258 ) 2259 2260 def get_comments(self) -> List[Comment]: 2261 """ 2262 Get the comments on this pull request, but not specifically on a review. 2263 2264 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2265 2266 :return: A list of comments on this pull request. 2267 """ 2268 2269 results = self.allspice_client.requests_get( 2270 self.GET_COMMENTS.format( 2271 owner=self.repository.owner.username, 2272 repo=self.repository.name, 2273 index=self.number, 2274 ) 2275 ) 2276 return [Comment.parse_response(self.allspice_client, result) for result in results] 2277 2278 def create_comment(self, body: str): 2279 """ 2280 Create a comment on this pull request. This uses the same endpoint as the 2281 comments on issues, and will not be associated with any reviews. 2282 2283 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2284 2285 :param body: The body of the comment. 2286 :return: The comment that was created. 2287 """ 2288 2289 result = self.allspice_client.requests_post( 2290 self.GET_COMMENTS.format( 2291 owner=self.repository.owner.username, 2292 repo=self.repository.name, 2293 index=self.number, 2294 ), 2295 data={"body": body}, 2296 ) 2297 return Comment.parse_response(self.allspice_client, result)
A Design Review. See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not `Branch` objects - they are plain strings referring to the branch names. This is because DRs can exist for branches that have been deleted, which don't have an associated `Branch` object from the API. You can use the `Repository.get_branch` method to get a `Branch` object for a branch if you know it exists.
2184 @classmethod 2185 def parse_response(cls, allspice_client, result) -> "DesignReview": 2186 api_object = super().parse_response(allspice_client, result) 2187 cls._add_read_property( 2188 "repository", 2189 Repository.parse_response(allspice_client, result["base"]["repo"]), 2190 api_object, 2191 ) 2192 2193 return api_object
2195 @classmethod 2196 def request(cls, allspice_client, owner: str, repo: str, number: str): 2197 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2198 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest
2231 def commit(self): 2232 data = self.get_dirty_fields() 2233 if "due_date" in data and data["due_date"] is None: 2234 data["unset_due_date"] = True 2235 2236 args = { 2237 "owner": self.repository.owner.username, 2238 "repo": self.repository.name, 2239 "index": self.number, 2240 } 2241 self._commit(args, data)
2243 def merge(self, merge_type: MergeType): 2244 """ 2245 Merge the pull request. See 2246 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2247 2248 :param merge_type: The type of merge to perform. See the MergeType enum. 2249 """ 2250 2251 self.allspice_client.requests_post( 2252 self.MERGE_DESIGN_REVIEW.format( 2253 owner=self.repository.owner.username, 2254 repo=self.repository.name, 2255 index=self.number, 2256 ), 2257 data={"Do": merge_type.value}, 2258 )
Merge the pull request. See https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2260 def get_comments(self) -> List[Comment]: 2261 """ 2262 Get the comments on this pull request, but not specifically on a review. 2263 2264 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2265 2266 :return: A list of comments on this pull request. 2267 """ 2268 2269 results = self.allspice_client.requests_get( 2270 self.GET_COMMENTS.format( 2271 owner=self.repository.owner.username, 2272 repo=self.repository.name, 2273 index=self.number, 2274 ) 2275 ) 2276 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
https://hub.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2278 def create_comment(self, body: str): 2279 """ 2280 Create a comment on this pull request. This uses the same endpoint as the 2281 comments on issues, and will not be associated with any reviews. 2282 2283 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2284 2285 :param body: The body of the comment. 2286 :return: The comment that was created. 2287 """ 2288 2289 result = self.allspice_client.requests_post( 2290 self.GET_COMMENTS.format( 2291 owner=self.repository.owner.username, 2292 repo=self.repository.name, 2293 index=self.number, 2294 ), 2295 data={"body": body}, 2296 ) 2297 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
1905class Issue(ApiObject): 1906 """ 1907 An issue on a repository. 1908 1909 Note: `Issue.assets` may not have any entries even if the issue has 1910 attachments. This happens when an issue is fetched via a bulk method like 1911 `Repository.get_issues`. In most cases, prefer using 1912 `Issue.get_attachments` to get the attachments on an issue. 1913 """ 1914 1915 assets: List[Union[Any, "Attachment"]] 1916 assignee: Any 1917 assignees: Any 1918 body: str 1919 closed_at: Any 1920 comments: int 1921 created_at: str 1922 due_date: Any 1923 html_url: str 1924 id: int 1925 is_locked: bool 1926 labels: List[Any] 1927 milestone: Optional["Milestone"] 1928 number: int 1929 original_author: str 1930 original_author_id: int 1931 pin_order: int 1932 pull_request: Any 1933 ref: str 1934 repository: Dict[str, Union[int, str]] 1935 state: str 1936 title: str 1937 updated_at: str 1938 url: str 1939 user: User 1940 1941 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1942 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1943 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1944 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1945 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1946 1947 OPENED = "open" 1948 CLOSED = "closed" 1949 1950 def __init__(self, allspice_client): 1951 super().__init__(allspice_client) 1952 1953 def __eq__(self, other): 1954 if not isinstance(other, Issue): 1955 return False 1956 return self.repository == other.repository and self.id == other.id 1957 1958 def __hash__(self): 1959 return hash(self.repository) ^ hash(self.id) 1960 1961 _fields_to_parsers: ClassVar[dict] = { 1962 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1963 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1964 "assets": lambda allspice_client, assets: [ 1965 Attachment.parse_response(allspice_client, a) for a in assets 1966 ], 1967 
"assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1968 "assignees": lambda allspice_client, us: [ 1969 User.parse_response(allspice_client, u) for u in us 1970 ], 1971 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1972 } 1973 1974 _parsers_to_fields: ClassVar[dict] = { 1975 "milestone": lambda m: m.id, 1976 } 1977 1978 _patchable_fields: ClassVar[set[str]] = { 1979 "assignee", 1980 "assignees", 1981 "body", 1982 "due_date", 1983 "milestone", 1984 "state", 1985 "title", 1986 } 1987 1988 def commit(self): 1989 args = { 1990 "owner": self.repository.owner.username, 1991 "repo": self.repository.name, 1992 "index": self.number, 1993 } 1994 self._commit(args) 1995 1996 @classmethod 1997 def request(cls, allspice_client, owner: str, repo: str, number: str): 1998 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1999 # The repository in the response is a RepositoryMeta object, so request 2000 # the full repository object and add it to the issue object. 
2001 repository = Repository.request(allspice_client, owner, repo) 2002 setattr(api_object, "_repository", repository) 2003 # For legacy reasons 2004 cls._add_read_property("repo", repository, api_object) 2005 return api_object 2006 2007 @classmethod 2008 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2009 args = {"owner": repo.owner.username, "repo": repo.name} 2010 data = {"title": title, "body": body} 2011 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2012 issue = Issue.parse_response(allspice_client, result) 2013 setattr(issue, "_repository", repo) 2014 cls._add_read_property("repo", repo, issue) 2015 return issue 2016 2017 @property 2018 def owner(self) -> Organization | User: 2019 return self.repository.owner 2020 2021 def get_time_sum(self, user: User) -> int: 2022 results = self.allspice_client.requests_get( 2023 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2024 ) 2025 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2026 2027 def get_times(self) -> Optional[Dict]: 2028 return self.allspice_client.requests_get( 2029 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2030 ) 2031 2032 def delete_time(self, time_id: str): 2033 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2034 self.allspice_client.requests_delete(path) 2035 2036 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2037 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2038 self.allspice_client.requests_post( 2039 path, data={"created": created, "time": int(time), "user_name": user_name} 2040 ) 2041 2042 def get_comments(self) -> List[Comment]: 2043 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2044 2045 results = self.allspice_client.requests_get( 2046 
self.GET_COMMENTS.format( 2047 owner=self.owner.username, repo=self.repository.name, index=self.number 2048 ) 2049 ) 2050 2051 return [Comment.parse_response(self.allspice_client, result) for result in results] 2052 2053 def create_comment(self, body: str) -> Comment: 2054 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2055 2056 path = self.GET_COMMENTS.format( 2057 owner=self.owner.username, repo=self.repository.name, index=self.number 2058 ) 2059 2060 response = self.allspice_client.requests_post(path, data={"body": body}) 2061 return Comment.parse_response(self.allspice_client, response) 2062 2063 def get_attachments(self) -> List[Attachment]: 2064 """ 2065 Fetch all attachments on this issue. 2066 2067 Unlike the assets field, this will always fetch all attachments from the 2068 API. 2069 2070 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2071 """ 2072 2073 path = self.GET_ATTACHMENTS.format( 2074 owner=self.owner.username, repo=self.repository.name, index=self.number 2075 ) 2076 response = self.allspice_client.requests_get(path) 2077 2078 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2079 2080 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2081 """ 2082 Create an attachment on this issue. 2083 2084 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2085 2086 :param file: The file to attach. This should be a file-like object. 2087 :param name: The name of the file. If not provided, the name of the file will be 2088 used. 2089 :return: The created attachment. 
2090 """ 2091 2092 args: dict[str, Any] = { 2093 "files": {"attachment": file}, 2094 } 2095 if name is not None: 2096 args["params"] = {"name": name} 2097 2098 result = self.allspice_client.requests_post( 2099 self.GET_ATTACHMENTS.format( 2100 owner=self.owner.username, repo=self.repository.name, index=self.number 2101 ), 2102 **args, 2103 ) 2104 2105 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: `Issue.assets` may not have any entries even if the issue has attachments. This happens when an issue is fetched via a bulk method like `Repository.get_issues`. In most cases, prefer using `Issue.get_attachments` to get the attachments on an issue.
1996 @classmethod 1997 def request(cls, allspice_client, owner: str, repo: str, number: str): 1998 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1999 # The repository in the response is a RepositoryMeta object, so request 2000 # the full repository object and add it to the issue object. 2001 repository = Repository.request(allspice_client, owner, repo) 2002 setattr(api_object, "_repository", repository) 2003 # For legacy reasons 2004 cls._add_read_property("repo", repository, api_object) 2005 return api_object
2007 @classmethod 2008 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 2009 args = {"owner": repo.owner.username, "repo": repo.name} 2010 data = {"title": title, "body": body} 2011 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 2012 issue = Issue.parse_response(allspice_client, result) 2013 setattr(issue, "_repository", repo) 2014 cls._add_read_property("repo", repo, issue) 2015 return issue
2036 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2037 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2038 self.allspice_client.requests_post( 2039 path, data={"created": created, "time": int(time), "user_name": user_name} 2040 )
2042 def get_comments(self) -> List[Comment]: 2043 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2044 2045 results = self.allspice_client.requests_get( 2046 self.GET_COMMENTS.format( 2047 owner=self.owner.username, repo=self.repository.name, index=self.number 2048 ) 2049 ) 2050 2051 return [Comment.parse_response(self.allspice_client, result) for result in results]
https://hub.allspice.io/api/swagger#/issue/issueGetComments
2053 def create_comment(self, body: str) -> Comment: 2054 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2055 2056 path = self.GET_COMMENTS.format( 2057 owner=self.owner.username, repo=self.repository.name, index=self.number 2058 ) 2059 2060 response = self.allspice_client.requests_post(path, data={"body": body}) 2061 return Comment.parse_response(self.allspice_client, response)
https://hub.allspice.io/api/swagger#/issue/issueCreateComment
def get_attachments(self) -> List[Attachment]:
    """
    Fetch all attachments on this issue.

    Unlike the assets field, this always requests the attachments from the
    API.

    See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments

    :return: One Attachment per entry in the API response.
    """

    client = self.allspice_client
    url = self.GET_ATTACHMENTS.format(
        owner=self.owner.username, repo=self.repository.name, index=self.number
    )

    attachments = []
    for raw_attachment in client.requests_get(url):
        attachments.append(Attachment.parse_response(client, raw_attachment))
    return attachments
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments
def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment:
    """
    Upload a file as an attachment on this issue.

    https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment

    :param file: The file to attach. This should be a file-like object.
    :param name: The name of the file. When omitted, no ``name`` parameter
        is sent with the request.
    :return: The created attachment.
    """

    upload_url = self.GET_ATTACHMENTS.format(
        owner=self.owner.username, repo=self.repository.name, index=self.number
    )

    # The file is sent under the "attachment" form field; the optional name
    # goes along as a request parameter.
    request_kwargs: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        request_kwargs["params"] = {"name": name}

    response = self.allspice_client.requests_post(upload_url, **request_kwargs)
    return Attachment.parse_response(self.allspice_client, response)
Create an attachment on this issue.
https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
class Milestone(ApiObject):
    """A milestone of a repository, addressed by owner, repo and number."""

    allow_merge_commits: Any
    allow_rebase: Any
    allow_rebase_explicit: Any
    allow_squash_merge: Any
    archived: Any
    closed_at: Any
    closed_issues: int
    created_at: str
    default_branch: Any
    description: str
    due_on: Any
    has_issues: Any
    has_pull_requests: Any
    has_wiki: Any
    id: int
    ignore_whitespace_conflicts: Any
    name: Any
    open_issues: int
    private: Any
    state: str
    title: str
    updated_at: str
    website: Any

    API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Milestones are equal when they share a client and an id.
        return (
            isinstance(other, Milestone)
            and self.allspice_client == other.allspice_client
            and self.id == other.id
        )

    def __hash__(self):
        client_hash = hash(self.allspice_client)
        return client_hash ^ hash(self.id)

    # Both timestamp fields are converted with Util.convert_time when parsed.
    _fields_to_parsers: ClassVar[dict] = {
        "closed_at": lambda _, t: Util.convert_time(t),
        "due_on": lambda _, t: Util.convert_time(t),
    }

    # NOTE(review): every name here also appears in Repository's patchable
    # fields, and milestone fields such as title, state and due_on are
    # absent — confirm this set against the edit-milestone API.
    _patchable_fields: ClassVar[set[str]] = {
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_squash_merge",
        "archived",
        "default_branch",
        "description",
        "has_issues",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "name",
        "private",
        "website",
    }

    @classmethod
    def request(cls, allspice_client, owner: str, repo: str, number: str):
        """Request the milestone ``number`` of repository ``owner``/``repo``."""
        path_args = {"owner": owner, "repo": repo, "number": number}
        return cls._request(allspice_client, path_args)
Common base class for all non-exit exceptions.
class Organization(ApiObject):
    """see https://hub.allspice.io/api/swagger#/organization/orgGetAll"""

    active: Optional[bool]
    avatar_url: str
    created: Optional[str]
    description: str
    email: str
    followers_count: Optional[int]
    following_count: Optional[int]
    full_name: str
    html_url: Optional[str]
    id: int
    is_admin: Optional[bool]
    language: Optional[str]
    last_login: Optional[str]
    location: str
    login: Optional[str]
    login_name: Optional[str]
    name: Optional[str]
    prohibit_login: Optional[bool]
    repo_admin_change_team_access: Optional[bool]
    restricted: Optional[bool]
    source_id: Optional[int]
    starred_repos_count: Optional[int]
    username: str
    visibility: str
    website: str

    API_OBJECT = """/orgs/{name}"""  # <org>
    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
    ORG_IS_MEMBER = """/orgs/%s/members/%s"""  # <org>, <username>
    ORG_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Organizations are equal when they share a client and a name.
        if not isinstance(other, Organization):
            return False
        return self.allspice_client == other.allspice_client and self.name == other.name

    def __hash__(self):
        return hash(self.allspice_client) ^ hash(self.name)

    @classmethod
    def request(cls, allspice_client, name: str) -> Self:
        """Request the organization called ``name``."""
        return cls._request(allspice_client, {"name": name})

    @classmethod
    def parse_response(cls, allspice_client, result) -> "Organization":
        """Parse an API response, making sure the result has a ``name``."""
        api_object = super().parse_response(allspice_client, result)
        # add "name" field to make this behave similar to users for gitea < 1.18
        # also necessary for repository-owner when org is repo owner
        if not hasattr(api_object, "name"):
            Organization._add_read_property("name", result["username"], api_object)
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "description",
        "full_name",
        "location",
        "visibility",
        "website",
    }

    def commit(self):
        """Commit changed fields, addressing the organization by its name."""
        args = {"name": self.name}
        self._commit(args)

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create an organization Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            f"/orgs/{self.name}/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response carrying an "id" is treated as a created repository;
        # anything else is expected to carry an error "message".
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get the repositories of this organization (paginated request)."""
        results = self.allspice_client.requests_get_paginated(
            Organization.ORG_REPOS_REQUEST % self.username
        )
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_repository(self, name) -> "Repository":
        """Get the repository called ``name``.

        Fetches all repositories of the organization and searches them.

        Throws:
            NotFoundException: If no repository has that name.
        """
        repos = self.get_repositories()
        for repo in repos:
            if repo.name == name:
                return repo
        raise NotFoundException("Repository %s not existent in organization." % name)

    def get_teams(self) -> List["Team"]:
        """Get the teams of this organization, each linked back to it."""
        results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username)
        teams = [Team.parse_response(self.allspice_client, result) for result in results]
        # organisation seems to be missing using this request, so we add org manually
        for t in teams:
            setattr(t, "_organization", self)
        return teams

    def get_team(self, name) -> "Team":
        """Get the team called ``name``.

        Throws:
            NotFoundException: If no team has that name.
        """
        teams = self.get_teams()
        for team in teams:
            if team.name == name:
                return team
        raise NotFoundException("Team not existent in organization.")

    def create_team(
        self,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ) -> "Team":
        """Alias for AllSpice#create_team

        ``units_map`` defaults to a fresh empty dict on every call; all
        arguments are forwarded unchanged together with this organization.
        """
        # TODO: Move AllSpice#create_team to Organization#create_team and
        # deprecate AllSpice#create_team.

        # A new dict per call avoids sharing one mutable default between calls.
        if units_map is None:
            units_map = {}
        return self.allspice_client.create_team(
            org=self,
            name=name,
            description=description,
            permission=permission,
            can_create_org_repo=can_create_org_repo,
            includes_all_repositories=includes_all_repositories,
            units=units,
            units_map=units_map,
        )

    def get_members(self) -> List["User"]:
        """Get the members of this organization as User objects."""
        results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username)
        return [User.parse_response(self.allspice_client, result) for result in results]

    def is_member(self, username) -> bool:
        """Check whether a user, given as a User or a username, is a member.

        Any exception raised by the request is reported as ``False``.
        """
        if isinstance(username, User):
            username = username.username
        try:
            # returns 204 if its ok, 404 if its not
            self.allspice_client.requests_get(
                Organization.ORG_IS_MEMBER % (self.username, username)
            )
            return True
        except Exception:
            return False

    def remove_member(self, user: "User"):
        """Remove ``user`` from this organization."""
        path = f"/orgs/{self.username}/members/{user.username}"
        self.allspice_client.requests_delete(path)

    def delete(self):
        """Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization"""
        # Every repository is deleted first, then the organization itself.
        for repo in self.get_repositories():
            repo.delete()
        self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username))
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """Get the heatmap as (timestamp, contributions) pairs.

        Timestamps are converted with ``datetime.fromtimestamp``.
        """
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
see https://hub.allspice.io/api/swagger#/organization/orgGetAll
@classmethod
def parse_response(cls, allspice_client, result) -> "Organization":
    """
    Parse an API response into an Organization that always has a ``name``.

    :param allspice_client: Client the parsed object is bound to.
    :param result: The API response; its ``username`` is used as the name
        when the parsed object has none.
    :return: The parsed organization.
    """
    organization = super().parse_response(allspice_client, result)
    if hasattr(organization, "name"):
        return organization
    # Add a "name" field so this behaves like users for gitea < 1.18; it is
    # also needed for the repository owner when an org owns the repo.
    Organization._add_read_property("name", result["username"], organization)
    return organization
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository owned by this organization.

    The arguments are sent as the request body. A response containing an
    "id" is treated as success and parsed into a Repository.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If the response has no "id"; carries its "message".
    """
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    client = self.allspice_client
    result = client.requests_post(f"/orgs/{self.name}/repos", data=payload)

    # Failure first: without an "id" the response only carries a message.
    if "id" not in result:
        client.logger.error(result["message"])
        raise Exception("Repository not created... (gitea: %s)" % result["message"])

    client.logger.info("Successfully created Repository %s " % result["name"])
    return Repository.parse_response(client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_teams(self) -> List["Team"]:
    """
    Get the teams of this organization.

    :return: The parsed teams, each with this organization attached.
    """
    client = self.allspice_client
    teams = []
    for raw_team in client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username):
        team = Team.parse_response(client, raw_team)
        # The organisation seems to be missing from this response, so it is
        # attached manually.
        team._organization = self
        teams.append(team)
    return teams
def create_team(
    self,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
) -> "Team":
    """Alias for AllSpice#create_team

    Forwards every argument unchanged, together with this organization as
    ``org``, to the client's ``create_team``.

    :param units_map: Forwarded as ``units_map``; defaults to a fresh empty
        dict on every call.
    :return: Whatever the client's ``create_team`` returns.
    """
    # TODO: Move AllSpice#create_team to Organization#create_team and
    # deprecate AllSpice#create_team.

    # A new dict per call avoids sharing one mutable default between calls.
    if units_map is None:
        units_map = {}
    return self.allspice_client.create_team(
        org=self,
        name=name,
        description=description,
        permission=permission,
        can_create_org_repo=can_create_org_repo,
        includes_all_repositories=includes_all_repositories,
        units=units,
        units_map=units_map,
    )
Alias for AllSpice#create_team
def is_member(self, username) -> bool:
    """
    Check whether a user belongs to this organization.

    :param username: A username, or a User whose ``username`` is used.
    :return: True when the membership request succeeds, False when it
        raises any exception.
    """
    member_name = username.username if isinstance(username, User) else username
    try:
        # The request returns 204 for a member and 404 otherwise.
        self.allspice_client.requests_get(
            Organization.ORG_IS_MEMBER % (self.username, member_name)
        )
    except Exception:
        return False
    return True
def delete(self):
    """Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization"""
    # Repositories go first, one delete call each, then the organization.
    for owned_repo in self.get_repositories():
        owned_repo.delete()
    org_path = Organization.API_OBJECT.format(name=self.username)
    self.allspice_client.requests_delete(org_path)
    self.deleted = True
Delete this Organization. Invalidates this object's data. Also deletes all Repositories owned by the Organization.
class Release(ApiObject):
    """
    A release on a repo.
    """

    assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]]
    author: User
    body: str
    created_at: str
    draft: bool
    html_url: str
    id: int
    name: str
    prerelease: bool
    published_at: str
    repo: Optional["Repository"]
    repository: Optional["Repository"]
    tag_name: str
    tarball_url: str
    target_commitish: str
    upload_url: str
    url: str
    zipball_url: str

    API_OBJECT = "/repos/{owner}/{repo}/releases/{id}"
    RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets"
    # Note that we don't strictly need the get_assets route, as the release
    # object already contains the assets.

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        # Releases are equal when they share a repository and an id.
        if not isinstance(other, Release):
            return False
        return self.repo == other.repo and self.id == other.id

    def __hash__(self):
        return hash(self.repo) ^ hash(self.id)

    _fields_to_parsers: ClassVar[dict] = {
        "author": lambda allspice_client, author: User.parse_response(allspice_client, author),
    }
    _patchable_fields: ClassVar[set[str]] = {
        "body",
        "draft",
        "name",
        "prerelease",
        "tag_name",
        "target_commitish",
    }

    @classmethod
    def parse_response(cls, allspice_client, result, repo) -> Release:
        """
        Parse an API response into a Release bound to ``repo``.

        :param allspice_client: Client the parsed objects are bound to.
        :param result: The API response; its "assets" entries are parsed
            into ReleaseAsset objects.
        :param repo: Repository exposed as ``repository`` and ``repo``.
        :return: The parsed release.
        """
        release = super().parse_response(allspice_client, result)
        Release._add_read_property("repository", repo, release)
        # For legacy reasons
        Release._add_read_property("repo", repo, release)
        setattr(
            release,
            "_assets",
            [
                ReleaseAsset.parse_response(allspice_client, asset, release)
                for asset in result["assets"]
            ],
        )
        return release

    @classmethod
    def request(
        cls,
        allspice_client,
        owner: str,
        repo: str,
        id: Optional[int] = None,
    ) -> Release:
        """
        Request a release together with its repository.

        :param allspice_client: Client used to perform the API requests.
        :param owner: Username of the repository owner.
        :param repo: Name of the repository.
        :param id: Id of the release.
        :return: The release, bound to the requested repository.
        """
        args = {"owner": owner, "repo": repo, "id": id}
        release_response = cls._get_gitea_api_object(allspice_client, args)
        repository = Repository.request(allspice_client, owner, repo)
        release = cls.parse_response(allspice_client, release_response, repository)
        return release

    def commit(self):
        """
        Commit changed fields of this release.

        :raises ValueError: If the release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot commit a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self._commit(args)

    def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
        """
        Create an asset for this release.

        https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

        :param file: The file to upload. This should be a file-like object.
        :param name: The name of the file.
        :return: The created asset.
        :raises ValueError: If the release has no repository.
        """

        if self.repo is None:
            raise ValueError("Cannot create an asset for a release without a repository.")

        # The file is sent under the "attachment" form field; the optional
        # name goes along as a request parameter.
        args: dict[str, Any] = {"files": {"attachment": file}}
        if name is not None:
            args["params"] = {"name": name}

        result = self.allspice_client.requests_post(
            self.RELEASE_CREATE_ASSET.format(
                owner=self.repo.owner.username,
                repo=self.repo.name,
                id=self.id,
            ),
            **args,
        )
        return ReleaseAsset.parse_response(self.allspice_client, result, self)

    def delete(self):
        """
        Delete this release and mark the object as deleted.

        :raises ValueError: If the release has no repository.
        """
        if self.repo is None:
            raise ValueError("Cannot delete a release without a repository.")

        args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
        self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
        self.deleted = True
A release on a repo.
@classmethod
def parse_response(cls, allspice_client, result, repo) -> Release:
    """
    Parse an API response into a Release bound to ``repo``.

    :param allspice_client: Client the parsed objects are bound to.
    :param result: The API response; its "assets" entries are parsed into
        ReleaseAsset objects.
    :param repo: Repository exposed as ``repository`` and ``repo``.
    :return: The parsed release.
    """
    release = super().parse_response(allspice_client, result)
    Release._add_read_property("repository", repo, release)
    # `repo` is kept as a second name for legacy reasons.
    Release._add_read_property("repo", repo, release)

    parsed_assets = []
    for raw_asset in result["assets"]:
        parsed_assets.append(ReleaseAsset.parse_response(allspice_client, raw_asset, release))
    release._assets = parsed_assets
    return release
@classmethod
def request(
    cls,
    allspice_client,
    owner: str,
    repo: str,
    id: Optional[int] = None,
) -> Release:
    """
    Request a release together with its repository.

    :param allspice_client: Client used to perform the API requests.
    :param owner: Username of the repository owner.
    :param repo: Name of the repository.
    :param id: Id of the release.
    :return: The release, bound to the requested repository.
    """
    release_json = cls._get_gitea_api_object(
        allspice_client, {"owner": owner, "repo": repo, "id": id}
    )
    # The repository is requested separately and handed to parse_response.
    parent_repo = Repository.request(allspice_client, owner, repo)
    return cls.parse_response(allspice_client, release_json, parent_repo)
def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset:
    """
    Create an asset for this release.

    https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset

    :param file: The file to upload. This should be a file-like object.
    :param name: The name of the file.
    :return: The created asset.
    :raises ValueError: If the release has no repository.
    """

    if self.repo is None:
        raise ValueError("Cannot create an asset for a release without a repository.")

    # The file is sent under the "attachment" form field; the optional name
    # goes along as a request parameter.
    args: dict[str, Any] = {"files": {"attachment": file}}
    if name is not None:
        args["params"] = {"name": name}

    result = self.allspice_client.requests_post(
        self.RELEASE_CREATE_ASSET.format(
            owner=self.repo.owner.username,
            repo=self.repo.name,
            id=self.id,
        ),
        **args,
    )
    return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
def delete(self):
    """
    Delete this release and mark the object as deleted.

    :raises ValueError: If the release has no repository.
    """
    if self.repo is None:
        raise ValueError("Cannot delete a release without a repository.")

    args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id}
    self.allspice_client.requests_delete(self.API_OBJECT.format(**args))
    self.deleted = True
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 
549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_ALLSPICE_PROJECT = "/repos/{owner}/{repo}/allspice_generated/project/{content}" 559 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 560 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 561 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 562 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 563 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 564 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 565 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 566 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 567 568 class ArchiveFormat(Enum): 569 """ 570 Archive formats for Repository.get_archive 571 """ 572 573 TAR = "tar.gz" 574 ZIP = "zip" 575 576 class CommitStatusSort(Enum): 577 """ 578 Sort order for Repository.get_commit_status 579 """ 580 581 OLDEST = "oldest" 582 RECENT_UPDATE = "recentupdate" 583 LEAST_UPDATE = "leastupdate" 584 LEAST_INDEX = "leastindex" 585 HIGHEST_INDEX = "highestindex" 586 587 def __init__(self, allspice_client): 588 super().__init__(allspice_client) 589 590 def __eq__(self, other): 591 if not isinstance(other, Repository): 592 return False 593 return self.owner == other.owner and self.name == other.name 594 595 def __hash__(self): 596 return hash(self.owner) ^ hash(self.name) 597 598 _fields_to_parsers: ClassVar[dict] = { 599 # dont know how 
to tell apart user and org as owner except form email being empty. 600 "owner": lambda allspice_client, r: ( 601 Organization.parse_response(allspice_client, r) 602 if r["email"] == "" 603 else User.parse_response(allspice_client, r) 604 ), 605 "updated_at": lambda _, t: Util.convert_time(t), 606 } 607 608 @classmethod 609 def request( 610 cls, 611 allspice_client, 612 owner: str, 613 name: str, 614 ) -> Repository: 615 return cls._request(allspice_client, {"owner": owner, "name": name}) 616 617 @classmethod 618 def search( 619 cls, 620 allspice_client, 621 query: Optional[str] = None, 622 topic: bool = False, 623 include_description: bool = False, 624 user: Optional[User] = None, 625 owner_to_prioritize: Union[User, Organization, None] = None, 626 ) -> list[Repository]: 627 """ 628 Search for repositories. 629 630 See https://hub.allspice.io/api/swagger#/repository/repoSearch 631 632 :param query: The query string to search for 633 :param topic: If true, the query string will only be matched against the 634 repository's topic. 635 :param include_description: If true, the query string will be matched 636 against the repository's description as well. 637 :param user: If specified, only repositories that this user owns or 638 contributes to will be searched. 639 :param owner_to_prioritize: If specified, repositories owned by the 640 given entity will be prioritized in the search. 641 :returns: All repositories matching the query. If there are many 642 repositories matching this query, this may take some time. 
643 """ 644 645 params = {} 646 647 if query is not None: 648 params["q"] = query 649 if topic: 650 params["topic"] = topic 651 if include_description: 652 params["include_description"] = include_description 653 if user is not None: 654 params["user"] = user.id 655 if owner_to_prioritize is not None: 656 params["owner_to_prioritize"] = owner_to_prioritize.id 657 658 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 659 660 return [Repository.parse_response(allspice_client, response) for response in responses] 661 662 _patchable_fields: ClassVar[set[str]] = { 663 "allow_manual_merge", 664 "allow_merge_commits", 665 "allow_rebase", 666 "allow_rebase_explicit", 667 "allow_rebase_update", 668 "allow_squash_merge", 669 "archived", 670 "autodetect_manual_merge", 671 "default_branch", 672 "default_delete_branch_after_merge", 673 "default_merge_style", 674 "description", 675 "enable_prune", 676 "external_tracker", 677 "external_wiki", 678 "has_actions", 679 "has_issues", 680 "has_projects", 681 "has_pull_requests", 682 "has_wiki", 683 "ignore_whitespace_conflicts", 684 "internal_tracker", 685 "mirror_interval", 686 "name", 687 "private", 688 "template", 689 "website", 690 } 691 692 def commit(self): 693 args = {"owner": self.owner.username, "name": self.name} 694 self._commit(args) 695 696 def get_branches(self) -> List["Branch"]: 697 """Get all the Branches of this Repository.""" 698 699 results = self.allspice_client.requests_get_paginated( 700 Repository.REPO_BRANCHES % (self.owner.username, self.name) 701 ) 702 return [Branch.parse_response(self.allspice_client, result) for result in results] 703 704 def get_branch(self, name: str) -> "Branch": 705 """Get a specific Branch of this Repository.""" 706 result = self.allspice_client.requests_get( 707 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 708 ) 709 return Branch.parse_response(self.allspice_client, result) 710 711 def add_branch(self, 
create_from: Ref, newname: str) -> "Branch": 712 """Add a branch to the repository""" 713 # Note: will only work with gitea 1.13 or higher! 714 715 ref_name = Util.data_params_for_ref(create_from) 716 if "ref" not in ref_name: 717 raise ValueError("create_from must be a Branch, Commit or string") 718 ref_name = ref_name["ref"] 719 720 data = {"new_branch_name": newname, "old_ref_name": ref_name} 721 result = self.allspice_client.requests_post( 722 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 723 ) 724 return Branch.parse_response(self.allspice_client, result) 725 726 def get_issues( 727 self, 728 state: Literal["open", "closed", "all"] = "all", 729 search_query: Optional[str] = None, 730 labels: Optional[List[str]] = None, 731 milestones: Optional[List[Union[Milestone, str]]] = None, 732 assignee: Optional[Union[User, str]] = None, 733 since: Optional[datetime] = None, 734 before: Optional[datetime] = None, 735 ) -> List["Issue"]: 736 """ 737 Get all Issues of this Repository (open and closed) 738 739 https://hub.allspice.io/api/swagger#/repository/repoListIssues 740 741 All params of this method are optional filters. If you don't specify a filter, it 742 will not be applied. 743 744 :param state: The state of the Issues to get. If None, all Issues are returned. 745 :param search_query: Filter issues by text. This is equivalent to searching for 746 `search_query` in the Issues on the web interface. 747 :param labels: Filter issues by labels. 748 :param milestones: Filter issues by milestones. 749 :param assignee: Filter issues by the assigned user. 750 :param since: Filter issues by the date they were created. 751 :param before: Filter issues by the date they were created. 752 :return: A list of Issues. 
753 """ 754 755 data = { 756 "state": state, 757 } 758 if search_query: 759 data["q"] = search_query 760 if labels: 761 data["labels"] = ",".join(labels) 762 if milestones: 763 data["milestone"] = ",".join( 764 [ 765 milestone.name if isinstance(milestone, Milestone) else milestone 766 for milestone in milestones 767 ] 768 ) 769 if assignee: 770 if isinstance(assignee, User): 771 data["assignee"] = assignee.username 772 else: 773 data["assignee"] = assignee 774 if since: 775 data["since"] = Util.format_time(since) 776 if before: 777 data["before"] = Util.format_time(before) 778 779 results = self.allspice_client.requests_get_paginated( 780 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 781 params=data, 782 ) 783 784 issues = [] 785 for result in results: 786 issue = Issue.parse_response(self.allspice_client, result) 787 # See Issue.request 788 setattr(issue, "_repository", self) 789 # This is mostly for compatibility with an older implementation 790 Issue._add_read_property("repo", self, issue) 791 issues.append(issue) 792 793 return issues 794 795 def get_design_reviews( 796 self, 797 state: Literal["open", "closed", "all"] = "all", 798 milestone: Optional[Union[Milestone, str]] = None, 799 labels: Optional[List[str]] = None, 800 ) -> List["DesignReview"]: 801 """ 802 Get all Design Reviews of this Repository. 803 804 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 805 806 :param state: The state of the Design Reviews to get. If None, all Design Reviews 807 are returned. 808 :param milestone: The milestone of the Design Reviews to get. 809 :param labels: A list of label IDs to filter DRs by. 810 :return: A list of Design Reviews. 
811 """ 812 813 params = { 814 "state": state, 815 } 816 if milestone: 817 if isinstance(milestone, Milestone): 818 params["milestone"] = milestone.name 819 else: 820 params["milestone"] = milestone 821 if labels: 822 params["labels"] = ",".join(labels) 823 824 results = self.allspice_client.requests_get_paginated( 825 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 826 params=params, 827 ) 828 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 829 830 def get_commits( 831 self, 832 sha: Optional[str] = None, 833 path: Optional[str] = None, 834 stat: bool = True, 835 ) -> List["Commit"]: 836 """ 837 Get all the Commits of this Repository. 838 839 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 840 841 :param sha: The SHA of the commit to start listing commits from. 842 :param path: filepath of a file/dir. 843 :param stat: Include the number of additions and deletions in the response. 844 Disable for speedup. 845 :return: A list of Commits. 846 """ 847 848 data = {} 849 if sha: 850 data["sha"] = sha 851 if path: 852 data["path"] = path 853 if not stat: 854 data["stat"] = False 855 856 try: 857 results = self.allspice_client.requests_get_paginated( 858 Repository.REPO_COMMITS % (self.owner.username, self.name), 859 params=data, 860 ) 861 except ConflictException as err: 862 logging.warning(err) 863 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 864 results = [] 865 return [Commit.parse_response(self.allspice_client, result) for result in results] 866 867 def get_issues_state(self, state) -> List["Issue"]: 868 """ 869 DEPRECATED: Use get_issues() instead. 870 871 Get issues of state Issue.open or Issue.closed of a repository. 
872 """ 873 874 assert state in [Issue.OPENED, Issue.CLOSED] 875 issues = [] 876 data = {"state": state} 877 results = self.allspice_client.requests_get_paginated( 878 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 879 params=data, 880 ) 881 for result in results: 882 issue = Issue.parse_response(self.allspice_client, result) 883 # adding data not contained in the issue response 884 # See Issue.request() 885 setattr(issue, "_repository", self) 886 Issue._add_read_property("repo", self, issue) 887 Issue._add_read_property("owner", self.owner, issue) 888 issues.append(issue) 889 return issues 890 891 def get_times(self): 892 results = self.allspice_client.requests_get( 893 Repository.REPO_TIMES % (self.owner.username, self.name) 894 ) 895 return results 896 897 def get_user_time(self, username) -> float: 898 if isinstance(username, User): 899 username = username.username 900 results = self.allspice_client.requests_get( 901 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 902 ) 903 time = sum(r["time"] for r in results) 904 return time 905 906 def get_full_name(self) -> str: 907 return self.owner.username + "/" + self.name 908 909 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 910 data = { 911 "assignees": assignees, 912 "body": description, 913 "closed": False, 914 "title": title, 915 } 916 result = self.allspice_client.requests_post( 917 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 918 data=data, 919 ) 920 921 issue = Issue.parse_response(self.allspice_client, result) 922 setattr(issue, "_repository", self) 923 Issue._add_read_property("repo", self, issue) 924 return issue 925 926 def create_design_review( 927 self, 928 title: str, 929 head: Union[Branch, str], 930 base: Union[Branch, str], 931 assignees: Optional[Set[Union[User, str]]] = None, 932 body: Optional[str] = None, 933 due_date: Optional[datetime] = None, 934 milestone: Optional["Milestone"] = 
None, 935 ) -> "DesignReview": 936 """ 937 Create a new Design Review. 938 939 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 940 941 :param title: Title of the Design Review 942 :param head: Branch or name of the branch to merge into the base branch 943 :param base: Branch or name of the branch to merge into 944 :param assignees: Optional. A list of users to assign this review. List can be of 945 User objects or of usernames. 946 :param body: An Optional Description for the Design Review. 947 :param due_date: An Optional Due date for the Design Review. 948 :param milestone: An Optional Milestone for the Design Review 949 :return: The created Design Review 950 """ 951 952 data: dict[str, Any] = { 953 "title": title, 954 } 955 956 if isinstance(head, Branch): 957 data["head"] = head.name 958 else: 959 data["head"] = head 960 if isinstance(base, Branch): 961 data["base"] = base.name 962 else: 963 data["base"] = base 964 if assignees: 965 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 966 if body: 967 data["body"] = body 968 if due_date: 969 data["due_date"] = Util.format_time(due_date) 970 if milestone: 971 data["milestone"] = milestone.id 972 973 result = self.allspice_client.requests_post( 974 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 975 data=data, 976 ) 977 978 return DesignReview.parse_response(self.allspice_client, result) 979 980 def create_milestone( 981 self, 982 title: str, 983 description: str, 984 due_date: Optional[str] = None, 985 state: str = "open", 986 ) -> "Milestone": 987 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 988 data = {"title": title, "description": description, "state": state} 989 if due_date: 990 data["due_date"] = due_date 991 result = self.allspice_client.requests_post(url, data=data) 992 return Milestone.parse_response(self.allspice_client, result) 993 994 def create_gitea_hook(self, hook_url: str, 
events: List[str]): 995 url = f"/repos/{self.owner.username}/{self.name}/hooks" 996 data = { 997 "type": "gitea", 998 "config": {"content_type": "json", "url": hook_url}, 999 "events": events, 1000 "active": True, 1001 } 1002 return self.allspice_client.requests_post(url, data=data) 1003 1004 def list_hooks(self): 1005 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1006 return self.allspice_client.requests_get(url) 1007 1008 def delete_hook(self, id: str): 1009 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1010 self.allspice_client.requests_delete(url) 1011 1012 def is_collaborator(self, username) -> bool: 1013 if isinstance(username, User): 1014 username = username.username 1015 try: 1016 # returns 204 if its ok, 404 if its not 1017 self.allspice_client.requests_get( 1018 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1019 ) 1020 return True 1021 except Exception: 1022 return False 1023 1024 def get_users_with_access(self) -> Sequence[User]: 1025 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1026 response = self.allspice_client.requests_get(url) 1027 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1028 if isinstance(self.owner, User): 1029 return [*collabs, self.owner] 1030 else: 1031 # owner must be org 1032 teams = self.owner.get_teams() 1033 for team in teams: 1034 team_repos = team.get_repos() 1035 if self.name in [n.name for n in team_repos]: 1036 collabs += team.get_members() 1037 return collabs 1038 1039 def remove_collaborator(self, user_name: str): 1040 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1041 self.allspice_client.requests_delete(url) 1042 1043 def transfer_ownership( 1044 self, 1045 new_owner: Union[User, Organization], 1046 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1047 ): 1048 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1049 data: dict[str, Any] = {"new_owner": 
new_owner.username} 1050 if isinstance(new_owner, Organization): 1051 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1052 data["team_ids"] = new_team_ids 1053 self.allspice_client.requests_post(url, data=data) 1054 # TODO: make sure this instance is either updated or discarded 1055 1056 def get_git_content( 1057 self, 1058 ref: Optional["Ref"] = None, 1059 commit: "Optional[Commit]" = None, 1060 ) -> List[Content]: 1061 """ 1062 Get the metadata for all files in the root directory. 1063 1064 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1065 1066 :param ref: branch or commit to get content from 1067 :param commit: commit to get content from (deprecated) 1068 """ 1069 url = f"/repos/{self.owner.username}/{self.name}/contents" 1070 data = Util.data_params_for_ref(ref or commit) 1071 1072 result = [ 1073 Content.parse_response(self.allspice_client, f) 1074 for f in self.allspice_client.requests_get(url, data) 1075 ] 1076 return result 1077 1078 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1079 """ 1080 Get the repository's tree on a given ref. 1081 1082 By default, this will only return the top-level entries in the tree. If you want 1083 to get the entire tree, set `recursive` to True. 1084 1085 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1086 :param recursive: Whether to get the entire tree or just the top-level entries. 
1087 """ 1088 1089 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1090 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1091 params = {"recursive": recursive} 1092 results = self.allspice_client.requests_get_paginated(url, params=params) 1093 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1094 1095 def get_file_content( 1096 self, 1097 content: Content, 1098 ref: Optional[Ref] = None, 1099 commit: Optional[Commit] = None, 1100 ) -> Union[str, List["Content"]]: 1101 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1102 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1103 data = Util.data_params_for_ref(ref or commit) 1104 1105 if content.type == Content.FILE: 1106 return self.allspice_client.requests_get(url, data)["content"] 1107 else: 1108 return [ 1109 Content.parse_response(self.allspice_client, f) 1110 for f in self.allspice_client.requests_get(url, data) 1111 ] 1112 1113 def get_raw_file( 1114 self, 1115 file_path: str, 1116 ref: Optional[Ref] = None, 1117 ) -> bytes: 1118 """ 1119 Get the raw, binary data of a single file. 1120 1121 Note 1: if the file you are requesting is a text file, you might want to 1122 use .decode() on the result to get a string. For example: 1123 1124 content = repo.get_raw_file("file.txt").decode("utf-8") 1125 1126 Note 2: this method will store the entire file in memory. If you want 1127 to download a large file, you might want to use `download_to_file` 1128 instead. 1129 1130 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1131 1132 :param file_path: The path to the file to get. 1133 :param ref: The branch or commit to get the file from. If not provided, 1134 the default branch is used. 
1135 """ 1136 1137 url = self.REPO_GET_RAW_FILE.format( 1138 owner=self.owner.username, 1139 repo=self.name, 1140 path=file_path, 1141 ) 1142 params = Util.data_params_for_ref(ref) 1143 return self.allspice_client.requests_get_raw(url, params=params) 1144 1145 def download_to_file( 1146 self, 1147 file_path: str, 1148 io: IO, 1149 ref: Optional[Ref] = None, 1150 ) -> None: 1151 """ 1152 Download the binary data of a file to a file-like object. 1153 1154 Example: 1155 1156 with open("schematic.DSN", "wb") as f: 1157 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1158 1159 :param file_path: The path to the file in the repository from the root 1160 of the repository. 1161 :param io: The file-like object to write the data to. 1162 """ 1163 1164 url = self.allspice_client._AllSpice__get_url( 1165 self.REPO_GET_RAW_FILE.format( 1166 owner=self.owner.username, 1167 repo=self.name, 1168 path=file_path, 1169 ) 1170 ) 1171 params = Util.data_params_for_ref(ref) 1172 response = self.allspice_client.requests.get( 1173 url, 1174 params=params, 1175 headers=self.allspice_client.headers, 1176 stream=True, 1177 ) 1178 1179 for chunk in response.iter_content(chunk_size=4096): 1180 if chunk: 1181 io.write(chunk) 1182 1183 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1184 """ 1185 Get the json blob for a cad file if it exists, otherwise enqueue 1186 a new job and return a 503 status. 1187 1188 WARNING: This is still experimental and not recommended for critical 1189 applications. The structure and content of the returned dictionary can 1190 change at any time. 
1191 1192 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1193 """ 1194 1195 if isinstance(content, Content): 1196 content = content.path 1197 1198 url = self.REPO_GET_ALLSPICE_JSON.format( 1199 owner=self.owner.username, 1200 repo=self.name, 1201 content=content, 1202 ) 1203 data = Util.data_params_for_ref(ref) 1204 return self.allspice_client.requests_get(url, data) 1205 1206 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1207 """ 1208 Get the svg blob for a cad file if it exists, otherwise enqueue 1209 a new job and return a 503 status. 1210 1211 WARNING: This is still experimental and not yet recommended for 1212 critical applications. The content of the returned svg can change 1213 at any time. 1214 1215 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1216 """ 1217 1218 if isinstance(content, Content): 1219 content = content.path 1220 1221 url = self.REPO_GET_ALLSPICE_SVG.format( 1222 owner=self.owner.username, 1223 repo=self.name, 1224 content=content, 1225 ) 1226 data = Util.data_params_for_ref(ref) 1227 return self.allspice_client.requests_get_raw(url, data) 1228 1229 def get_generated_projectdata( 1230 self, content: Union[Content, str], ref: Optional[Ref] = None 1231 ) -> dict: 1232 """ 1233 Get the json project data based on the cad file provided 1234 1235 WARNING: This is still experimental and not yet recommended for 1236 critical applications. The content of the returned dictionary can change 1237 at any time. 
1238 1239 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject 1240 """ 1241 if isinstance(content, Content): 1242 content = content.path 1243 1244 url = self.REPO_GET_ALLSPICE_PROJECT.format( 1245 owner=self.owner.username, 1246 repo=self.name, 1247 content=content, 1248 ) 1249 data = Util.data_params_for_ref(ref) 1250 return self.allspice_client.requests_get(url, data) 1251 1252 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1253 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1254 if not data: 1255 data = {} 1256 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1257 data.update({"content": content}) 1258 return self.allspice_client.requests_post(url, data) 1259 1260 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1261 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1262 if not data: 1263 data = {} 1264 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1265 data.update({"sha": file_sha, "content": content}) 1266 return self.allspice_client.requests_put(url, data) 1267 1268 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1269 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1270 if not data: 1271 data = {} 1272 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1273 data.update({"sha": file_sha}) 1274 return self.allspice_client.requests_delete(url, data) 1275 1276 def get_archive( 1277 self, 1278 ref: Ref = "main", 1279 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1280 ) -> bytes: 1281 """ 1282 Download all the files in a specific ref of a repository as a zip or tarball 1283 archive. 
1284 1285 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1286 1287 :param ref: branch or commit to get content from, defaults to the "main" branch 1288 :param archive_format: zip or tar, defaults to zip 1289 """ 1290 1291 ref_string = Util.data_params_for_ref(ref)["ref"] 1292 url = self.REPO_GET_ARCHIVE.format( 1293 owner=self.owner.username, 1294 repo=self.name, 1295 ref=ref_string, 1296 format=archive_format.value, 1297 ) 1298 return self.allspice_client.requests_get_raw(url) 1299 1300 def get_topics(self) -> list[str]: 1301 """ 1302 Gets the list of topics on this repository. 1303 1304 See http://localhost:3000/api/swagger#/repository/repoListTopics 1305 """ 1306 1307 url = self.REPO_GET_TOPICS.format( 1308 owner=self.owner.username, 1309 repo=self.name, 1310 ) 1311 return self.allspice_client.requests_get(url)["topics"] 1312 1313 def add_topic(self, topic: str): 1314 """ 1315 Adds a topic to the repository. 1316 1317 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1318 1319 :param topic: The topic to add. Topic names must consist only of 1320 lowercase letters, numnbers and dashes (-), and cannot start with 1321 dashes. Topic names also must be under 35 characters long. 1322 """ 1323 1324 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1325 self.allspice_client.requests_put(url) 1326 1327 def create_release( 1328 self, 1329 tag_name: str, 1330 name: Optional[str] = None, 1331 body: Optional[str] = None, 1332 draft: bool = False, 1333 ): 1334 """ 1335 Create a release for this repository. The release will be created for 1336 the tag with the given name. If there is no tag with this name, create 1337 the tag first. 
1338 1339 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1340 """ 1341 1342 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1343 data = { 1344 "tag_name": tag_name, 1345 "draft": draft, 1346 } 1347 if name is not None: 1348 data["name"] = name 1349 if body is not None: 1350 data["body"] = body 1351 response = self.allspice_client.requests_post(url, data) 1352 return Release.parse_response(self.allspice_client, response, self) 1353 1354 def get_releases( 1355 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1356 ) -> List[Release]: 1357 """ 1358 Get the list of releases for this repository. 1359 1360 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1361 """ 1362 1363 data = {} 1364 1365 if draft is not None: 1366 data["draft"] = draft 1367 if pre_release is not None: 1368 data["pre-release"] = pre_release 1369 1370 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1371 responses = self.allspice_client.requests_get_paginated(url, params=data) 1372 1373 return [ 1374 Release.parse_response(self.allspice_client, response, self) for response in responses 1375 ] 1376 1377 def get_latest_release(self) -> Release: 1378 """ 1379 Get the latest release for this repository. 1380 1381 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1382 """ 1383 1384 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1385 response = self.allspice_client.requests_get(url) 1386 release = Release.parse_response(self.allspice_client, response, self) 1387 return release 1388 1389 def get_release_by_tag(self, tag: str) -> Release: 1390 """ 1391 Get a release by its tag. 
1392 1393 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1394 """ 1395 1396 url = self.REPO_GET_RELEASE_BY_TAG.format( 1397 owner=self.owner.username, repo=self.name, tag=tag 1398 ) 1399 response = self.allspice_client.requests_get(url) 1400 release = Release.parse_response(self.allspice_client, response, self) 1401 return release 1402 1403 def get_commit_statuses( 1404 self, 1405 commit: Union[str, Commit], 1406 sort: Optional[CommitStatusSort] = None, 1407 state: Optional[CommitStatusState] = None, 1408 ) -> List[CommitStatus]: 1409 """ 1410 Get a list of statuses for a commit. 1411 1412 This is roughly equivalent to the Commit.get_statuses method, but this 1413 method allows you to sort and filter commits and is more convenient if 1414 you have a commit SHA and don't need to get the commit itself. 1415 1416 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1417 """ 1418 1419 if isinstance(commit, Commit): 1420 commit = commit.sha 1421 1422 params = {} 1423 if sort is not None: 1424 params["sort"] = sort.value 1425 if state is not None: 1426 params["state"] = state.value 1427 1428 url = self.REPO_GET_COMMIT_STATUS.format( 1429 owner=self.owner.username, repo=self.name, sha=commit 1430 ) 1431 response = self.allspice_client.requests_get_paginated(url, params=params) 1432 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1433 1434 def create_commit_status( 1435 self, 1436 commit: Union[str, Commit], 1437 context: Optional[str] = None, 1438 description: Optional[str] = None, 1439 state: Optional[CommitStatusState] = None, 1440 target_url: Optional[str] = None, 1441 ) -> CommitStatus: 1442 """ 1443 Create a status on a commit. 
1444 1445 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1446 """ 1447 1448 if isinstance(commit, Commit): 1449 commit = commit.sha 1450 1451 data = {} 1452 if context is not None: 1453 data["context"] = context 1454 if description is not None: 1455 data["description"] = description 1456 if state is not None: 1457 data["state"] = state.value 1458 if target_url is not None: 1459 data["target_url"] = target_url 1460 1461 url = self.REPO_GET_COMMIT_STATUS.format( 1462 owner=self.owner.username, repo=self.name, sha=commit 1463 ) 1464 response = self.allspice_client.requests_post(url, data=data) 1465 return CommitStatus.parse_response(self.allspice_client, response) 1466 1467 def delete(self): 1468 self.allspice_client.requests_delete( 1469 Repository.REPO_DELETE % (self.owner.username, self.name) 1470 ) 1471 self.deleted = True
@classmethod
def search(
    cls,
    allspice_client,
    query: Optional[str] = None,
    topic: bool = False,
    include_description: bool = False,
    user: Optional[User] = None,
    owner_to_prioritize: Union[User, Organization, None] = None,
) -> list[Repository]:
    """
    Search for repositories.

    See https://hub.allspice.io/api/swagger#/repository/repoSearch

    :param allspice_client: The client used to issue the paginated request.
    :param query: The query string to search for.
    :param topic: If true, the query string will only be matched against the
        repository's topic.
    :param include_description: If true, the query string will be matched
        against the repository's description as well.
    :param user: If specified, only repositories that this user owns or
        contributes to will be searched. Sent as the user's id.
    :param owner_to_prioritize: If specified, repositories owned by the
        given entity will be prioritized in the search. Sent as the
        entity's id.
    :returns: All repositories matching the query. If there are many
        repositories matching this query, this may take some time.
    """

    # (key, value, include?) triples; only filters the caller actually
    # asked for end up in the query string.
    optional_filters = (
        ("q", query, query is not None),
        ("topic", topic, topic),
        ("include_description", include_description, include_description),
        ("user", None if user is None else user.id, user is not None),
        (
            "owner_to_prioritize",
            None if owner_to_prioritize is None else owner_to_prioritize.id,
            owner_to_prioritize is not None,
        ),
    )
    search_params = {key: value for key, value, wanted in optional_filters if wanted}

    found = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=search_params)

    return [Repository.parse_response(allspice_client, entry) for entry in found]
Search for repositories.
See https://hub.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search.
Returns
All repositories matching the query. If there are many repositories matching this query, this may take some time.
def get_branches(self) -> List["Branch"]:
    """
    Get all the Branches of this Repository.

    :return: One Branch per entry returned by the paginated branches endpoint.
    """

    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    branches = []
    for raw_branch in self.allspice_client.requests_get_paginated(endpoint):
        branches.append(Branch.parse_response(self.allspice_client, raw_branch))
    return branches
Get all the Branches of this Repository.
def get_branch(self, name: str) -> "Branch":
    """
    Get a specific Branch of this Repository.

    :param name: Name of the branch to look up.
    :return: The Branch parsed from the API response.
    """
    endpoint = Repository.REPO_BRANCH.format(
        owner=self.owner.username,
        repo=self.name,
        branch=name,
    )
    raw_branch = self.allspice_client.requests_get(endpoint)
    return Branch.parse_response(self.allspice_client, raw_branch)
Get a specific Branch of this Repository.
def add_branch(self, create_from: Ref, newname: str) -> "Branch":
    """
    Add a branch to the repository.

    :param create_from: The ref the new branch is created from.
    :param newname: Name of the branch to create.
    :raises ValueError: If no "ref" entry can be derived from `create_from`.
    :return: The Branch parsed from the API response.
    """
    # Note: will only work with gitea 1.13 or higher!

    ref_params = Util.data_params_for_ref(create_from)
    if "ref" not in ref_params:
        raise ValueError("create_from must be a Branch, Commit or string")

    payload = {
        "new_branch_name": newname,
        "old_ref_name": ref_params["ref"],
    }
    endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return Branch.parse_response(self.allspice_client, created)
Add a branch to the repository
def get_issues(
    self,
    state: Literal["open", "closed", "all"] = "all",
    search_query: Optional[str] = None,
    labels: Optional[List[str]] = None,
    milestones: Optional[List[Union[Milestone, str]]] = None,
    assignee: Optional[Union[User, str]] = None,
    since: Optional[datetime] = None,
    before: Optional[datetime] = None,
) -> List["Issue"]:
    """
    Get the Issues of this Repository (open and closed by default).

    https://hub.allspice.io/api/swagger#/repository/repoListIssues

    Every parameter is an optional filter; a filter that is left unset (or
    falsy) is not sent with the request.

    :param state: The state of the Issues to get. Defaults to "all".
    :param search_query: Filter issues by text. This is equivalent to searching
        for `search_query` in the Issues on the web interface.
    :param labels: Filter issues by labels; sent as a comma-separated string.
    :param milestones: Filter issues by milestones. Milestone objects are sent
        by name, strings are sent unchanged; all are comma-joined.
    :param assignee: Filter issues by the assigned user (User object or
        username).
    :param since: Sent as the `since` filter, formatted with Util.format_time.
    :param before: Sent as the `before` filter, formatted with
        Util.format_time.
    :return: A list of Issues.
    """

    query = {"state": state}
    if search_query:
        query["q"] = search_query
    if labels:
        query["labels"] = ",".join(labels)
    if milestones:
        milestone_names = (
            entry.name if isinstance(entry, Milestone) else entry for entry in milestones
        )
        query["milestone"] = ",".join(milestone_names)
    if assignee:
        query["assignee"] = assignee.username if isinstance(assignee, User) else assignee
    if since:
        query["since"] = Util.format_time(since)
    if before:
        query["before"] = Util.format_time(before)

    endpoint = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(endpoint, params=query)

    def _linked_issue(raw_issue):
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # See Issue.request
        setattr(parsed, "_repository", self)
        # This is mostly for compatibility with an older implementation
        Issue._add_read_property("repo", self, parsed)
        return parsed

    return [_linked_issue(raw_issue) for raw_issue in raw_issues]
Get all Issues of this Repository (open and closed)
https://hub.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for `search_query` in the Issues on the web interface.
- labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
def get_design_reviews(
    self,
    state: Literal["open", "closed", "all"] = "all",
    milestone: Optional[Union[Milestone, str]] = None,
    labels: Optional[List[str]] = None,
) -> List["DesignReview"]:
    """
    Get all Design Reviews of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoListPullRequests

    :param state: The state of the Design Reviews to get. Defaults to "all".
    :param milestone: The milestone of the Design Reviews to get. A Milestone
        object is sent by name; a string is sent unchanged.
    :param labels: A list of label IDs to filter DRs by; sent comma-joined.
    :return: A list of Design Reviews.
    """

    query = {"state": state}
    if milestone:
        query["milestone"] = milestone.name if isinstance(milestone, Milestone) else milestone
    if labels:
        query["labels"] = ",".join(labels)

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    raw_reviews = self.allspice_client.requests_get_paginated(endpoint, params=query)

    reviews = []
    for raw_review in raw_reviews:
        reviews.append(DesignReview.parse_response(self.allspice_client, raw_review))
    return reviews
Get all Design Reviews of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
def get_commits(
    self,
    sha: Optional[str] = None,
    path: Optional[str] = None,
    stat: bool = True,
) -> List["Commit"]:
    """
    Get all the Commits of this Repository.

    https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits

    :param sha: The SHA of the commit to start listing commits from.
    :param path: filepath of a file/dir.
    :param stat: Include the number of additions and deletions in the response.
        Disable for speedup. The flag is only sent when it is disabled.
    :return: A list of Commits. If the request raises ConflictException, two
        warnings are logged and an empty list is returned.
    """

    query = {}
    if sha:
        query["sha"] = sha
    if path:
        query["path"] = path
    if not stat:
        query["stat"] = False

    try:
        raw_commits = self.allspice_client.requests_get_paginated(
            Repository.REPO_COMMITS % (self.owner.username, self.name),
            params=query,
        )
    except ConflictException as conflict:
        # The original code treats a conflict as "repository has no commits".
        logging.warning(conflict)
        logging.warning("Repository %s/%s is Empty", self.owner.username, self.name)
        return []

    return [Commit.parse_response(self.allspice_client, raw) for raw in raw_commits]
Get all the Commits of this Repository.
https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
def get_issues_state(self, state) -> List["Issue"]:
    """
    DEPRECATED: Use get_issues() instead.

    Get the issues of this repository that are in the given state.

    :param state: Must be Issue.OPENED or Issue.CLOSED (checked with an assert).
    :return: The matching Issues, each linked to this repository and its owner.
    """

    assert state in [Issue.OPENED, Issue.CLOSED]

    endpoint = Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name)
    raw_issues = self.allspice_client.requests_get_paginated(
        endpoint,
        params={"state": state},
    )

    def _linked_issue(raw_issue):
        parsed = Issue.parse_response(self.allspice_client, raw_issue)
        # adding data not contained in the issue response
        # See Issue.request()
        setattr(parsed, "_repository", self)
        Issue._add_read_property("repo", self, parsed)
        Issue._add_read_property("owner", self.owner, parsed)
        return parsed

    return [_linked_issue(raw_issue) for raw_issue in raw_issues]
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
def get_user_time(self, username) -> float:
    """
    Sum the "time" values of the entries returned for one user on this repository.

    :param username: A User object or a username string.
    :return: The sum of the "time" field over all returned entries.
    """
    user_name = username.username if isinstance(username, User) else username
    endpoint = Repository.REPO_USER_TIME % (self.owner.username, self.name, user_name)
    tracked_entries = self.allspice_client.requests_get(endpoint)
    return sum(entry["time"] for entry in tracked_entries)
def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
    """
    Create a new, open Issue in this Repository.

    :param title: Title of the issue.
    :param assignees: Iterable of users to assign. Entries may be User objects
        or username strings. An empty or None value assigns nobody.
    :param description: Body text of the issue.
    :return: The created Issue, linked back to this Repository.
    """
    # Always send a plain list of usernames: set types (including the
    # frozenset default) cannot be encoded by the standard json module, and
    # this also lets callers pass User objects, as create_design_review does.
    assignee_names = [
        assignee.username if isinstance(assignee, User) else assignee
        for assignee in (assignees or ())
    ]
    data = {
        "assignees": assignee_names,
        "body": description,
        "closed": False,
        "title": title,
    }
    result = self.allspice_client.requests_post(
        Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name),
        data=data,
    )

    issue = Issue.parse_response(self.allspice_client, result)
    # See Issue.request: the issue keeps a reference to its repository.
    setattr(issue, "_repository", self)
    Issue._add_read_property("repo", self, issue)
    return issue
def create_design_review(
    self,
    title: str,
    head: Union[Branch, str],
    base: Union[Branch, str],
    assignees: Optional[Set[Union[User, str]]] = None,
    body: Optional[str] = None,
    due_date: Optional[datetime] = None,
    milestone: Optional["Milestone"] = None,
) -> "DesignReview":
    """
    Create a new Design Review.

    See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest

    :param title: Title of the Design Review
    :param head: Branch or name of the branch to merge into the base branch
    :param base: Branch or name of the branch to merge into
    :param assignees: Optional. Users to assign this review to, given as User
        objects or usernames.
    :param body: An Optional Description for the Design Review.
    :param due_date: An Optional Due date for the Design Review.
    :param milestone: An Optional Milestone for the Design Review; sent as its id.
    :return: The created Design Review
    """

    payload: dict[str, Any] = {
        "title": title,
        "head": head.name if isinstance(head, Branch) else head,
        "base": base.name if isinstance(base, Branch) else base,
    }

    # Optional fields are only sent when the caller supplied a truthy value.
    if assignees:
        payload["assignees"] = [
            person.username if isinstance(person, User) else person for person in assignees
        ]
    if body:
        payload["body"] = body
    if due_date:
        payload["due_date"] = Util.format_time(due_date)
    if milestone:
        payload["milestone"] = milestone.id

    endpoint = self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, data=payload)

    return DesignReview.parse_response(self.allspice_client, created)
Create a new Design Review.
See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
def create_milestone(
    self,
    title: str,
    description: str,
    due_date: Optional[str] = None,
    state: str = "open",
) -> "Milestone":
    """
    Create a milestone in this repository.

    :param title: Title of the milestone.
    :param description: Description of the milestone.
    :param due_date: Optional. Due date, passed to the API unchanged.
    :param state: State of the new milestone, "open" by default.
    :return: The created Milestone
    """
    payload = {"title": title, "description": description, "state": state}
    # The due date is left out entirely when it is not set.
    if due_date:
        payload["due_date"] = due_date

    endpoint = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
    response = self.allspice_client.requests_post(endpoint, data=payload)
    return Milestone.parse_response(self.allspice_client, response)
def create_gitea_hook(self, hook_url: str, events: List[str]):
    """
    Register an active webhook of type "gitea" on this repository.

    :param hook_url: URL the hook delivers its JSON payloads to.
    :param events: Names of the events that trigger the hook.
    :return: The response of the POST request, as returned by the client.
    """
    hook_config = {"content_type": "json", "url": hook_url}
    payload = {
        "type": "gitea",
        "config": hook_config,
        "events": events,
        "active": True,
    }
    endpoint = f"/repos/{self.owner.username}/{self.name}/hooks"
    return self.allspice_client.requests_post(endpoint, data=payload)
def is_collaborator(self, username) -> bool:
    """
    Check whether a user is a collaborator on this repository.

    :param username: A User object or a username.
    :return: True if the lookup request succeeds, False if it raises.
    """
    login = username.username if isinstance(username, User) else username
    try:
        # The endpoint answers 204 for a collaborator and 404 otherwise.
        self.allspice_client.requests_get(
            Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, login)
        )
    except Exception:
        # NOTE(review): any failure, not only a 404, is reported as
        # "not a collaborator".
        return False
    return True
def get_users_with_access(self) -> Sequence[User]:
    """
    List the users that have access to this repository.

    For a user-owned repository this is the collaborators plus the owner.
    For an organization-owned repository it is the collaborators plus the
    members of every team of the organization that contains this repository
    (matched by repository name).
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/collaborators"
    users = [
        User.parse_response(self.allspice_client, entry)
        for entry in self.allspice_client.requests_get(endpoint)
    ]

    if isinstance(self.owner, User):
        users.append(self.owner)
        return users

    # Otherwise the owner must be an organization: walk through its teams.
    for team in self.owner.get_teams():
        repo_names = [repo.name for repo in team.get_repos()]
        if self.name in repo_names:
            users.extend(team.get_members())
    return users
def transfer_ownership(
    self,
    new_owner: Union[User, Organization],
    new_teams: Set[Team] | FrozenSet[Team] = frozenset(),
):
    """
    Transfer this repository to another user or organization.

    :param new_owner: The User or Organization that receives the repository.
    :param new_teams: Optional. Teams that should get access after the
        transfer. Only used when new_owner is an Organization; teams that
        are not among that organization's teams are dropped.
    """
    url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
    data: dict[str, Any] = {"new_owner": new_owner.username}
    if isinstance(new_owner, Organization):
        # Look the organization's teams up once instead of once per
        # candidate team, and not at all when there is nothing to filter.
        owner_teams = new_owner.get_teams() if new_teams else []
        data["team_ids"] = [team.id for team in new_teams if team in owner_teams]
    self.allspice_client.requests_post(url, data=data)
    # TODO: make sure this instance is either updated or discarded
def get_git_content(
    self,
    ref: Optional["Ref"] = None,
    commit: "Optional[Commit]" = None,
) -> List[Content]:
    """
    List the metadata of every entry in the repository's root directory.

    https://hub.allspice.io/api/swagger#/repository/repoGetContentsList

    :param ref: branch or commit to get content from
    :param commit: commit to get content from (deprecated); only used when
        ref is not given
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents"
    params = Util.data_params_for_ref(ref or commit)

    entries = self.allspice_client.requests_get(endpoint, params)
    return [Content.parse_response(self.allspice_client, entry) for entry in entries]
Get the metadata for all files in the root directory.
https://hub.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]:
    """
    Fetch the repository's git tree at a given ref.

    Only the top-level entries are returned unless `recursive` is True, in
    which case the entire tree is returned.

    :param ref: The ref to get the tree from. If not provided, the default branch is used.
    :param recursive: Whether to get the entire tree or just the top-level entries.
    """

    ref_params = Util.data_params_for_ref(ref)
    # Fall back to the default branch when no ref could be derived.
    ref_name = ref_params.get("ref", self.default_branch)

    endpoint = self.REPO_GET_TREE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_name,
    )
    entries = self.allspice_client.requests_get_paginated(
        endpoint, params={"recursive": recursive}
    )
    return [GitEntry.parse_response(self.allspice_client, entry) for entry in entries]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
def get_file_content(
    self,
    content: Content,
    ref: Optional[Ref] = None,
    commit: Optional[Commit] = None,
) -> Union[str, List["Content"]]:
    """
    Fetch what is stored at the path of a Content entry.

    https://hub.allspice.io/api/swagger#/repository/repoGetContents

    :param content: The entry to fetch.
    :param ref: branch or commit to read from
    :param commit: commit to read from; only used when ref is not given
    :return: The "content" field of the response when the entry is a file,
        otherwise the list of Content entries found at that path.
    """
    endpoint = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}"
    params = Util.data_params_for_ref(ref or commit)

    is_file = content.type == Content.FILE
    response = self.allspice_client.requests_get(endpoint, params)
    if is_file:
        return response["content"]
    return [Content.parse_response(self.allspice_client, entry) for entry in response]
https://hub.allspice.io/api/swagger#/repository/repoGetContents
def get_raw_file(
    self,
    file_path: str,
    ref: Optional[Ref] = None,
) -> bytes:
    """
    Fetch a single file as raw bytes.

    Note 1: for a text file, call .decode() on the result to obtain a
    string. For example:

        content = repo.get_raw_file("file.txt").decode("utf-8")

    Note 2: the whole file is held in memory. For a large file, consider
    `download_to_file` instead.

    See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile

    :param file_path: The path to the file to get.
    :param ref: The branch or commit to get the file from. If not provided,
        the default branch is used.
    """

    query = Util.data_params_for_ref(ref)
    endpoint = self.REPO_GET_RAW_FILE.format(
        owner=self.owner.username,
        repo=self.name,
        path=file_path,
    )
    return self.allspice_client.requests_get_raw(endpoint, params=query)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
def download_to_file(
    self,
    file_path: str,
    io: IO,
    ref: Optional[Ref] = None,
) -> None:
    """
    Download the binary data of a file to a file-like object.

    The response is streamed and written in chunks of 4096 bytes, so the
    file is not held in memory as a whole.

    Example:

        with open("schematic.DSN", "wb") as f:
            repo.download_to_file("Schematics/my_schematic.DSN", f)

    :param file_path: The path to the file in the repository from the root
        of the repository.
    :param io: The file-like object to write the data to.
    :param ref: Optional. The branch or commit to get the file from.
    """

    url = self.allspice_client._AllSpice__get_url(
        self.REPO_GET_RAW_FILE.format(
            owner=self.owner.username,
            repo=self.name,
            path=file_path,
        )
    )
    params = Util.data_params_for_ref(ref)
    response = self.allspice_client.requests.get(
        url,
        params=params,
        headers=self.allspice_client.headers,
        stream=True,
    )
    # NOTE(review): the HTTP status is not checked here, so the body of an
    # error response is written to `io` as well -- confirm this is intended.
    try:
        for chunk in response.iter_content(chunk_size=4096):
            if chunk:
                io.write(chunk)
    finally:
        # Always close the streamed response, even if writing fails part-way.
        response.close()
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict:
    """
    Fetch the generated json blob for a cad file. If it does not exist yet,
    a new job is enqueued and a 503 status is returned.

    WARNING: This is still experimental and not recommended for critical
    applications. The structure and content of the returned dictionary can
    change at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON

    :param content: A Content entry or the path of the file.
    :param ref: Optional ref to read the file from.
    """

    # A Content entry is reduced to its path; a string is used as is.
    file_path = content.path if isinstance(content, Content) else content

    endpoint = self.REPO_GET_ALLSPICE_JSON.format(
        owner=self.owner.username,
        repo=self.name,
        content=file_path,
    )
    params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(endpoint, params)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes:
    """
    Fetch the generated svg blob for a cad file. If it does not exist yet,
    a new job is enqueued and a 503 status is returned.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned svg can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG

    :param content: A Content entry or the path of the file.
    :param ref: Optional ref to read the file from.
    """

    # A Content entry is reduced to its path; a string is used as is.
    file_path = content.path if isinstance(content, Content) else content

    endpoint = self.REPO_GET_ALLSPICE_SVG.format(
        owner=self.owner.username,
        repo=self.name,
        content=file_path,
    )
    params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get_raw(endpoint, params)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
def get_generated_projectdata(
    self, content: Union[Content, str], ref: Optional[Ref] = None
) -> dict:
    """
    Fetch the json project data generated from the given cad file.

    WARNING: This is still experimental and not yet recommended for
    critical applications. The content of the returned dictionary can change
    at any time.

    See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject

    :param content: A Content entry or the path of the file.
    :param ref: Optional ref to read the file from.
    """
    # A Content entry is reduced to its path; a string is used as is.
    file_path = content.path if isinstance(content, Content) else content

    endpoint = self.REPO_GET_ALLSPICE_PROJECT.format(
        owner=self.owner.username,
        repo=self.name,
        content=file_path,
    )
    params = Util.data_params_for_ref(ref)
    return self.allspice_client.requests_get(endpoint, params)
Get the json project data based on the cad file provided
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned dictionary can change at any time.
See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceProject
def create_file(self, file_path: str, content: str, data: Optional[dict] = None):
    """
    Create a file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoCreateFile

    :param file_path: Path of the new file inside the repository.
    :param content: Content of the file, sent as the "content" field.
        NOTE(review): the API presumably expects this base64 encoded -- confirm.
    :param data: Optional. Additional fields for the request body. The dict
        is not modified.
    :return: The response of the POST request, as returned by the client.
    """
    # Build a fresh payload so the caller's dict is left untouched.
    payload = {**(data or {}), "content": content}
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_post(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoCreateFile
def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None):
    """
    Update an existing file in this repository.

    https://hub.allspice.io/api/swagger#/repository/repoUpdateFile

    :param file_path: Path of the file inside the repository.
    :param file_sha: SHA of the file to change, sent as the "sha" field.
    :param content: New content of the file, sent as the "content" field.
        NOTE(review): the API presumably expects this base64 encoded -- confirm.
    :param data: Optional. Additional fields for the request body. The dict
        is not modified.
    :return: The response of the PUT request, as returned by the client.
    """
    # Build a fresh payload so the caller's dict is left untouched.
    payload = {**(data or {}), "sha": file_sha, "content": content}
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_put(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoUpdateFile
def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None):
    """
    Delete a file from this repository.

    https://hub.allspice.io/api/swagger#/repository/repoDeleteFile

    :param file_path: Path of the file inside the repository.
    :param file_sha: SHA of the file to delete, sent as the "sha" field.
    :param data: Optional. Additional fields for the request body. The dict
        is not modified.
    :return: The response of the DELETE request, as returned by the client.
    """
    # Build a fresh payload so the caller's dict is left untouched.
    payload = {**(data or {}), "sha": file_sha}
    url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}"
    return self.allspice_client.requests_delete(url, payload)
https://hub.allspice.io/api/swagger#/repository/repoDeleteFile
def get_archive(
    self,
    ref: Ref = "main",
    archive_format: ArchiveFormat = ArchiveFormat.ZIP,
) -> bytes:
    """
    Fetch every file at a specific ref of the repository, packed as a zip
    or tarball archive.

    https://hub.allspice.io/api/swagger#/repository/repoGetArchive

    :param ref: branch or commit to get content from, defaults to the "main" branch
    :param archive_format: zip or tar, defaults to zip
    :return: The raw bytes of the archive.
    """

    ref_params = Util.data_params_for_ref(ref)
    endpoint = self.REPO_GET_ARCHIVE.format(
        owner=self.owner.username,
        repo=self.name,
        ref=ref_params["ref"],
        format=archive_format.value,
    )
    return self.allspice_client.requests_get_raw(endpoint)
Download all the files in a specific ref of a repository as a zip or tarball archive.
https://hub.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
def get_topics(self) -> list[str]:
    """
    Return the topics set on this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListTopics
    """

    endpoint = self.REPO_GET_TOPICS.format(owner=self.owner.username, repo=self.name)
    response = self.allspice_client.requests_get(endpoint)
    # The list itself is stored under the "topics" key of the response.
    return response["topics"]
Gets the list of topics on this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListTopics
def add_topic(self, topic: str):
    """
    Add a single topic to this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoAddTopic

    :param topic: The topic to add. Topic names must consist only of
        lowercase letters, numbers and dashes (-), and cannot start with
        dashes. Topic names also must be under 35 characters long.
    """

    endpoint = self.REPO_ADD_TOPIC.format(
        owner=self.owner.username,
        repo=self.name,
        topic=topic,
    )
    self.allspice_client.requests_put(endpoint)
Adds a topic to the repository.
See https://hub.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
def create_release(
    self,
    tag_name: str,
    name: Optional[str] = None,
    body: Optional[str] = None,
    draft: bool = False,
):
    """
    Create a release for this repository. The release is created for the
    tag with the given name. If there is no tag with this name, create
    the tag first.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease

    :param tag_name: Name of the tag the release is created for.
    :param name: Optional. Name of the release.
    :param body: Optional. Body text of the release.
    :param draft: Whether to create the release as a draft.
    :return: The created Release
    """

    payload = {
        "tag_name": tag_name,
        "draft": draft,
    }
    # name and body are only sent when explicitly given (an empty string
    # still counts as given).
    if name is not None:
        payload["name"] = name
    if body is not None:
        payload["body"] = body

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    created = self.allspice_client.requests_post(endpoint, payload)
    return Release.parse_response(self.allspice_client, created, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease
def get_releases(
    self, draft: Optional[bool] = None, pre_release: Optional[bool] = None
) -> List[Release]:
    """
    List the releases of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoListReleases

    :param draft: Optional. Sent as the "draft" query parameter when set.
    :param pre_release: Optional. Sent as the "pre-release" query parameter
        when set.
    """

    # Only filters that were explicitly set are passed on to the API.
    filters = {}
    if draft is not None:
        filters["draft"] = draft
    if pre_release is not None:
        filters["pre-release"] = pre_release

    endpoint = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name)
    entries = self.allspice_client.requests_get_paginated(endpoint, params=filters)
    return [Release.parse_response(self.allspice_client, entry, self) for entry in entries]
Get the list of releases for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoListReleases
def get_latest_release(self) -> Release:
    """
    Fetch the latest release of this repository.

    See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
    """

    endpoint = self.REPO_GET_LATEST_RELEASE.format(
        owner=self.owner.username,
        repo=self.name,
    )
    return Release.parse_response(
        self.allspice_client,
        self.allspice_client.requests_get(endpoint),
        self,
    )
Get the latest release for this repository.
See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease
def get_release_by_tag(self, tag: str) -> Release:
    """
    Fetch the release that belongs to a tag.

    See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag

    :param tag: Name of the tag.
    """

    endpoint = self.REPO_GET_RELEASE_BY_TAG.format(
        owner=self.owner.username,
        repo=self.name,
        tag=tag,
    )
    return Release.parse_response(
        self.allspice_client,
        self.allspice_client.requests_get(endpoint),
        self,
    )
Get a release by its tag.
See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag
def get_commit_statuses(
    self,
    commit: Union[str, Commit],
    sort: Optional[CommitStatusSort] = None,
    state: Optional[CommitStatusState] = None,
) -> List[CommitStatus]:
    """
    List the statuses of a commit.

    This is roughly equivalent to the Commit.get_statuses method, but this
    method allows you to sort and filter commits and is more convenient if
    you have a commit SHA and don't need to get the commit itself.

    See https://hub.allspice.io/api/swagger#/repository/repoListStatuses

    :param commit: A Commit object or a commit SHA.
    :param sort: Optional sort order.
    :param state: Optional state to filter by.
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    # Enum members are sent by value; unset options are left out.
    query = {}
    if sort is not None:
        query["sort"] = sort.value
    if state is not None:
        query["state"] = state.value

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username,
        repo=self.name,
        sha=sha,
    )
    entries = self.allspice_client.requests_get_paginated(endpoint, params=query)
    return [CommitStatus.parse_response(self.allspice_client, entry) for entry in entries]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See https://hub.allspice.io/api/swagger#/repository/repoListStatuses
def create_commit_status(
    self,
    commit: Union[str, Commit],
    context: Optional[str] = None,
    description: Optional[str] = None,
    state: Optional[CommitStatusState] = None,
    target_url: Optional[str] = None,
) -> CommitStatus:
    """
    Attach a new status to a commit.

    See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus

    :param commit: A Commit object or a commit SHA.
    :param context: Optional. Sent as the "context" field.
    :param description: Optional. Sent as the "description" field.
    :param state: Optional. Its value is sent as the "state" field.
    :param target_url: Optional. Sent as the "target_url" field.
    :return: The created CommitStatus
    """

    sha = commit.sha if isinstance(commit, Commit) else commit

    # Only fields that were explicitly given end up in the request body.
    payload = {}
    if context is not None:
        payload["context"] = context
    if description is not None:
        payload["description"] = description
    if state is not None:
        payload["state"] = state.value
    if target_url is not None:
        payload["target_url"] = target_url

    endpoint = self.REPO_GET_COMMIT_STATUS.format(
        owner=self.owner.username,
        repo=self.name,
        sha=sha,
    )
    created = self.allspice_client.requests_post(endpoint, data=payload)
    return CommitStatus.parse_response(self.allspice_client, created)
Create a status on a commit.
See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus
class ArchiveFormat(Enum):
    """
    Archive formats accepted by Repository.get_archive.

    The value of a member is inserted into the archive URL as the format.
    """

    TAR = "tar.gz"  # gzip-compressed tarball
    ZIP = "zip"
Archive formats for Repository.get_archive
class CommitStatusSort(Enum):
    """
    Sort orders accepted by Repository.get_commit_statuses.

    The value of a member is sent as the "sort" query parameter.
    """

    OLDEST = "oldest"
    RECENT_UPDATE = "recentupdate"
    LEAST_UPDATE = "leastupdate"
    LEAST_INDEX = "leastindex"
    HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_statuses
class Team(ApiObject):
    """
    A team, addressed through the API by its numeric id.

    Two teams are equal when both their organization and their id match.
    """

    can_create_org_repo: bool
    description: str
    id: int
    includes_all_repositories: bool
    name: str
    organization: Optional["Organization"]
    permission: str
    units: List[str]
    units_map: Dict[str, str]

    API_OBJECT = """/teams/{id}"""  # <id>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)

    def __eq__(self, other):
        if not isinstance(other, Team):
            return False
        return self.organization == other.organization and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.organization) ^ hash(self.id)

    # The nested organization of a response is parsed into an Organization.
    _fields_to_parsers: ClassVar[dict] = {
        "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o)
    }

    _patchable_fields: ClassVar[set[str]] = {
        "can_create_org_repo",
        "description",
        "includes_all_repositories",
        "name",
        "permission",
        "units",
        "units_map",
    }

    @classmethod
    def request(cls, allspice_client, id: int):
        """Fetch the team with the given id."""
        return cls._request(allspice_client, {"id": id})

    def commit(self):
        """Send the changed fields of this team to the server."""
        args = {"id": self.id}
        self._commit(args)

    def add_user(self, user: User):
        """
        Add a user to this team.

        https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
        """
        url = f"/teams/{self.id}/members/{user.login}"
        self.allspice_client.requests_put(url)

    def add_repo(self, org: Organization, repo: Union[Repository, str]):
        """
        Add a repository to this team.

        :param org: The organization that owns the repository.
        :param repo: A Repository object or the name of the repository.
        """
        if isinstance(repo, Repository):
            repo_name = repo.name
        else:
            repo_name = repo
        self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name))

    def get_members(self):
        """Get all users assigned to the team."""
        results = self.allspice_client.requests_get_paginated(
            Team.GET_MEMBERS % self.id,
        )
        return [User.parse_response(self.allspice_client, result) for result in results]

    def get_repos(self):
        """Get all repos of this Team."""
        # Use the paginated getter, as get_members does, so that repos
        # beyond the first page of results are included.
        results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def delete(self):
        """Delete this team and mark this object as deleted."""
        self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id)
        self.deleted = True

    def remove_team_member(self, user_name: str):
        """
        Remove a user from this team.

        :param user_name: The name used for the user in the members URL.
        """
        url = f"/teams/{self.id}/members/{user_name}"
        self.allspice_client.requests_delete(url)
def add_user(self, user: User):
    """
    Add a user to this team, identified by the user's login.

    https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
    """
    team_id, login = self.id, user.login
    self.allspice_client.requests_put(f"/teams/{team_id}/members/{login}")
https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember
def get_members(self):
    """Return every user assigned to this team, across all result pages."""
    endpoint = Team.GET_MEMBERS % self.id
    members = []
    for entry in self.allspice_client.requests_get_paginated(endpoint):
        members.append(User.parse_response(self.allspice_client, entry))
    return members
Get all users assigned to the team.
def get_repos(self):
    """Get all repos of this Team."""
    # Use the paginated getter so that repos beyond the first page of
    # results are included.
    results = self.allspice_client.requests_get_paginated(Team.GET_REPOS % self.id)
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
class User(ApiObject):
    """
    A user account.

    Two users are equal when they belong to the same client and share the
    same id.
    """

    active: bool
    admin: Any
    allow_create_organization: Any
    allow_git_hook: Any
    allow_import_local: Any
    avatar_url: str
    created: str
    description: str
    email: str
    emails: List[Any]
    followers_count: int
    following_count: int
    full_name: str
    html_url: str
    id: int
    is_admin: bool
    language: str
    last_login: str
    location: str
    login: str
    login_name: str
    max_repo_creation: Any
    must_change_password: Any
    password: Any
    prohibit_login: bool
    restricted: bool
    source_id: int
    starred_repos_count: int
    username: str
    visibility: str
    website: str

    API_OBJECT = """/users/{name}"""  # <org>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    ADMIN_EDIT_USER = """/admin/users/{username}"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    def __init__(self, allspice_client):
        super().__init__(allspice_client)
        self._emails = []

    def __eq__(self, other):
        if not isinstance(other, User):
            return False
        return self.allspice_client == other.allspice_client and self.id == other.id

    def __hash__(self):
        # Built from the same two fields that __eq__ compares.
        return hash(self.allspice_client) ^ hash(self.id)

    @property
    def emails(self):
        """The user's email addresses; every read fetches them again."""
        self.__request_emails()
        return self._emails

    @classmethod
    def request(cls, allspice_client, name: str) -> "User":
        """Fetch the user with the given name."""
        api_object = cls._request(allspice_client, {"name": name})
        return api_object

    _patchable_fields: ClassVar[set[str]] = {
        "active",
        "admin",
        "allow_create_organization",
        "allow_git_hook",
        "allow_import_local",
        "email",
        "full_name",
        "location",
        "login_name",
        "max_repo_creation",
        "must_change_password",
        "password",
        "prohibit_login",
        "website",
    }

    def commit(self, login_name: str, source_id: int = 0):
        """
        Unfortunately it is necessary to require the login name
        as well as the login source (that is not supplied when getting a user) for
        changing a user.
        Usually source_id is 0 and the login_name is equal to the username.
        """
        values = self.get_dirty_fields()
        values.update(
            # api-doc says that the "source_id" is necessary; works without though
            {"login_name": login_name, "source_id": source_id}
        )
        args = {"username": self.username}
        self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
        self._dirty_fields = {}

    def create_repo(
        self,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a user Repository

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.
        """
        result = self.allspice_client.requests_post(
            "/user/repos",
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        # A response without an "id" is treated as a failure; its "message"
        # is logged and raised.
        if "id" in result:
            self.allspice_client.logger.info("Successfully created Repository %s ", result["name"])
        else:
            self.allspice_client.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self.allspice_client, result)

    def get_repositories(self) -> List["Repository"]:
        """Get all Repositories owned by this User."""
        url = f"/users/{self.username}/repos"
        results = self.allspice_client.requests_get_paginated(url)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def get_orgs(self) -> List[Organization]:
        """Get all Organizations this user is a member of."""
        url = f"/users/{self.username}/orgs"
        results = self.allspice_client.requests_get_paginated(url)
        return [Organization.parse_response(self.allspice_client, result) for result in results]

    def get_teams(self) -> List["Team"]:
        """Get the teams of this user; the request is made with sudo set to this user."""
        url = "/user/teams"
        results = self.allspice_client.requests_get_paginated(url, sudo=self)
        return [Team.parse_response(self.allspice_client, result) for result in results]

    def get_accessible_repos(self) -> List["Repository"]:
        """Get all Repositories accessible by the logged in User."""
        # Use the paginated getter, as get_teams does, so that repos beyond
        # the first page of results are included.
        results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
        return [Repository.parse_response(self.allspice_client, result) for result in results]

    def __request_emails(self):
        result = self.allspice_client.requests_get(User.USER_MAIL % self.login)
        # Rebuild the list on every refresh. Appending to the previous list
        # duplicated every address each time `emails` was read.
        emails = []
        for mail in result:
            emails.append(mail["email"])
            # Remember the address flagged as primary.
            if mail["primary"]:
                self._email = mail["email"]
        self._emails = emails

    def delete(self):
        """Deletes this User. Also deletes all Repositories he owns."""
        self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username)
        self.deleted = True

    def get_heatmap(self) -> List[Tuple[datetime, int]]:
        """
        Get the contribution heatmap of this user.

        :return: A list of (time, contributions) tuples. Each timestamp is
            converted with datetime.fromtimestamp, so the datetimes are
            naive local times.
        """
        results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
def commit(self, login_name: str, source_id: int = 0):
    """
    Send the changed fields of this user to the server.

    Changing a user requires the login name as well as the login source,
    which is not supplied when getting a user, so both must be passed in.
    Usually source_id is 0 and the login_name is equal to the username.

    :param login_name: Login name of the user.
    :param source_id: Id of the login source.
    """
    payload = self.get_dirty_fields()
    payload["login_name"] = login_name
    # api-doc says that the "source_id" is necessary; works without though
    payload["source_id"] = source_id

    endpoint = User.ADMIN_EDIT_USER.format(username=self.username)
    self.allspice_client.requests_patch(endpoint, data=payload)
    # Everything has been sent, so nothing is dirty any more.
    self._dirty_fields = {}
Unfortunately, changing a user requires the login name as well as the login source (which is not supplied when getting a user). Usually source_id is 0 and the login_name is equal to the username.
def create_repo(
    self,
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a user Repository

    The arguments are posted to ``/user/repos``. A response that carries an
    ``id`` is treated as success and parsed into a Repository; any other
    response is logged as an error and raised.

    Throws:
        AlreadyExistsException: If the Repository exists already.
            (Not raised by this method directly; presumably raised by the
            client while performing the request. TODO confirm.)
        Exception: If something else went wrong.
    """
    payload = dict(
        name=repoName,
        description=description,
        private=private,
        auto_init=autoInit,
        gitignores=gitignores,
        license=license,
        issue_labels=issue_labels,
        readme=readme,
        default_branch=default_branch,
    )
    response = self.allspice_client.requests_post("/user/repos", data=payload)

    # Guard clause: without an "id" the server did not create anything.
    if "id" not in response:
        self.allspice_client.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])

    self.allspice_client.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self.allspice_client, response)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
def get_repositories(self) -> List["Repository"]:
    """Return the Repositories owned by this User.

    Reads the paginated ``/users/<username>/repos`` endpoint and parses
    each entry into a Repository.
    """
    endpoint = f"/users/{self.username}/repos"
    client = self.allspice_client
    repositories = []
    for entry in client.requests_get_paginated(endpoint):
        repositories.append(Repository.parse_response(client, entry))
    return repositories
Get all Repositories owned by this User.
def get_orgs(self) -> List[Organization]:
    """Return the Organizations this user is a member of.

    Reads the paginated ``/users/<username>/orgs`` endpoint and parses
    each entry into an Organization.
    """
    endpoint = f"/users/{self.username}/orgs"
    client = self.allspice_client
    organizations = []
    for entry in client.requests_get_paginated(endpoint):
        organizations.append(Organization.parse_response(client, entry))
    return organizations
Get all Organizations this user is a member of.
def get_accessible_repos(self) -> List["Repository"]:
    """Get all Repositories accessible by this User.

    The request goes to ``/user/repos`` and is issued with ``sudo=self``,
    i.e. on behalf of this user rather than the owner of the API token.

    The paginated request helper is used, like the other listing methods
    of this class, so that repositories beyond the first result page are
    returned as well.

    Returns:
        A list with one Repository per entry returned by the server.
    """
    results = self.allspice_client.requests_get_paginated("/user/repos", sudo=self)
    return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
def delete(self):
    """Delete this User through the admin delete endpoint.

    According to the original documentation, all Repositories owned by the
    user are deleted as well. Afterwards the local object is flagged with
    ``deleted = True``.
    """
    endpoint = User.ADMIN_DELETE_USER % self.username
    self.allspice_client.requests_delete(endpoint)
    self.deleted = True
Deletes this User. Also deletes all Repositories they own.