allspice
A very simple API client for AllSpice Hub
Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client
object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice version or authentication user can be requested directly from the allspice_client
object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request
method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during divelopment. Refer to the AllSpice API documentation for the fields names.
Fields that can not be altered via allspice-api, are read only. After altering a field, the .commit
method of the according object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects do have methods to execute some of the requests possible though the AllSpice api:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
1""" 2.. include:: ../README.md 3 :start-line: 1 4 :end-before: Installation 5""" 6 7from .allspice import ( 8 AllSpice, 9) 10from .apiobject import ( 11 Branch, 12 Comment, 13 Commit, 14 Content, 15 DesignReview, 16 Issue, 17 Milestone, 18 Organization, 19 Release, 20 Repository, 21 Team, 22 User, 23) 24from .exceptions import AlreadyExistsException, NotFoundException 25 26__version__ = "3.8.0" 27 28__all__ = [ 29 "AllSpice", 30 "AlreadyExistsException", 31 "Branch", 32 "Comment", 33 "Commit", 34 "Content", 35 "DesignReview", 36 "Issue", 37 "Milestone", 38 "NotFoundException", 39 "Organization", 40 "Release", 41 "Repository", 42 "Team", 43 "User", 44]
21class AllSpice: 22 """Object to establish a session with AllSpice Hub.""" 23 24 ADMIN_CREATE_USER = """/admin/users""" 25 GET_USERS_ADMIN = """/admin/users""" 26 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 27 ALLSPICE_HUB_VERSION = """/version""" 28 GET_USER = """/user""" 29 GET_REPOSITORY = """/repos/{owner}/{name}""" 30 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 31 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 32 33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 """ 62 63 self.logger = logging.getLogger(__name__) 64 handler = logging.StreamHandler(sys.stderr) 65 handler.setFormatter( 66 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 67 ) 68 self.logger.addHandler(handler) 69 self.logger.setLevel(log_level) 70 self.headers = { 71 "Content-type": "application/json", 72 } 73 self.url = allspice_hub_url 74 75 if ratelimiting is None: 76 self.requests = requests.Session() 77 else: 78 (max_calls, period) = ratelimiting 79 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 80 81 # Manage authentification 82 if not token_text and not auth: 83 raise ValueError("Please provide auth or token_text, but not both") 84 if token_text: 85 self.headers["Authorization"] = "token " + token_text 86 if auth: 87 self.logger.warning( 88 "Using basic auth is not recommended. Prefer using a token instead." 89 ) 90 self.requests.auth = auth 91 92 # Manage SSL certification verification 93 self.requests.verify = verify 94 if not verify: 95 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 96 97 def __get_url(self, endpoint): 98 url = self.url + "/api/v1" + endpoint 99 self.logger.debug("Url: %s" % url) 100 return url 101 102 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 103 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 104 if request.status_code not in [200, 201]: 105 message = f"Received status code: {request.status_code} ({request.url})" 106 if request.status_code in [404]: 107 raise NotFoundException(message) 108 if request.status_code in [403]: 109 raise Exception( 110 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 111 ) 112 if request.status_code in [409]: 113 raise ConflictException(message) 114 if request.status_code in [503]: 115 raise NotYetGeneratedException(message) 116 raise Exception(message) 117 return request 118 119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {} 125 126 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 127 combined_params = {} 128 combined_params.update(params) 129 if sudo: 130 combined_params["sudo"] = sudo.username 131 return self.parse_result(self.__get(endpoint, combined_params)) 132 133 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 134 combined_params = {} 135 combined_params.update(params) 136 if sudo: 137 combined_params["sudo"] = sudo.username 138 return self.__get(endpoint, combined_params).content 139 140 def requests_get_paginated( 141 self, 142 endpoint: str, 143 params=frozendict(), 144 sudo=None, 145 page_key: str = "page", 146 first_page: int = 1, 147 ): 148 page = first_page 149 combined_params = {} 150 combined_params.update(params) 151 aggregated_result = [] 152 while True: 153 combined_params[page_key] = page 154 result = self.requests_get(endpoint, combined_params, sudo) 155 156 if not result: 157 return aggregated_result 158 159 if isinstance(result, dict): 160 if "data" in result: 161 data = result["data"] 162 if len(data) == 0: 163 return aggregated_result 164 aggregated_result.extend(data) 165 elif "tree" in result: 166 data = result["tree"] 167 if data is None or len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 else: 171 raise NotImplementedError( 172 "requests_get_paginated does not know how to handle responses of this type." 173 ) 174 else: 175 aggregated_result.extend(result) 176 177 page += 1 178 179 def requests_put(self, endpoint: str, data: Optional[dict] = None): 180 if not data: 181 data = {} 182 request = self.requests.put( 183 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 184 ) 185 if request.status_code not in [200, 204]: 186 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 187 self.logger.error(message) 188 raise Exception(message) 189 190 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 191 request = self.requests.delete( 192 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 193 ) 194 if request.status_code not in [200, 204]: 195 message = f"Received status code: {request.status_code} ({request.url})" 196 self.logger.error(message) 197 raise Exception(message) 198 199 def requests_post( 200 self, 201 endpoint: str, 202 data: Optional[dict] = None, 203 params: Optional[dict] = None, 204 files: Optional[dict] = None, 205 ): 206 """ 207 Make a POST call to the endpoint. 208 209 :param endpoint: The path to the endpoint 210 :param data: A dictionary for JSON data 211 :param params: A dictionary of query params 212 :param files: A dictionary of files, see requests.post. Using both files and data 213 can lead to unexpected results! 214 :return: The JSON response parsed as a dict 215 """ 216 217 # This should ideally be a TypedDict of the type of arguments taken by 218 # `requests.post`. 219 args: dict[str, Any] = { 220 "headers": self.headers.copy(), 221 } 222 if data is not None: 223 args["data"] = json.dumps(data) 224 if params is not None: 225 args["params"] = params 226 if files is not None: 227 args["headers"].pop("Content-type") 228 args["files"] = files 229 230 request = self.requests.post(self.__get_url(endpoint), **args) 231 232 if request.status_code not in [200, 201, 202]: 233 if "already exists" in request.text or "e-mail already in use" in request.text: 234 self.logger.warning(request.text) 235 raise AlreadyExistsException() 236 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 237 self.logger.error(f"With info: {data} ({self.headers})") 238 self.logger.error(f"Answer: {request.text}") 239 raise Exception( 240 f"Received status code: {request.status_code} ({request.url}), {request.text}" 241 ) 242 return self.parse_result(request) 243 244 def requests_patch(self, endpoint: str, data: dict): 245 request = self.requests.patch( 246 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 247 ) 248 if request.status_code not in [200, 201]: 249 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 250 self.logger.error(error_message) 251 raise Exception(error_message) 252 return self.parse_result(request) 253 254 def get_orgs_public_members_all(self, orgname): 255 path = "/orgs/" + orgname + "/public_members" 256 return self.requests_get(path) 257 258 def get_orgs(self): 259 path = "/admin/orgs" 260 results = self.requests_get(path) 261 return [Organization.parse_response(self, result) for result in results] 262 263 def get_user(self): 264 result = self.requests_get(AllSpice.GET_USER) 265 return User.parse_response(self, result) 266 267 def get_version(self) -> str: 268 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 269 return result["version"] 270 271 def get_users(self) -> List[User]: 272 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 273 return [User.parse_response(self, result) for result in results] 274 275 def get_user_by_email(self, email: str) -> Optional[User]: 276 users = self.get_users() 277 for user in users: 278 if user.email == email or email in user.emails: 279 return user 280 return None 281 282 def get_user_by_name(self, username: str) -> Optional[User]: 283 users = self.get_users() 284 for user in users: 285 if user.username == username: 286 return user 287 return None 288 289 def get_repository(self, owner: str, name: str) -> Repository: 290 path = self.GET_REPOSITORY.format(owner=owner, name=name) 291 result = self.requests_get(path) 292 return Repository.parse_response(self, result) 293 294 def create_user( 295 self, 296 user_name: str, 297 email: str, 298 password: str, 299 full_name: Optional[str] = None, 300 login_name: Optional[str] = None, 301 change_pw=True, 302 send_notify=True, 303 source_id=0, 304 ): 305 """Create User. 306 Throws: 307 AlreadyExistsException, if the User exists already 308 Exception, if something else went wrong. 309 """ 310 if not login_name: 311 login_name = user_name 312 if not full_name: 313 full_name = user_name 314 request_data = { 315 "source_id": source_id, 316 "login_name": login_name, 317 "full_name": full_name, 318 "username": user_name, 319 "email": email, 320 "password": password, 321 "send_notify": send_notify, 322 "must_change_password": change_pw, 323 } 324 325 self.logger.debug("Gitea post payload: %s", request_data) 326 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 327 if "id" in result: 328 self.logger.info( 329 "Successfully created User %s <%s> (id %s)", 330 result["login"], 331 result["email"], 332 result["id"], 333 ) 334 self.logger.debug("Gitea response: %s", result) 335 else: 336 self.logger.error(result["message"]) 337 raise Exception("User not created... (gitea: %s)" % result["message"]) 338 user = User.parse_response(self, result) 339 return user 340 341 def create_repo( 342 self, 343 repoOwner: Union[User, Organization], 344 repoName: str, 345 description: str = "", 346 private: bool = False, 347 autoInit=True, 348 gitignores: Optional[str] = None, 349 license: Optional[str] = None, 350 readme: str = "Default", 351 issue_labels: Optional[str] = None, 352 default_branch="master", 353 ): 354 """Create a Repository as the administrator 355 356 Throws: 357 AlreadyExistsException: If the Repository exists already. 358 Exception: If something else went wrong. 359 360 Note: 361 Non-admin users can not use this method. Please use instead 362 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 363 """ 364 # although this only says user in the api, this also works for 365 # organizations 366 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 367 result = self.requests_post( 368 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 369 data={ 370 "name": repoName, 371 "description": description, 372 "private": private, 373 "auto_init": autoInit, 374 "gitignores": gitignores, 375 "license": license, 376 "issue_labels": issue_labels, 377 "readme": readme, 378 "default_branch": default_branch, 379 }, 380 ) 381 if "id" in result: 382 self.logger.info("Successfully created Repository %s " % result["name"]) 383 else: 384 self.logger.error(result["message"]) 385 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 386 return Repository.parse_response(self, result) 387 388 def create_org( 389 self, 390 owner: User, 391 orgName: str, 392 description: str, 393 location="", 394 website="", 395 full_name="", 396 ): 397 assert isinstance(owner, User) 398 result = self.requests_post( 399 AllSpice.CREATE_ORG % owner.username, 400 data={ 401 "username": orgName, 402 "description": description, 403 "location": location, 404 "website": website, 405 "full_name": full_name, 406 }, 407 ) 408 if "id" in result: 409 self.logger.info("Successfully created Organization %s" % result["username"]) 410 else: 411 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 412 self.logger.error(result["message"]) 413 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 414 return Organization.parse_response(self, result) 415 416 def create_team( 417 self, 418 org: Organization, 419 name: str, 420 description: str = "", 421 permission: str = "read", 422 can_create_org_repo: bool = False, 423 includes_all_repositories: bool = False, 424 units=( 425 "repo.code", 426 "repo.issues", 427 "repo.ext_issues", 428 "repo.wiki", 429 "repo.pulls", 430 "repo.releases", 431 "repo.ext_wiki", 432 ), 433 units_map={}, 434 ): 435 """Creates a Team. 436 437 Args: 438 org (Organization): Organization the Team will be part of. 439 name (str): The Name of the Team to be created. 440 description (str): Optional, None, short description of the new Team. 441 permission (str): Optional, 'read', What permissions the members 442 units_map (dict): Optional, {}, a mapping of units to their 443 permissions. If None or empty, the `permission` permission will 444 be applied to all units. Note: When both `units` and `units_map` 445 are given, `units_map` will be preferred. 446 """ 447 448 result = self.requests_post( 449 AllSpice.CREATE_TEAM % org.username, 450 data={ 451 "name": name, 452 "description": description, 453 "permission": permission, 454 "can_create_org_repo": can_create_org_repo, 455 "includes_all_repositories": includes_all_repositories, 456 "units": units, 457 "units_map": units_map, 458 }, 459 ) 460 461 if "id" in result: 462 self.logger.info("Successfully created Team %s" % result["name"]) 463 else: 464 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 465 self.logger.error(result["message"]) 466 raise Exception("Team not created... (gitea: %s)" % result["message"]) 467 api_object = Team.parse_response(self, result) 468 setattr( 469 api_object, "_organization", org 470 ) # fixes strange behaviour of gitea not returning a valid organization here. 471 return api_object
Object to establish a session with AllSpice Hub.
33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 """ 62 63 self.logger = logging.getLogger(__name__) 64 handler = logging.StreamHandler(sys.stderr) 65 handler.setFormatter( 66 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 67 ) 68 self.logger.addHandler(handler) 69 self.logger.setLevel(log_level) 70 self.headers = { 71 "Content-type": "application/json", 72 } 73 self.url = allspice_hub_url 74 75 if ratelimiting is None: 76 self.requests = requests.Session() 77 else: 78 (max_calls, period) = ratelimiting 79 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 80 81 # Manage authentification 82 if not token_text and not auth: 83 raise ValueError("Please provide auth or token_text, but not both") 84 if token_text: 85 self.headers["Authorization"] = "token " + token_text 86 if auth: 87 self.logger.warning( 88 "Using basic auth is not recommended. Prefer using a token instead." 89 ) 90 self.requests.auth = auth 91 92 # Manage SSL certification verification 93 self.requests.verify = verify 94 if not verify: 95 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, allow insecure server connections
when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {}
Parses the result-JSON to a dict.
140 def requests_get_paginated( 141 self, 142 endpoint: str, 143 params=frozendict(), 144 sudo=None, 145 page_key: str = "page", 146 first_page: int = 1, 147 ): 148 page = first_page 149 combined_params = {} 150 combined_params.update(params) 151 aggregated_result = [] 152 while True: 153 combined_params[page_key] = page 154 result = self.requests_get(endpoint, combined_params, sudo) 155 156 if not result: 157 return aggregated_result 158 159 if isinstance(result, dict): 160 if "data" in result: 161 data = result["data"] 162 if len(data) == 0: 163 return aggregated_result 164 aggregated_result.extend(data) 165 elif "tree" in result: 166 data = result["tree"] 167 if data is None or len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 else: 171 raise NotImplementedError( 172 "requests_get_paginated does not know how to handle responses of this type." 173 ) 174 else: 175 aggregated_result.extend(result) 176 177 page += 1
179 def requests_put(self, endpoint: str, data: Optional[dict] = None): 180 if not data: 181 data = {} 182 request = self.requests.put( 183 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 184 ) 185 if request.status_code not in [200, 204]: 186 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 187 self.logger.error(message) 188 raise Exception(message)
190 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 191 request = self.requests.delete( 192 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 193 ) 194 if request.status_code not in [200, 204]: 195 message = f"Received status code: {request.status_code} ({request.url})" 196 self.logger.error(message) 197 raise Exception(message)
199 def requests_post( 200 self, 201 endpoint: str, 202 data: Optional[dict] = None, 203 params: Optional[dict] = None, 204 files: Optional[dict] = None, 205 ): 206 """ 207 Make a POST call to the endpoint. 208 209 :param endpoint: The path to the endpoint 210 :param data: A dictionary for JSON data 211 :param params: A dictionary of query params 212 :param files: A dictionary of files, see requests.post. Using both files and data 213 can lead to unexpected results! 214 :return: The JSON response parsed as a dict 215 """ 216 217 # This should ideally be a TypedDict of the type of arguments taken by 218 # `requests.post`. 219 args: dict[str, Any] = { 220 "headers": self.headers.copy(), 221 } 222 if data is not None: 223 args["data"] = json.dumps(data) 224 if params is not None: 225 args["params"] = params 226 if files is not None: 227 args["headers"].pop("Content-type") 228 args["files"] = files 229 230 request = self.requests.post(self.__get_url(endpoint), **args) 231 232 if request.status_code not in [200, 201, 202]: 233 if "already exists" in request.text or "e-mail already in use" in request.text: 234 self.logger.warning(request.text) 235 raise AlreadyExistsException() 236 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 237 self.logger.error(f"With info: {data} ({self.headers})") 238 self.logger.error(f"Answer: {request.text}") 239 raise Exception( 240 f"Received status code: {request.status_code} ({request.url}), {request.text}" 241 ) 242 return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
244 def requests_patch(self, endpoint: str, data: dict): 245 request = self.requests.patch( 246 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 247 ) 248 if request.status_code not in [200, 201]: 249 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 250 self.logger.error(error_message) 251 raise Exception(error_message) 252 return self.parse_result(request)
294 def create_user( 295 self, 296 user_name: str, 297 email: str, 298 password: str, 299 full_name: Optional[str] = None, 300 login_name: Optional[str] = None, 301 change_pw=True, 302 send_notify=True, 303 source_id=0, 304 ): 305 """Create User. 306 Throws: 307 AlreadyExistsException, if the User exists already 308 Exception, if something else went wrong. 309 """ 310 if not login_name: 311 login_name = user_name 312 if not full_name: 313 full_name = user_name 314 request_data = { 315 "source_id": source_id, 316 "login_name": login_name, 317 "full_name": full_name, 318 "username": user_name, 319 "email": email, 320 "password": password, 321 "send_notify": send_notify, 322 "must_change_password": change_pw, 323 } 324 325 self.logger.debug("Gitea post payload: %s", request_data) 326 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 327 if "id" in result: 328 self.logger.info( 329 "Successfully created User %s <%s> (id %s)", 330 result["login"], 331 result["email"], 332 result["id"], 333 ) 334 self.logger.debug("Gitea response: %s", result) 335 else: 336 self.logger.error(result["message"]) 337 raise Exception("User not created... (gitea: %s)" % result["message"]) 338 user = User.parse_response(self, result) 339 return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
341 def create_repo( 342 self, 343 repoOwner: Union[User, Organization], 344 repoName: str, 345 description: str = "", 346 private: bool = False, 347 autoInit=True, 348 gitignores: Optional[str] = None, 349 license: Optional[str] = None, 350 readme: str = "Default", 351 issue_labels: Optional[str] = None, 352 default_branch="master", 353 ): 354 """Create a Repository as the administrator 355 356 Throws: 357 AlreadyExistsException: If the Repository exists already. 358 Exception: If something else went wrong. 359 360 Note: 361 Non-admin users can not use this method. Please use instead 362 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 363 """ 364 # although this only says user in the api, this also works for 365 # organizations 366 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 367 result = self.requests_post( 368 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 369 data={ 370 "name": repoName, 371 "description": description, 372 "private": private, 373 "auto_init": autoInit, 374 "gitignores": gitignores, 375 "license": license, 376 "issue_labels": issue_labels, 377 "readme": readme, 378 "default_branch": default_branch, 379 }, 380 ) 381 if "id" in result: 382 self.logger.info("Successfully created Repository %s " % result["name"]) 383 else: 384 self.logger.error(result["message"]) 385 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 386 return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
388 def create_org( 389 self, 390 owner: User, 391 orgName: str, 392 description: str, 393 location="", 394 website="", 395 full_name="", 396 ): 397 assert isinstance(owner, User) 398 result = self.requests_post( 399 AllSpice.CREATE_ORG % owner.username, 400 data={ 401 "username": orgName, 402 "description": description, 403 "location": location, 404 "website": website, 405 "full_name": full_name, 406 }, 407 ) 408 if "id" in result: 409 self.logger.info("Successfully created Organization %s" % result["username"]) 410 else: 411 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 412 self.logger.error(result["message"]) 413 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 414 return Organization.parse_response(self, result)
416 def create_team( 417 self, 418 org: Organization, 419 name: str, 420 description: str = "", 421 permission: str = "read", 422 can_create_org_repo: bool = False, 423 includes_all_repositories: bool = False, 424 units=( 425 "repo.code", 426 "repo.issues", 427 "repo.ext_issues", 428 "repo.wiki", 429 "repo.pulls", 430 "repo.releases", 431 "repo.ext_wiki", 432 ), 433 units_map={}, 434 ): 435 """Creates a Team. 436 437 Args: 438 org (Organization): Organization the Team will be part of. 439 name (str): The Name of the Team to be created. 440 description (str): Optional, None, short description of the new Team. 441 permission (str): Optional, 'read', What permissions the members 442 units_map (dict): Optional, {}, a mapping of units to their 443 permissions. If None or empty, the `permission` permission will 444 be applied to all units. Note: When both `units` and `units_map` 445 are given, `units_map` will be preferred. 446 """ 447 448 result = self.requests_post( 449 AllSpice.CREATE_TEAM % org.username, 450 data={ 451 "name": name, 452 "description": description, 453 "permission": permission, 454 "can_create_org_repo": can_create_org_repo, 455 "includes_all_repositories": includes_all_repositories, 456 "units": units, 457 "units_map": units_map, 458 }, 459 ) 460 461 if "id" in result: 462 self.logger.info("Successfully created Team %s" % result["name"]) 463 else: 464 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 465 self.logger.error(result["message"]) 466 raise Exception("Team not created... (gitea: %s)" % result["message"]) 467 api_object = Team.parse_response(self, result) 468 setattr( 469 api_object, "_organization", org 470 ) # fixes strange behaviour of gitea not returning a valid organization here. 471 return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.
Common base class for all non-exit exceptions.
419class Branch(ReadonlyApiObject): 420 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 421 effective_branch_protection_name: str 422 enable_status_check: bool 423 name: str 424 protected: bool 425 required_approvals: int 426 status_check_contexts: List[Any] 427 user_can_merge: bool 428 user_can_push: bool 429 430 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 431 432 def __init__(self, allspice_client): 433 super().__init__(allspice_client) 434 435 def __eq__(self, other): 436 if not isinstance(other, Branch): 437 return False 438 return self.commit == other.commit and self.name == other.name 439 440 def __hash__(self): 441 return hash(self.commit["id"]) ^ hash(self.name) 442 443 _fields_to_parsers: ClassVar[dict] = { 444 # This is not a commit object 445 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 446 } 447 448 @classmethod 449 def request(cls, allspice_client, owner: str, repo: str, branch: str): 450 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
1566class Comment(ApiObject): 1567 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1568 body: str 1569 created_at: datetime 1570 html_url: str 1571 id: int 1572 issue_url: str 1573 original_author: str 1574 original_author_id: int 1575 pull_request_url: str 1576 updated_at: datetime 1577 user: User 1578 1579 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1580 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1581 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1582 1583 def __init__(self, allspice_client): 1584 super().__init__(allspice_client) 1585 1586 def __eq__(self, other): 1587 if not isinstance(other, Comment): 1588 return False 1589 return self.repository == other.repository and self.id == other.id 1590 1591 def __hash__(self): 1592 return hash(self.repository) ^ hash(self.id) 1593 1594 @classmethod 1595 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1596 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1597 1598 _fields_to_parsers: ClassVar[dict] = { 1599 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1600 "created_at": lambda _, t: Util.convert_time(t), 1601 "updated_at": lambda _, t: Util.convert_time(t), 1602 } 1603 1604 _patchable_fields: ClassVar[set[str]] = {"body"} 1605 1606 @property 1607 def parent_url(self) -> str: 1608 """URL of the parent of this comment (the issue or the pull request)""" 1609 1610 if self.issue_url is not None and self.issue_url != "": 1611 return self.issue_url 1612 else: 1613 return self.pull_request_url 1614 1615 @cached_property 1616 def repository(self) -> Repository: 1617 """The repository this comment was posted on.""" 1618 1619 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1620 return Repository.request(self.allspice_client, owner_name, repo_name) 1621 1622 def __fields_for_path(self): 1623 return { 1624 "owner": self.repository.owner.username, 1625 "repo": self.repository.name, 1626 "id": self.id, 1627 } 1628 1629 def commit(self): 1630 self._commit(self.__fields_for_path()) 1631 1632 def delete(self): 1633 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1634 self.deleted = True 1635 1636 def get_attachments(self) -> List[Attachment]: 1637 """ 1638 Get all attachments on this comment. This returns Attachment objects, which 1639 contain a link to download the attachment. 1640 1641 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1642 """ 1643 1644 results = self.allspice_client.requests_get( 1645 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1646 ) 1647 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1648 1649 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1650 """ 1651 Create an attachment on this comment. 1652 1653 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1654 1655 :param file: The file to attach. This should be a file-like object. 1656 :param name: The name of the file. If not provided, the name of the file will be 1657 used. 1658 :return: The created attachment. 1659 """ 1660 1661 args: dict[str, Any] = { 1662 "files": {"attachment": file}, 1663 } 1664 if name is not None: 1665 args["params"] = {"name": name} 1666 1667 result = self.allspice_client.requests_post( 1668 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1669 **args, 1670 ) 1671 return Attachment.parse_response(self.allspice_client, result) 1672 1673 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1674 """ 1675 Edit an attachment. 1676 1677 The list of params that can be edited is available at 1678 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1679 1680 :param attachment: The attachment to be edited 1681 :param data: The data parameter should be a dictionary of the fields to edit. 1682 :return: The edited attachment 1683 """ 1684 1685 args = { 1686 **self.__fields_for_path(), 1687 "attachment_id": attachment.id, 1688 } 1689 result = self.allspice_client.requests_patch( 1690 self.ATTACHMENT_PATH.format(**args), 1691 data=data, 1692 ) 1693 return Attachment.parse_response(self.allspice_client, result) 1694 1695 def delete_attachment(self, attachment: Attachment): 1696 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1697 1698 args = { 1699 **self.__fields_for_path(), 1700 "attachment_id": attachment.id, 1701 } 1702 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1703 attachment.deleted = True
1606 @property 1607 def parent_url(self) -> str: 1608 """URL of the parent of this comment (the issue or the pull request)""" 1609 1610 if self.issue_url is not None and self.issue_url != "": 1611 return self.issue_url 1612 else: 1613 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1615 @cached_property 1616 def repository(self) -> Repository: 1617 """The repository this comment was posted on.""" 1618 1619 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1620 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1636 def get_attachments(self) -> List[Attachment]: 1637 """ 1638 Get all attachments on this comment. This returns Attachment objects, which 1639 contain a link to download the attachment. 1640 1641 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1642 """ 1643 1644 results = self.allspice_client.requests_get( 1645 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1646 ) 1647 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1649 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1650 """ 1651 Create an attachment on this comment. 1652 1653 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1654 1655 :param file: The file to attach. This should be a file-like object. 1656 :param name: The name of the file. If not provided, the name of the file will be 1657 used. 1658 :return: The created attachment. 1659 """ 1660 1661 args: dict[str, Any] = { 1662 "files": {"attachment": file}, 1663 } 1664 if name is not None: 1665 args["params"] = {"name": name} 1666 1667 result = self.allspice_client.requests_post( 1668 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1669 **args, 1670 ) 1671 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1673 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1674 """ 1675 Edit an attachment. 1676 1677 The list of params that can be edited is available at 1678 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1679 1680 :param attachment: The attachment to be edited 1681 :param data: The data parameter should be a dictionary of the fields to edit. 1682 :return: The edited attachment 1683 """ 1684 1685 args = { 1686 **self.__fields_for_path(), 1687 "attachment_id": attachment.id, 1688 } 1689 result = self.allspice_client.requests_patch( 1690 self.ATTACHMENT_PATH.format(**args), 1691 data=data, 1692 ) 1693 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1695 def delete_attachment(self, attachment: Attachment): 1696 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1697 1698 args = { 1699 **self.__fields_for_path(), 1700 "attachment_id": attachment.id, 1701 } 1702 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1703 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1706class Commit(ReadonlyApiObject): 1707 author: User 1708 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1709 committer: Dict[str, Union[int, str, bool]] 1710 created: str 1711 files: List[Dict[str, str]] 1712 html_url: str 1713 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1714 parents: List[Union[Dict[str, str], Any]] 1715 sha: str 1716 stats: Dict[str, int] 1717 url: str 1718 1719 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1720 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1721 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1722 1723 # Regex to extract owner and repo names from the url property 1724 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1725 1726 def __init__(self, allspice_client): 1727 super().__init__(allspice_client) 1728 1729 _fields_to_parsers: ClassVar[dict] = { 1730 # NOTE: api may return None for commiters that are no allspice users 1731 "author": lambda allspice_client, u: ( 1732 User.parse_response(allspice_client, u) if u else None 1733 ) 1734 } 1735 1736 def __eq__(self, other): 1737 if not isinstance(other, Commit): 1738 return False 1739 return self.sha == other.sha 1740 1741 def __hash__(self): 1742 return hash(self.sha) 1743 1744 @classmethod 1745 def parse_response(cls, allspice_client, result) -> "Commit": 1746 commit_cache = result["commit"] 1747 api_object = cls(allspice_client) 1748 cls._initialize(allspice_client, api_object, result) 1749 # inner_commit for legacy reasons 1750 Commit._add_read_property("inner_commit", commit_cache, api_object) 1751 return api_object 1752 1753 def get_status(self) -> CommitCombinedStatus: 1754 """ 1755 Get a combined status consisting of all statues on this commit. 1756 1757 Note that the returned object is a CommitCombinedStatus object, which 1758 also contains a list of all statuses on the commit. 1759 1760 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1761 """ 1762 1763 result = self.allspice_client.requests_get( 1764 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1765 ) 1766 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1767 1768 def get_statuses(self) -> List[CommitStatus]: 1769 """ 1770 Get a list of all statuses on this commit. 1771 1772 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1773 """ 1774 1775 results = self.allspice_client.requests_get( 1776 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1777 ) 1778 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1779 1780 @cached_property 1781 def _fields_for_path(self) -> dict[str, str]: 1782 matches = self.URL_REGEXP.search(self.url) 1783 if not matches: 1784 raise ValueError(f"Invalid commit URL: {self.url}") 1785 1786 return { 1787 "owner": matches.group(1), 1788 "repo": matches.group(2), 1789 "sha": self.sha, 1790 }
1744 @classmethod 1745 def parse_response(cls, allspice_client, result) -> "Commit": 1746 commit_cache = result["commit"] 1747 api_object = cls(allspice_client) 1748 cls._initialize(allspice_client, api_object, result) 1749 # inner_commit for legacy reasons 1750 Commit._add_read_property("inner_commit", commit_cache, api_object) 1751 return api_object
1753 def get_status(self) -> CommitCombinedStatus: 1754 """ 1755 Get a combined status consisting of all statues on this commit. 1756 1757 Note that the returned object is a CommitCombinedStatus object, which 1758 also contains a list of all statuses on the commit. 1759 1760 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1761 """ 1762 1763 result = self.allspice_client.requests_get( 1764 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1765 ) 1766 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1768 def get_statuses(self) -> List[CommitStatus]: 1769 """ 1770 Get a list of all statuses on this commit. 1771 1772 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1773 """ 1774 1775 results = self.allspice_client.requests_get( 1776 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1777 ) 1778 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
2600class Content(ReadonlyApiObject): 2601 content: Any 2602 download_url: str 2603 encoding: Any 2604 git_url: str 2605 html_url: str 2606 last_commit_sha: str 2607 name: str 2608 path: str 2609 sha: str 2610 size: int 2611 submodule_git_url: Any 2612 target: Any 2613 type: str 2614 url: str 2615 2616 FILE = "file" 2617 2618 def __init__(self, allspice_client): 2619 super().__init__(allspice_client) 2620 2621 def __eq__(self, other): 2622 if not isinstance(other, Content): 2623 return False 2624 2625 return self.sha == other.sha and self.name == other.name 2626 2627 def __hash__(self): 2628 return hash(self.sha) ^ hash(self.name)
Inherited Members
2084class DesignReview(ApiObject): 2085 """ 2086 A Design Review. See 2087 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2088 2089 Note: The base and head fields are not `Branch` objects - they are plain strings 2090 referring to the branch names. This is because DRs can exist for branches that have 2091 been deleted, which don't have an associated `Branch` object from the API. You can use 2092 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2093 it exists. 2094 """ 2095 2096 additions: int 2097 allow_maintainer_edit: bool 2098 allow_maintainer_edits: Any 2099 assignee: User 2100 assignees: List["User"] 2101 base: str 2102 body: str 2103 changed_files: int 2104 closed_at: Optional[str] 2105 comments: int 2106 created_at: str 2107 deletions: int 2108 diff_url: str 2109 draft: bool 2110 due_date: Optional[str] 2111 head: str 2112 html_url: str 2113 id: int 2114 is_locked: bool 2115 labels: List[Any] 2116 merge_base: str 2117 merge_commit_sha: Optional[str] 2118 mergeable: bool 2119 merged: bool 2120 merged_at: Optional[str] 2121 merged_by: Optional["User"] 2122 milestone: Any 2123 number: int 2124 patch_url: str 2125 pin_order: int 2126 repository: Optional["Repository"] 2127 requested_reviewers: Any 2128 review_comments: int 2129 state: str 2130 title: str 2131 updated_at: str 2132 url: str 2133 user: User 2134 2135 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2136 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2137 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2138 2139 OPEN = "open" 2140 CLOSED = "closed" 2141 2142 class MergeType(Enum): 2143 MERGE = "merge" 2144 REBASE = "rebase" 2145 REBASE_MERGE = "rebase-merge" 2146 SQUASH = "squash" 2147 MANUALLY_MERGED = "manually-merged" 2148 2149 def __init__(self, allspice_client): 2150 super().__init__(allspice_client) 2151 2152 def __eq__(self, other): 2153 if not isinstance(other, DesignReview): 2154 return False 2155 return self.repository == other.repository and self.id == other.id 2156 2157 def __hash__(self): 2158 return hash(self.repository) ^ hash(self.id) 2159 2160 @classmethod 2161 def parse_response(cls, allspice_client, result) -> "DesignReview": 2162 api_object = super().parse_response(allspice_client, result) 2163 cls._add_read_property( 2164 "repository", 2165 Repository.parse_response(allspice_client, result["base"]["repo"]), 2166 api_object, 2167 ) 2168 2169 return api_object 2170 2171 @classmethod 2172 def request(cls, allspice_client, owner: str, repo: str, number: str): 2173 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2174 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2175 2176 _fields_to_parsers: ClassVar[dict] = { 2177 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2178 "assignees": lambda allspice_client, us: [ 2179 User.parse_response(allspice_client, u) for u in us 2180 ], 2181 "base": lambda _, b: b["ref"], 2182 "head": lambda _, h: h["ref"], 2183 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2184 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2185 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2186 } 2187 2188 _patchable_fields: ClassVar[set[str]] = { 2189 "allow_maintainer_edits", 2190 "assignee", 2191 "assignees", 2192 "base", 2193 "body", 2194 "due_date", 2195 "milestone", 2196 "state", 2197 "title", 2198 } 2199 2200 _parsers_to_fields: ClassVar[dict] = { 2201 "assignee": lambda u: u.username, 2202 "assignees": lambda us: [u.username for u in us], 2203 "base": lambda b: b.name if isinstance(b, Branch) else b, 2204 "milestone": lambda m: m.id, 2205 } 2206 2207 def commit(self): 2208 data = self.get_dirty_fields() 2209 if "due_date" in data and data["due_date"] is None: 2210 data["unset_due_date"] = True 2211 2212 args = { 2213 "owner": self.repository.owner.username, 2214 "repo": self.repository.name, 2215 "index": self.number, 2216 } 2217 self._commit(args, data) 2218 2219 def merge(self, merge_type: MergeType): 2220 """ 2221 Merge the pull request. See 2222 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2223 2224 :param merge_type: The type of merge to perform. See the MergeType enum. 2225 """ 2226 2227 self.allspice_client.requests_post( 2228 self.MERGE_DESIGN_REVIEW.format( 2229 owner=self.repository.owner.username, 2230 repo=self.repository.name, 2231 index=self.number, 2232 ), 2233 data={"Do": merge_type.value}, 2234 ) 2235 2236 def get_comments(self) -> List[Comment]: 2237 """ 2238 Get the comments on this pull request, but not specifically on a review. 2239 2240 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2241 2242 :return: A list of comments on this pull request. 2243 """ 2244 2245 results = self.allspice_client.requests_get( 2246 self.GET_COMMENTS.format( 2247 owner=self.repository.owner.username, 2248 repo=self.repository.name, 2249 index=self.number, 2250 ) 2251 ) 2252 return [Comment.parse_response(self.allspice_client, result) for result in results] 2253 2254 def create_comment(self, body: str): 2255 """ 2256 Create a comment on this pull request. This uses the same endpoint as the 2257 comments on issues, and will not be associated with any reviews. 2258 2259 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2260 2261 :param body: The body of the comment. 2262 :return: The comment that was created. 2263 """ 2264 2265 result = self.allspice_client.requests_post( 2266 self.GET_COMMENTS.format( 2267 owner=self.repository.owner.username, 2268 repo=self.repository.name, 2269 index=self.number, 2270 ), 2271 data={"body": body}, 2272 ) 2273 return Comment.parse_response(self.allspice_client, result)
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2160 @classmethod 2161 def parse_response(cls, allspice_client, result) -> "DesignReview": 2162 api_object = super().parse_response(allspice_client, result) 2163 cls._add_read_property( 2164 "repository", 2165 Repository.parse_response(allspice_client, result["base"]["repo"]), 2166 api_object, 2167 ) 2168 2169 return api_object
2171 @classmethod 2172 def request(cls, allspice_client, owner: str, repo: str, number: str): 2173 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2174 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2207 def commit(self): 2208 data = self.get_dirty_fields() 2209 if "due_date" in data and data["due_date"] is None: 2210 data["unset_due_date"] = True 2211 2212 args = { 2213 "owner": self.repository.owner.username, 2214 "repo": self.repository.name, 2215 "index": self.number, 2216 } 2217 self._commit(args, data)
2219 def merge(self, merge_type: MergeType): 2220 """ 2221 Merge the pull request. See 2222 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2223 2224 :param merge_type: The type of merge to perform. See the MergeType enum. 2225 """ 2226 2227 self.allspice_client.requests_post( 2228 self.MERGE_DESIGN_REVIEW.format( 2229 owner=self.repository.owner.username, 2230 repo=self.repository.name, 2231 index=self.number, 2232 ), 2233 data={"Do": merge_type.value}, 2234 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2236 def get_comments(self) -> List[Comment]: 2237 """ 2238 Get the comments on this pull request, but not specifically on a review. 2239 2240 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2241 2242 :return: A list of comments on this pull request. 2243 """ 2244 2245 results = self.allspice_client.requests_get( 2246 self.GET_COMMENTS.format( 2247 owner=self.repository.owner.username, 2248 repo=self.repository.name, 2249 index=self.number, 2250 ) 2251 ) 2252 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2254 def create_comment(self, body: str): 2255 """ 2256 Create a comment on this pull request. This uses the same endpoint as the 2257 comments on issues, and will not be associated with any reviews. 2258 2259 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2260 2261 :param body: The body of the comment. 2262 :return: The comment that was created. 2263 """ 2264 2265 result = self.allspice_client.requests_post( 2266 self.GET_COMMENTS.format( 2267 owner=self.repository.owner.username, 2268 repo=self.repository.name, 2269 index=self.number, 2270 ), 2271 data={"body": body}, 2272 ) 2273 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
1881class Issue(ApiObject): 1882 """ 1883 An issue on a repository. 1884 1885 Note: `Issue.assets` may not have any entries even if the issue has 1886 attachments. This happens when an issue is fetched via a bulk method like 1887 `Repository.get_issues`. In most cases, prefer using 1888 `Issue.get_attachments` to get the attachments on an issue. 1889 """ 1890 1891 assets: List[Union[Any, "Attachment"]] 1892 assignee: Any 1893 assignees: Any 1894 body: str 1895 closed_at: Any 1896 comments: int 1897 created_at: str 1898 due_date: Any 1899 html_url: str 1900 id: int 1901 is_locked: bool 1902 labels: List[Any] 1903 milestone: Optional["Milestone"] 1904 number: int 1905 original_author: str 1906 original_author_id: int 1907 pin_order: int 1908 pull_request: Any 1909 ref: str 1910 repository: Dict[str, Union[int, str]] 1911 state: str 1912 title: str 1913 updated_at: str 1914 url: str 1915 user: User 1916 1917 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1918 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1919 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1920 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1921 GET_ATTACHMENTS = """/repos/{owner}/{repo}/issues/{index}/assets""" 1922 1923 OPENED = "open" 1924 CLOSED = "closed" 1925 1926 def __init__(self, allspice_client): 1927 super().__init__(allspice_client) 1928 1929 def __eq__(self, other): 1930 if not isinstance(other, Issue): 1931 return False 1932 return self.repository == other.repository and self.id == other.id 1933 1934 def __hash__(self): 1935 return hash(self.repository) ^ hash(self.id) 1936 1937 _fields_to_parsers: ClassVar[dict] = { 1938 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1939 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1940 "assets": lambda allspice_client, assets: [ 1941 Attachment.parse_response(allspice_client, a) for a in assets 1942 ], 1943 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1944 "assignees": lambda allspice_client, us: [ 1945 User.parse_response(allspice_client, u) for u in us 1946 ], 1947 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1948 } 1949 1950 _parsers_to_fields: ClassVar[dict] = { 1951 "milestone": lambda m: m.id, 1952 } 1953 1954 _patchable_fields: ClassVar[set[str]] = { 1955 "assignee", 1956 "assignees", 1957 "body", 1958 "due_date", 1959 "milestone", 1960 "state", 1961 "title", 1962 } 1963 1964 def commit(self): 1965 args = { 1966 "owner": self.repository.owner.username, 1967 "repo": self.repository.name, 1968 "index": self.number, 1969 } 1970 self._commit(args) 1971 1972 @classmethod 1973 def request(cls, allspice_client, owner: str, repo: str, number: str): 1974 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1975 # The repository in the response is a RepositoryMeta object, so request 1976 # the full repository object and add it to the issue object. 1977 repository = Repository.request(allspice_client, owner, repo) 1978 setattr(api_object, "_repository", repository) 1979 # For legacy reasons 1980 cls._add_read_property("repo", repository, api_object) 1981 return api_object 1982 1983 @classmethod 1984 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1985 args = {"owner": repo.owner.username, "repo": repo.name} 1986 data = {"title": title, "body": body} 1987 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1988 issue = Issue.parse_response(allspice_client, result) 1989 setattr(issue, "_repository", repo) 1990 cls._add_read_property("repo", repo, issue) 1991 return issue 1992 1993 @property 1994 def owner(self) -> Organization | User: 1995 return self.repository.owner 1996 1997 def get_time_sum(self, user: User) -> int: 1998 results = self.allspice_client.requests_get( 1999 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2000 ) 2001 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 2002 2003 def get_times(self) -> Optional[Dict]: 2004 return self.allspice_client.requests_get( 2005 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 2006 ) 2007 2008 def delete_time(self, time_id: str): 2009 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 2010 self.allspice_client.requests_delete(path) 2011 2012 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2013 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2014 self.allspice_client.requests_post( 2015 path, data={"created": created, "time": int(time), "user_name": user_name} 2016 ) 2017 2018 def get_comments(self) -> List[Comment]: 2019 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2020 2021 results = self.allspice_client.requests_get( 2022 self.GET_COMMENTS.format( 2023 owner=self.owner.username, repo=self.repository.name, index=self.number 2024 ) 2025 ) 2026 2027 return [Comment.parse_response(self.allspice_client, result) for result in results] 2028 2029 def create_comment(self, body: str) -> Comment: 2030 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2031 2032 path = self.GET_COMMENTS.format( 2033 owner=self.owner.username, repo=self.repository.name, index=self.number 2034 ) 2035 2036 response = self.allspice_client.requests_post(path, data={"body": body}) 2037 return Comment.parse_response(self.allspice_client, response) 2038 2039 def get_attachments(self) -> List[Attachment]: 2040 """ 2041 Fetch all attachments on this issue. 2042 2043 Unlike the assets field, this will always fetch all attachments from the 2044 API. 2045 2046 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2047 """ 2048 2049 path = self.GET_ATTACHMENTS.format( 2050 owner=self.owner.username, repo=self.repository.name, index=self.number 2051 ) 2052 response = self.allspice_client.requests_get(path) 2053 2054 return [Attachment.parse_response(self.allspice_client, result) for result in response] 2055 2056 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2057 """ 2058 Create an attachment on this issue. 2059 2060 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2061 2062 :param file: The file to attach. This should be a file-like object. 2063 :param name: The name of the file. If not provided, the name of the file will be 2064 used. 2065 :return: The created attachment. 2066 """ 2067 2068 args: dict[str, Any] = { 2069 "files": {"attachment": file}, 2070 } 2071 if name is not None: 2072 args["params"] = {"name": name} 2073 2074 result = self.allspice_client.requests_post( 2075 self.GET_ATTACHMENTS.format( 2076 owner=self.owner.username, repo=self.repository.name, index=self.number 2077 ), 2078 **args, 2079 ) 2080 2081 return Attachment.parse_response(self.allspice_client, result)
An issue on a repository.
Note: Issue.assets
may not have any entries even if the issue has
attachments. This happens when an issue is fetched via a bulk method like
Repository.get_issues
. In most cases, prefer using
Issue.get_attachments
to get the attachments on an issue.
1972 @classmethod 1973 def request(cls, allspice_client, owner: str, repo: str, number: str): 1974 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1975 # The repository in the response is a RepositoryMeta object, so request 1976 # the full repository object and add it to the issue object. 1977 repository = Repository.request(allspice_client, owner, repo) 1978 setattr(api_object, "_repository", repository) 1979 # For legacy reasons 1980 cls._add_read_property("repo", repository, api_object) 1981 return api_object
1983 @classmethod 1984 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1985 args = {"owner": repo.owner.username, "repo": repo.name} 1986 data = {"title": title, "body": body} 1987 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1988 issue = Issue.parse_response(allspice_client, result) 1989 setattr(issue, "_repository", repo) 1990 cls._add_read_property("repo", repo, issue) 1991 return issue
2012 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 2013 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2014 self.allspice_client.requests_post( 2015 path, data={"created": created, "time": int(time), "user_name": user_name} 2016 )
2018 def get_comments(self) -> List[Comment]: 2019 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2020 2021 results = self.allspice_client.requests_get( 2022 self.GET_COMMENTS.format( 2023 owner=self.owner.username, repo=self.repository.name, index=self.number 2024 ) 2025 ) 2026 2027 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2029 def create_comment(self, body: str) -> Comment: 2030 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2031 2032 path = self.GET_COMMENTS.format( 2033 owner=self.owner.username, repo=self.repository.name, index=self.number 2034 ) 2035 2036 response = self.allspice_client.requests_post(path, data={"body": body}) 2037 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
2039 def get_attachments(self) -> List[Attachment]: 2040 """ 2041 Fetch all attachments on this issue. 2042 2043 Unlike the assets field, this will always fetch all attachments from the 2044 API. 2045 2046 See https://hub.allspice.io/api/swagger#/issue/issueListIssueAttachments 2047 """ 2048 2049 path = self.GET_ATTACHMENTS.format( 2050 owner=self.owner.username, repo=self.repository.name, index=self.number 2051 ) 2052 response = self.allspice_client.requests_get(path) 2053 2054 return [Attachment.parse_response(self.allspice_client, result) for result in response]
Fetch all attachments on this issue.
Unlike the assets field, this will always fetch all attachments from the API.
See allspice.allspice.io/api/swagger#/issue/issueListIssueAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueAttachments
2056 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 2057 """ 2058 Create an attachment on this issue. 2059 2060 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueAttachment 2061 2062 :param file: The file to attach. This should be a file-like object. 2063 :param name: The name of the file. If not provided, the name of the file will be 2064 used. 2065 :return: The created attachment. 2066 """ 2067 2068 args: dict[str, Any] = { 2069 "files": {"attachment": file}, 2070 } 2071 if name is not None: 2072 args["params"] = {"name": name} 2073 2074 result = self.allspice_client.requests_post( 2075 self.GET_ATTACHMENTS.format( 2076 owner=self.owner.username, repo=self.repository.name, index=self.number 2077 ), 2078 **args, 2079 ) 2080 2081 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this issue.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1450class Milestone(ApiObject): 1451 allow_merge_commits: Any 1452 allow_rebase: Any 1453 allow_rebase_explicit: Any 1454 allow_squash_merge: Any 1455 archived: Any 1456 closed_at: Any 1457 closed_issues: int 1458 created_at: str 1459 default_branch: Any 1460 description: str 1461 due_on: Any 1462 has_issues: Any 1463 has_pull_requests: Any 1464 has_wiki: Any 1465 id: int 1466 ignore_whitespace_conflicts: Any 1467 name: Any 1468 open_issues: int 1469 private: Any 1470 state: str 1471 title: str 1472 updated_at: str 1473 website: Any 1474 1475 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1476 1477 def __init__(self, allspice_client): 1478 super().__init__(allspice_client) 1479 1480 def __eq__(self, other): 1481 if not isinstance(other, Milestone): 1482 return False 1483 return self.allspice_client == other.allspice_client and self.id == other.id 1484 1485 def __hash__(self): 1486 return hash(self.allspice_client) ^ hash(self.id) 1487 1488 _fields_to_parsers: ClassVar[dict] = { 1489 "closed_at": lambda _, t: Util.convert_time(t), 1490 "due_on": lambda _, t: Util.convert_time(t), 1491 } 1492 1493 _patchable_fields: ClassVar[set[str]] = { 1494 "allow_merge_commits", 1495 "allow_rebase", 1496 "allow_rebase_explicit", 1497 "allow_squash_merge", 1498 "archived", 1499 "default_branch", 1500 "description", 1501 "has_issues", 1502 "has_pull_requests", 1503 "has_wiki", 1504 "ignore_whitespace_conflicts", 1505 "name", 1506 "private", 1507 "website", 1508 } 1509 1510 @classmethod 1511 def request(cls, allspice_client, owner: str, repo: str, number: str): 1512 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Common base class for all non-exit exceptions.
33class Organization(ApiObject): 34 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 35 36 active: Optional[bool] 37 avatar_url: str 38 created: Optional[str] 39 description: str 40 email: str 41 followers_count: Optional[int] 42 following_count: Optional[int] 43 full_name: str 44 html_url: Optional[str] 45 id: int 46 is_admin: Optional[bool] 47 language: Optional[str] 48 last_login: Optional[str] 49 location: str 50 login: Optional[str] 51 login_name: Optional[str] 52 name: Optional[str] 53 prohibit_login: Optional[bool] 54 repo_admin_change_team_access: Optional[bool] 55 restricted: Optional[bool] 56 source_id: Optional[int] 57 starred_repos_count: Optional[int] 58 username: str 59 visibility: str 60 website: str 61 62 API_OBJECT = """/orgs/{name}""" # <org> 63 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 64 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 65 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 66 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 67 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 68 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 69 70 def __init__(self, allspice_client): 71 super().__init__(allspice_client) 72 73 def __eq__(self, other): 74 if not isinstance(other, Organization): 75 return False 76 return self.allspice_client == other.allspice_client and self.name == other.name 77 78 def __hash__(self): 79 return hash(self.allspice_client) ^ hash(self.name) 80 81 @classmethod 82 def request(cls, allspice_client, name: str) -> Self: 83 return cls._request(allspice_client, {"name": name}) 84 85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object 93 94 _patchable_fields: ClassVar[set[str]] = { 95 "description", 96 "full_name", 97 "location", 98 "visibility", 99 "website", 100 } 101 102 def commit(self): 103 args = {"name": self.name} 104 self._commit(args) 105 106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result) 144 145 def get_repositories(self) -> List["Repository"]: 146 results = self.allspice_client.requests_get_paginated( 147 Organization.ORG_REPOS_REQUEST % self.username 148 ) 149 return [Repository.parse_response(self.allspice_client, result) for result in results] 150 151 def get_repository(self, name) -> "Repository": 152 repos = self.get_repositories() 153 for repo in repos: 154 if repo.name == name: 155 return repo 156 raise NotFoundException("Repository %s not existent in organization." % name) 157 158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams 165 166 def get_team(self, name) -> "Team": 167 teams = self.get_teams() 168 for team in teams: 169 if team.name == name: 170 return team 171 raise NotFoundException("Team not existent in organization.") 172 173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 ) 204 205 def get_members(self) -> List["User"]: 206 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 207 return [User.parse_response(self.allspice_client, result) for result in results] 208 209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False 220 221 def remove_member(self, user: "User"): 222 path = f"/orgs/{self.username}/members/{user.username}" 223 self.allspice_client.requests_delete(path) 224 225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True 231 232 def get_heatmap(self) -> List[Tuple[datetime, int]]: 233 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 234 results = [ 235 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 236 for result in results 237 ] 238 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object
106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams
173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 )
Alias for AllSpice#create_team
209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False
225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
2359class Release(ApiObject): 2360 """ 2361 A release on a repo. 2362 """ 2363 2364 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2365 author: User 2366 body: str 2367 created_at: str 2368 draft: bool 2369 html_url: str 2370 id: int 2371 name: str 2372 prerelease: bool 2373 published_at: str 2374 repo: Optional["Repository"] 2375 repository: Optional["Repository"] 2376 tag_name: str 2377 tarball_url: str 2378 target_commitish: str 2379 upload_url: str 2380 url: str 2381 zipball_url: str 2382 2383 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2384 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2385 # Note that we don't strictly need the get_assets route, as the release 2386 # object already contains the assets. 2387 2388 def __init__(self, allspice_client): 2389 super().__init__(allspice_client) 2390 2391 def __eq__(self, other): 2392 if not isinstance(other, Release): 2393 return False 2394 return self.repo == other.repo and self.id == other.id 2395 2396 def __hash__(self): 2397 return hash(self.repo) ^ hash(self.id) 2398 2399 _fields_to_parsers: ClassVar[dict] = { 2400 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2401 } 2402 _patchable_fields: ClassVar[set[str]] = { 2403 "body", 2404 "draft", 2405 "name", 2406 "prerelease", 2407 "tag_name", 2408 "target_commitish", 2409 } 2410 2411 @classmethod 2412 def parse_response(cls, allspice_client, result, repo) -> Release: 2413 release = super().parse_response(allspice_client, result) 2414 Release._add_read_property("repository", repo, release) 2415 # For legacy reasons 2416 Release._add_read_property("repo", repo, release) 2417 setattr( 2418 release, 2419 "_assets", 2420 [ 2421 ReleaseAsset.parse_response(allspice_client, asset, release) 2422 for asset in result["assets"] 2423 ], 2424 ) 2425 return release 2426 2427 @classmethod 2428 def request( 2429 cls, 2430 allspice_client, 2431 owner: str, 2432 repo: str, 2433 id: Optional[int] = None, 2434 ) -> Release: 2435 args = {"owner": owner, "repo": repo, "id": id} 2436 release_response = cls._get_gitea_api_object(allspice_client, args) 2437 repository = Repository.request(allspice_client, owner, repo) 2438 release = cls.parse_response(allspice_client, release_response, repository) 2439 return release 2440 2441 def commit(self): 2442 if self.repo is None: 2443 raise ValueError("Cannot commit a release without a repository.") 2444 2445 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2446 self._commit(args) 2447 2448 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2449 """ 2450 Create an asset for this release. 2451 2452 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2453 2454 :param file: The file to upload. This should be a file-like object. 2455 :param name: The name of the file. 2456 :return: The created asset. 2457 """ 2458 2459 if self.repo is None: 2460 raise ValueError("Cannot commit a release without a repository.") 2461 2462 args: dict[str, Any] = {"files": {"attachment": file}} 2463 if name is not None: 2464 args["params"] = {"name": name} 2465 2466 result = self.allspice_client.requests_post( 2467 self.RELEASE_CREATE_ASSET.format( 2468 owner=self.repo.owner.username, 2469 repo=self.repo.name, 2470 id=self.id, 2471 ), 2472 **args, 2473 ) 2474 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2475 2476 def delete(self): 2477 if self.repo is None: 2478 raise ValueError("Cannot commit a release without a repository.") 2479 2480 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2481 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2482 self.deleted = True
A release on a repo.
2411 @classmethod 2412 def parse_response(cls, allspice_client, result, repo) -> Release: 2413 release = super().parse_response(allspice_client, result) 2414 Release._add_read_property("repository", repo, release) 2415 # For legacy reasons 2416 Release._add_read_property("repo", repo, release) 2417 setattr( 2418 release, 2419 "_assets", 2420 [ 2421 ReleaseAsset.parse_response(allspice_client, asset, release) 2422 for asset in result["assets"] 2423 ], 2424 ) 2425 return release
2427 @classmethod 2428 def request( 2429 cls, 2430 allspice_client, 2431 owner: str, 2432 repo: str, 2433 id: Optional[int] = None, 2434 ) -> Release: 2435 args = {"owner": owner, "repo": repo, "id": id} 2436 release_response = cls._get_gitea_api_object(allspice_client, args) 2437 repository = Repository.request(allspice_client, owner, repo) 2438 release = cls.parse_response(allspice_client, release_response, repository) 2439 return release
2448 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2449 """ 2450 Create an asset for this release. 2451 2452 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2453 2454 :param file: The file to upload. This should be a file-like object. 2455 :param name: The name of the file. 2456 :return: The created asset. 2457 """ 2458 2459 if self.repo is None: 2460 raise ValueError("Cannot commit a release without a repository.") 2461 2462 args: dict[str, Any] = {"files": {"attachment": file}} 2463 if name is not None: 2464 args["params"] = {"name": name} 2465 2466 result = self.allspice_client.requests_post( 2467 self.RELEASE_CREATE_ASSET.format( 2468 owner=self.repo.owner.username, 2469 repo=self.repo.name, 2470 id=self.id, 2471 ), 2472 **args, 2473 ) 2474 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
2476 def delete(self): 2477 if self.repo is None: 2478 raise ValueError("Cannot commit a release without a repository.") 2479 2480 args = {"owner": self.repo.owner.username, "repo": self.repo.name, "id": self.id} 2481 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2482 self.deleted = True
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 559 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 560 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 561 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 562 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 563 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 564 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 565 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 566 567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip" 574 575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex" 585 586 def __init__(self, allspice_client): 587 super().__init__(allspice_client) 588 589 def __eq__(self, other): 590 if not isinstance(other, Repository): 591 return False 592 return self.owner == other.owner and self.name == other.name 593 594 def __hash__(self): 595 return hash(self.owner) ^ hash(self.name) 596 597 _fields_to_parsers: ClassVar[dict] = { 598 # dont know how to tell apart user and org as owner except form email being empty. 599 "owner": lambda allspice_client, r: ( 600 Organization.parse_response(allspice_client, r) 601 if r["email"] == "" 602 else User.parse_response(allspice_client, r) 603 ), 604 "updated_at": lambda _, t: Util.convert_time(t), 605 } 606 607 @classmethod 608 def request( 609 cls, 610 allspice_client, 611 owner: str, 612 name: str, 613 ) -> Repository: 614 return cls._request(allspice_client, {"owner": owner, "name": name}) 615 616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses] 660 661 _patchable_fields: ClassVar[set[str]] = { 662 "allow_manual_merge", 663 "allow_merge_commits", 664 "allow_rebase", 665 "allow_rebase_explicit", 666 "allow_rebase_update", 667 "allow_squash_merge", 668 "archived", 669 "autodetect_manual_merge", 670 "default_branch", 671 "default_delete_branch_after_merge", 672 "default_merge_style", 673 "description", 674 "enable_prune", 675 "external_tracker", 676 "external_wiki", 677 "has_actions", 678 "has_issues", 679 "has_projects", 680 "has_pull_requests", 681 "has_wiki", 682 "ignore_whitespace_conflicts", 683 "internal_tracker", 684 "mirror_interval", 685 "name", 686 "private", 687 "template", 688 "website", 689 } 690 691 def commit(self): 692 args = {"owner": self.owner.username, "name": self.name} 693 self._commit(args) 694 695 def get_branches(self) -> List["Branch"]: 696 """Get all the Branches of this Repository.""" 697 698 results = self.allspice_client.requests_get_paginated( 699 Repository.REPO_BRANCHES % (self.owner.username, self.name) 700 ) 701 return [Branch.parse_response(self.allspice_client, result) for result in results] 702 703 def get_branch(self, name: str) -> "Branch": 704 """Get a specific Branch of this Repository.""" 705 result = self.allspice_client.requests_get( 706 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 707 ) 708 return Branch.parse_response(self.allspice_client, result) 709 710 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 711 """Add a branch to the repository""" 712 # Note: will only work with gitea 1.13 or higher! 713 714 ref_name = Util.data_params_for_ref(create_from) 715 if "ref" not in ref_name: 716 raise ValueError("create_from must be a Branch, Commit or string") 717 ref_name = ref_name["ref"] 718 719 data = {"new_branch_name": newname, "old_ref_name": ref_name} 720 result = self.allspice_client.requests_post( 721 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 722 ) 723 return Branch.parse_response(self.allspice_client, result) 724 725 def get_issues( 726 self, 727 state: Literal["open", "closed", "all"] = "all", 728 search_query: Optional[str] = None, 729 labels: Optional[List[str]] = None, 730 milestones: Optional[List[Union[Milestone, str]]] = None, 731 assignee: Optional[Union[User, str]] = None, 732 since: Optional[datetime] = None, 733 before: Optional[datetime] = None, 734 ) -> List["Issue"]: 735 """ 736 Get all Issues of this Repository (open and closed) 737 738 https://hub.allspice.io/api/swagger#/repository/repoListIssues 739 740 All params of this method are optional filters. If you don't specify a filter, it 741 will not be applied. 742 743 :param state: The state of the Issues to get. If None, all Issues are returned. 744 :param search_query: Filter issues by text. This is equivalent to searching for 745 `search_query` in the Issues on the web interface. 746 :param labels: Filter issues by labels. 747 :param milestones: Filter issues by milestones. 748 :param assignee: Filter issues by the assigned user. 749 :param since: Filter issues by the date they were created. 750 :param before: Filter issues by the date they were created. 751 :return: A list of Issues. 752 """ 753 754 data = { 755 "state": state, 756 } 757 if search_query: 758 data["q"] = search_query 759 if labels: 760 data["labels"] = ",".join(labels) 761 if milestones: 762 data["milestone"] = ",".join( 763 [ 764 milestone.name if isinstance(milestone, Milestone) else milestone 765 for milestone in milestones 766 ] 767 ) 768 if assignee: 769 if isinstance(assignee, User): 770 data["assignee"] = assignee.username 771 else: 772 data["assignee"] = assignee 773 if since: 774 data["since"] = Util.format_time(since) 775 if before: 776 data["before"] = Util.format_time(before) 777 778 results = self.allspice_client.requests_get_paginated( 779 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 780 params=data, 781 ) 782 783 issues = [] 784 for result in results: 785 issue = Issue.parse_response(self.allspice_client, result) 786 # See Issue.request 787 setattr(issue, "_repository", self) 788 # This is mostly for compatibility with an older implementation 789 Issue._add_read_property("repo", self, issue) 790 issues.append(issue) 791 792 return issues 793 794 def get_design_reviews( 795 self, 796 state: Literal["open", "closed", "all"] = "all", 797 milestone: Optional[Union[Milestone, str]] = None, 798 labels: Optional[List[str]] = None, 799 ) -> List["DesignReview"]: 800 """ 801 Get all Design Reviews of this Repository. 802 803 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 804 805 :param state: The state of the Design Reviews to get. If None, all Design Reviews 806 are returned. 807 :param milestone: The milestone of the Design Reviews to get. 808 :param labels: A list of label IDs to filter DRs by. 809 :return: A list of Design Reviews. 810 """ 811 812 params = { 813 "state": state, 814 } 815 if milestone: 816 if isinstance(milestone, Milestone): 817 params["milestone"] = milestone.name 818 else: 819 params["milestone"] = milestone 820 if labels: 821 params["labels"] = ",".join(labels) 822 823 results = self.allspice_client.requests_get_paginated( 824 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 825 params=params, 826 ) 827 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 828 829 def get_commits( 830 self, 831 sha: Optional[str] = None, 832 path: Optional[str] = None, 833 stat: bool = True, 834 ) -> List["Commit"]: 835 """ 836 Get all the Commits of this Repository. 837 838 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 839 840 :param sha: The SHA of the commit to start listing commits from. 841 :param path: filepath of a file/dir. 842 :param stat: Include the number of additions and deletions in the response. 843 Disable for speedup. 844 :return: A list of Commits. 845 """ 846 847 data = {} 848 if sha: 849 data["sha"] = sha 850 if path: 851 data["path"] = path 852 if not stat: 853 data["stat"] = False 854 855 try: 856 results = self.allspice_client.requests_get_paginated( 857 Repository.REPO_COMMITS % (self.owner.username, self.name), 858 params=data, 859 ) 860 except ConflictException as err: 861 logging.warning(err) 862 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 863 results = [] 864 return [Commit.parse_response(self.allspice_client, result) for result in results] 865 866 def get_issues_state(self, state) -> List["Issue"]: 867 """ 868 DEPRECATED: Use get_issues() instead. 869 870 Get issues of state Issue.open or Issue.closed of a repository. 871 """ 872 873 assert state in [Issue.OPENED, Issue.CLOSED] 874 issues = [] 875 data = {"state": state} 876 results = self.allspice_client.requests_get_paginated( 877 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 878 params=data, 879 ) 880 for result in results: 881 issue = Issue.parse_response(self.allspice_client, result) 882 # adding data not contained in the issue response 883 # See Issue.request() 884 setattr(issue, "_repository", self) 885 Issue._add_read_property("repo", self, issue) 886 Issue._add_read_property("owner", self.owner, issue) 887 issues.append(issue) 888 return issues 889 890 def get_times(self): 891 results = self.allspice_client.requests_get( 892 Repository.REPO_TIMES % (self.owner.username, self.name) 893 ) 894 return results 895 896 def get_user_time(self, username) -> float: 897 if isinstance(username, User): 898 username = username.username 899 results = self.allspice_client.requests_get( 900 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 901 ) 902 time = sum(r["time"] for r in results) 903 return time 904 905 def get_full_name(self) -> str: 906 return self.owner.username + "/" + self.name 907 908 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 909 data = { 910 "assignees": assignees, 911 "body": description, 912 "closed": False, 913 "title": title, 914 } 915 result = self.allspice_client.requests_post( 916 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 917 data=data, 918 ) 919 920 issue = Issue.parse_response(self.allspice_client, result) 921 setattr(issue, "_repository", self) 922 Issue._add_read_property("repo", self, issue) 923 return issue 924 925 def create_design_review( 926 self, 927 title: str, 928 head: Union[Branch, str], 929 base: Union[Branch, str], 930 assignees: Optional[Set[Union[User, str]]] = None, 931 body: Optional[str] = None, 932 due_date: Optional[datetime] = None, 933 milestone: Optional["Milestone"] = None, 934 ) -> "DesignReview": 935 """ 936 Create a new Design Review. 937 938 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 939 940 :param title: Title of the Design Review 941 :param head: Branch or name of the branch to merge into the base branch 942 :param base: Branch or name of the branch to merge into 943 :param assignees: Optional. A list of users to assign this review. List can be of 944 User objects or of usernames. 945 :param body: An Optional Description for the Design Review. 946 :param due_date: An Optional Due date for the Design Review. 947 :param milestone: An Optional Milestone for the Design Review 948 :return: The created Design Review 949 """ 950 951 data: dict[str, Any] = { 952 "title": title, 953 } 954 955 if isinstance(head, Branch): 956 data["head"] = head.name 957 else: 958 data["head"] = head 959 if isinstance(base, Branch): 960 data["base"] = base.name 961 else: 962 data["base"] = base 963 if assignees: 964 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 965 if body: 966 data["body"] = body 967 if due_date: 968 data["due_date"] = Util.format_time(due_date) 969 if milestone: 970 data["milestone"] = milestone.id 971 972 result = self.allspice_client.requests_post( 973 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 974 data=data, 975 ) 976 977 return DesignReview.parse_response(self.allspice_client, result) 978 979 def create_milestone( 980 self, 981 title: str, 982 description: str, 983 due_date: Optional[str] = None, 984 state: str = "open", 985 ) -> "Milestone": 986 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 987 data = {"title": title, "description": description, "state": state} 988 if due_date: 989 data["due_date"] = due_date 990 result = self.allspice_client.requests_post(url, data=data) 991 return Milestone.parse_response(self.allspice_client, result) 992 993 def create_gitea_hook(self, hook_url: str, events: List[str]): 994 url = f"/repos/{self.owner.username}/{self.name}/hooks" 995 data = { 996 "type": "gitea", 997 "config": {"content_type": "json", "url": hook_url}, 998 "events": events, 999 "active": True, 1000 } 1001 return self.allspice_client.requests_post(url, data=data) 1002 1003 def list_hooks(self): 1004 url = f"/repos/{self.owner.username}/{self.name}/hooks" 1005 return self.allspice_client.requests_get(url) 1006 1007 def delete_hook(self, id: str): 1008 url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" 1009 self.allspice_client.requests_delete(url) 1010 1011 def is_collaborator(self, username) -> bool: 1012 if isinstance(username, User): 1013 username = username.username 1014 try: 1015 # returns 204 if its ok, 404 if its not 1016 self.allspice_client.requests_get( 1017 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1018 ) 1019 return True 1020 except Exception: 1021 return False 1022 1023 def get_users_with_access(self) -> Sequence[User]: 1024 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1025 response = self.allspice_client.requests_get(url) 1026 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1027 if isinstance(self.owner, User): 1028 return [*collabs, self.owner] 1029 else: 1030 # owner must be org 1031 teams = self.owner.get_teams() 1032 for team in teams: 1033 team_repos = team.get_repos() 1034 if self.name in [n.name for n in team_repos]: 1035 collabs += team.get_members() 1036 return collabs 1037 1038 def remove_collaborator(self, user_name: str): 1039 url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" 1040 self.allspice_client.requests_delete(url) 1041 1042 def transfer_ownership( 1043 self, 1044 new_owner: Union[User, Organization], 1045 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1046 ): 1047 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1048 data: dict[str, Any] = {"new_owner": new_owner.username} 1049 if isinstance(new_owner, Organization): 1050 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1051 data["team_ids"] = new_team_ids 1052 self.allspice_client.requests_post(url, data=data) 1053 # TODO: make sure this instance is either updated or discarded 1054 1055 def get_git_content( 1056 self, 1057 ref: Optional["Ref"] = None, 1058 commit: "Optional[Commit]" = None, 1059 ) -> List[Content]: 1060 """ 1061 Get the metadata for all files in the root directory. 1062 1063 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1064 1065 :param ref: branch or commit to get content from 1066 :param commit: commit to get content from (deprecated) 1067 """ 1068 url = f"/repos/{self.owner.username}/{self.name}/contents" 1069 data = Util.data_params_for_ref(ref or commit) 1070 1071 result = [ 1072 Content.parse_response(self.allspice_client, f) 1073 for f in self.allspice_client.requests_get(url, data) 1074 ] 1075 return result 1076 1077 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1078 """ 1079 Get the repository's tree on a given ref. 1080 1081 By default, this will only return the top-level entries in the tree. If you want 1082 to get the entire tree, set `recursive` to True. 1083 1084 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1085 :param recursive: Whether to get the entire tree or just the top-level entries. 1086 """ 1087 1088 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1089 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1090 params = {"recursive": recursive} 1091 results = self.allspice_client.requests_get_paginated(url, params=params) 1092 return [GitEntry.parse_response(self.allspice_client, result) for result in results] 1093 1094 def get_file_content( 1095 self, 1096 content: Content, 1097 ref: Optional[Ref] = None, 1098 commit: Optional[Commit] = None, 1099 ) -> Union[str, List["Content"]]: 1100 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1101 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1102 data = Util.data_params_for_ref(ref or commit) 1103 1104 if content.type == Content.FILE: 1105 return self.allspice_client.requests_get(url, data)["content"] 1106 else: 1107 return [ 1108 Content.parse_response(self.allspice_client, f) 1109 for f in self.allspice_client.requests_get(url, data) 1110 ] 1111 1112 def get_raw_file( 1113 self, 1114 file_path: str, 1115 ref: Optional[Ref] = None, 1116 ) -> bytes: 1117 """ 1118 Get the raw, binary data of a single file. 1119 1120 Note 1: if the file you are requesting is a text file, you might want to 1121 use .decode() on the result to get a string. For example: 1122 1123 content = repo.get_raw_file("file.txt").decode("utf-8") 1124 1125 Note 2: this method will store the entire file in memory. If you want 1126 to download a large file, you might want to use `download_to_file` 1127 instead. 1128 1129 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1130 1131 :param file_path: The path to the file to get. 1132 :param ref: The branch or commit to get the file from. If not provided, 1133 the default branch is used. 1134 """ 1135 1136 url = self.REPO_GET_RAW_FILE.format( 1137 owner=self.owner.username, 1138 repo=self.name, 1139 path=file_path, 1140 ) 1141 params = Util.data_params_for_ref(ref) 1142 return self.allspice_client.requests_get_raw(url, params=params) 1143 1144 def download_to_file( 1145 self, 1146 file_path: str, 1147 io: IO, 1148 ref: Optional[Ref] = None, 1149 ) -> None: 1150 """ 1151 Download the binary data of a file to a file-like object. 1152 1153 Example: 1154 1155 with open("schematic.DSN", "wb") as f: 1156 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1157 1158 :param file_path: The path to the file in the repository from the root 1159 of the repository. 1160 :param io: The file-like object to write the data to. 1161 """ 1162 1163 url = self.allspice_client._AllSpice__get_url( 1164 self.REPO_GET_RAW_FILE.format( 1165 owner=self.owner.username, 1166 repo=self.name, 1167 path=file_path, 1168 ) 1169 ) 1170 params = Util.data_params_for_ref(ref) 1171 response = self.allspice_client.requests.get( 1172 url, 1173 params=params, 1174 headers=self.allspice_client.headers, 1175 stream=True, 1176 ) 1177 1178 for chunk in response.iter_content(chunk_size=4096): 1179 if chunk: 1180 io.write(chunk) 1181 1182 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1183 """ 1184 Get the json blob for a cad file if it exists, otherwise enqueue 1185 a new job and return a 503 status. 1186 1187 WARNING: This is still experimental and not recommended for critical 1188 applications. The structure and content of the returned dictionary can 1189 change at any time. 1190 1191 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1192 """ 1193 1194 if isinstance(content, Content): 1195 content = content.path 1196 1197 url = self.REPO_GET_ALLSPICE_JSON.format( 1198 owner=self.owner.username, 1199 repo=self.name, 1200 content=content, 1201 ) 1202 data = Util.data_params_for_ref(ref) 1203 return self.allspice_client.requests_get(url, data) 1204 1205 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1206 """ 1207 Get the svg blob for a cad file if it exists, otherwise enqueue 1208 a new job and return a 503 status. 1209 1210 WARNING: This is still experimental and not yet recommended for 1211 critical applications. The content of the returned svg can change 1212 at any time. 1213 1214 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1215 """ 1216 1217 if isinstance(content, Content): 1218 content = content.path 1219 1220 url = self.REPO_GET_ALLSPICE_SVG.format( 1221 owner=self.owner.username, 1222 repo=self.name, 1223 content=content, 1224 ) 1225 data = Util.data_params_for_ref(ref) 1226 return self.allspice_client.requests_get_raw(url, data) 1227 1228 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"content": content}) 1234 return self.allspice_client.requests_post(url, data) 1235 1236 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha, "content": content}) 1242 return self.allspice_client.requests_put(url, data) 1243 1244 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1245 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1246 if not data: 1247 data = {} 1248 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1249 data.update({"sha": file_sha}) 1250 return self.allspice_client.requests_delete(url, data) 1251 1252 def get_archive( 1253 self, 1254 ref: Ref = "main", 1255 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1256 ) -> bytes: 1257 """ 1258 Download all the files in a specific ref of a repository as a zip or tarball 1259 archive. 1260 1261 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1262 1263 :param ref: branch or commit to get content from, defaults to the "main" branch 1264 :param archive_format: zip or tar, defaults to zip 1265 """ 1266 1267 ref_string = Util.data_params_for_ref(ref)["ref"] 1268 url = self.REPO_GET_ARCHIVE.format( 1269 owner=self.owner.username, 1270 repo=self.name, 1271 ref=ref_string, 1272 format=archive_format.value, 1273 ) 1274 return self.allspice_client.requests_get_raw(url) 1275 1276 def get_topics(self) -> list[str]: 1277 """ 1278 Gets the list of topics on this repository. 1279 1280 See http://localhost:3000/api/swagger#/repository/repoListTopics 1281 """ 1282 1283 url = self.REPO_GET_TOPICS.format( 1284 owner=self.owner.username, 1285 repo=self.name, 1286 ) 1287 return self.allspice_client.requests_get(url)["topics"] 1288 1289 def add_topic(self, topic: str): 1290 """ 1291 Adds a topic to the repository. 1292 1293 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1294 1295 :param topic: The topic to add. Topic names must consist only of 1296 lowercase letters, numnbers and dashes (-), and cannot start with 1297 dashes. Topic names also must be under 35 characters long. 1298 """ 1299 1300 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1301 self.allspice_client.requests_put(url) 1302 1303 def create_release( 1304 self, 1305 tag_name: str, 1306 name: Optional[str] = None, 1307 body: Optional[str] = None, 1308 draft: bool = False, 1309 ): 1310 """ 1311 Create a release for this repository. The release will be created for 1312 the tag with the given name. If there is no tag with this name, create 1313 the tag first. 1314 1315 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1316 """ 1317 1318 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1319 data = { 1320 "tag_name": tag_name, 1321 "draft": draft, 1322 } 1323 if name is not None: 1324 data["name"] = name 1325 if body is not None: 1326 data["body"] = body 1327 response = self.allspice_client.requests_post(url, data) 1328 return Release.parse_response(self.allspice_client, response, self) 1329 1330 def get_releases( 1331 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1332 ) -> List[Release]: 1333 """ 1334 Get the list of releases for this repository. 1335 1336 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1337 """ 1338 1339 data = {} 1340 1341 if draft is not None: 1342 data["draft"] = draft 1343 if pre_release is not None: 1344 data["pre-release"] = pre_release 1345 1346 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1347 responses = self.allspice_client.requests_get_paginated(url, params=data) 1348 1349 return [ 1350 Release.parse_response(self.allspice_client, response, self) for response in responses 1351 ] 1352 1353 def get_latest_release(self) -> Release: 1354 """ 1355 Get the latest release for this repository. 1356 1357 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1358 """ 1359 1360 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1361 response = self.allspice_client.requests_get(url) 1362 release = Release.parse_response(self.allspice_client, response, self) 1363 return release 1364 1365 def get_release_by_tag(self, tag: str) -> Release: 1366 """ 1367 Get a release by its tag. 1368 1369 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1370 """ 1371 1372 url = self.REPO_GET_RELEASE_BY_TAG.format( 1373 owner=self.owner.username, repo=self.name, tag=tag 1374 ) 1375 response = self.allspice_client.requests_get(url) 1376 release = Release.parse_response(self.allspice_client, response, self) 1377 return release 1378 1379 def get_commit_statuses( 1380 self, 1381 commit: Union[str, Commit], 1382 sort: Optional[CommitStatusSort] = None, 1383 state: Optional[CommitStatusState] = None, 1384 ) -> List[CommitStatus]: 1385 """ 1386 Get a list of statuses for a commit. 1387 1388 This is roughly equivalent to the Commit.get_statuses method, but this 1389 method allows you to sort and filter commits and is more convenient if 1390 you have a commit SHA and don't need to get the commit itself. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1393 """ 1394 1395 if isinstance(commit, Commit): 1396 commit = commit.sha 1397 1398 params = {} 1399 if sort is not None: 1400 params["sort"] = sort.value 1401 if state is not None: 1402 params["state"] = state.value 1403 1404 url = self.REPO_GET_COMMIT_STATUS.format( 1405 owner=self.owner.username, repo=self.name, sha=commit 1406 ) 1407 response = self.allspice_client.requests_get_paginated(url, params=params) 1408 return [CommitStatus.parse_response(self.allspice_client, status) for status in response] 1409 1410 def create_commit_status( 1411 self, 1412 commit: Union[str, Commit], 1413 context: Optional[str] = None, 1414 description: Optional[str] = None, 1415 state: Optional[CommitStatusState] = None, 1416 target_url: Optional[str] = None, 1417 ) -> CommitStatus: 1418 """ 1419 Create a status on a commit. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 data = {} 1428 if context is not None: 1429 data["context"] = context 1430 if description is not None: 1431 data["description"] = description 1432 if state is not None: 1433 data["state"] = state.value 1434 if target_url is not None: 1435 data["target_url"] = target_url 1436 1437 url = self.REPO_GET_COMMIT_STATUS.format( 1438 owner=self.owner.username, repo=self.name, sha=commit 1439 ) 1440 response = self.allspice_client.requests_post(url, data=data) 1441 return CommitStatus.parse_response(self.allspice_client, response) 1442 1443 def delete(self): 1444 self.allspice_client.requests_delete( 1445 Repository.REPO_DELETE % (self.owner.username, self.name) 1446 ) 1447 self.deleted = True
616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses]
Search for repositories.
See allspice.allspice.io/api/swagger#/repository/repoSearch">https://huballspice.allspice.io/api/swagger#/repository/repoSearch
Parameters
- query: The query string to search for
- topic: If true, the query string will only be matched against the repository's topic.
- include_description: If true, the query string will be matched against the repository's description as well.
- user: If specified, only repositories that this user owns or contributes to will be searched.
- owner_to_prioritize: If specified, repositories owned by the given entity will be prioritized in the search. :returns: All repositories matching the query. If there are many repositories matching this query, this may take some time.
695 def get_branches(self) -> List["Branch"]: 696 """Get all the Branches of this Repository.""" 697 698 results = self.allspice_client.requests_get_paginated( 699 Repository.REPO_BRANCHES % (self.owner.username, self.name) 700 ) 701 return [Branch.parse_response(self.allspice_client, result) for result in results]
Get all the Branches of this Repository.
703 def get_branch(self, name: str) -> "Branch": 704 """Get a specific Branch of this Repository.""" 705 result = self.allspice_client.requests_get( 706 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 707 ) 708 return Branch.parse_response(self.allspice_client, result)
Get a specific Branch of this Repository.
710 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 711 """Add a branch to the repository""" 712 # Note: will only work with gitea 1.13 or higher! 713 714 ref_name = Util.data_params_for_ref(create_from) 715 if "ref" not in ref_name: 716 raise ValueError("create_from must be a Branch, Commit or string") 717 ref_name = ref_name["ref"] 718 719 data = {"new_branch_name": newname, "old_ref_name": ref_name} 720 result = self.allspice_client.requests_post( 721 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 722 ) 723 return Branch.parse_response(self.allspice_client, result)
Add a branch to the repository
725 def get_issues( 726 self, 727 state: Literal["open", "closed", "all"] = "all", 728 search_query: Optional[str] = None, 729 labels: Optional[List[str]] = None, 730 milestones: Optional[List[Union[Milestone, str]]] = None, 731 assignee: Optional[Union[User, str]] = None, 732 since: Optional[datetime] = None, 733 before: Optional[datetime] = None, 734 ) -> List["Issue"]: 735 """ 736 Get all Issues of this Repository (open and closed) 737 738 https://hub.allspice.io/api/swagger#/repository/repoListIssues 739 740 All params of this method are optional filters. If you don't specify a filter, it 741 will not be applied. 742 743 :param state: The state of the Issues to get. If None, all Issues are returned. 744 :param search_query: Filter issues by text. This is equivalent to searching for 745 `search_query` in the Issues on the web interface. 746 :param labels: Filter issues by labels. 747 :param milestones: Filter issues by milestones. 748 :param assignee: Filter issues by the assigned user. 749 :param since: Filter issues by the date they were created. 750 :param before: Filter issues by the date they were created. 751 :return: A list of Issues. 752 """ 753 754 data = { 755 "state": state, 756 } 757 if search_query: 758 data["q"] = search_query 759 if labels: 760 data["labels"] = ",".join(labels) 761 if milestones: 762 data["milestone"] = ",".join( 763 [ 764 milestone.name if isinstance(milestone, Milestone) else milestone 765 for milestone in milestones 766 ] 767 ) 768 if assignee: 769 if isinstance(assignee, User): 770 data["assignee"] = assignee.username 771 else: 772 data["assignee"] = assignee 773 if since: 774 data["since"] = Util.format_time(since) 775 if before: 776 data["before"] = Util.format_time(before) 777 778 results = self.allspice_client.requests_get_paginated( 779 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 780 params=data, 781 ) 782 783 issues = [] 784 for result in results: 785 issue = Issue.parse_response(self.allspice_client, result) 786 # See Issue.request 787 setattr(issue, "_repository", self) 788 # This is mostly for compatibility with an older implementation 789 Issue._add_read_property("repo", self, issue) 790 issues.append(issue) 791 792 return issues
Get all Issues of this Repository (open and closed)
allspice.allspice.io/api/swagger#/repository/repoListIssues">https://huballspice.allspice.io/api/swagger#/repository/repoListIssues
All params of this method are optional filters. If you don't specify a filter, it will not be applied.
Parameters
- state: The state of the Issues to get. If None, all Issues are returned.
- search_query: Filter issues by text. This is equivalent to searching for
search_query
in the Issues on the web interface. - labels: Filter issues by labels.
- milestones: Filter issues by milestones.
- assignee: Filter issues by the assigned user.
- since: Filter issues by the date they were created.
- before: Filter issues by the date they were created.
Returns
A list of Issues.
794 def get_design_reviews( 795 self, 796 state: Literal["open", "closed", "all"] = "all", 797 milestone: Optional[Union[Milestone, str]] = None, 798 labels: Optional[List[str]] = None, 799 ) -> List["DesignReview"]: 800 """ 801 Get all Design Reviews of this Repository. 802 803 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 804 805 :param state: The state of the Design Reviews to get. If None, all Design Reviews 806 are returned. 807 :param milestone: The milestone of the Design Reviews to get. 808 :param labels: A list of label IDs to filter DRs by. 809 :return: A list of Design Reviews. 810 """ 811 812 params = { 813 "state": state, 814 } 815 if milestone: 816 if isinstance(milestone, Milestone): 817 params["milestone"] = milestone.name 818 else: 819 params["milestone"] = milestone 820 if labels: 821 params["labels"] = ",".join(labels) 822 823 results = self.allspice_client.requests_get_paginated( 824 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 825 params=params, 826 ) 827 return [DesignReview.parse_response(self.allspice_client, result) for result in results]
Get all Design Reviews of this Repository.
allspice.allspice.io/api/swagger#/repository/repoListPullRequests">https://huballspice.allspice.io/api/swagger#/repository/repoListPullRequests
Parameters
- state: The state of the Design Reviews to get. If None, all Design Reviews are returned.
- milestone: The milestone of the Design Reviews to get.
- labels: A list of label IDs to filter DRs by.
Returns
A list of Design Reviews.
829 def get_commits( 830 self, 831 sha: Optional[str] = None, 832 path: Optional[str] = None, 833 stat: bool = True, 834 ) -> List["Commit"]: 835 """ 836 Get all the Commits of this Repository. 837 838 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 839 840 :param sha: The SHA of the commit to start listing commits from. 841 :param path: filepath of a file/dir. 842 :param stat: Include the number of additions and deletions in the response. 843 Disable for speedup. 844 :return: A list of Commits. 845 """ 846 847 data = {} 848 if sha: 849 data["sha"] = sha 850 if path: 851 data["path"] = path 852 if not stat: 853 data["stat"] = False 854 855 try: 856 results = self.allspice_client.requests_get_paginated( 857 Repository.REPO_COMMITS % (self.owner.username, self.name), 858 params=data, 859 ) 860 except ConflictException as err: 861 logging.warning(err) 862 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 863 results = [] 864 return [Commit.parse_response(self.allspice_client, result) for result in results]
Get all the Commits of this Repository.
allspice.allspice.io/api/swagger#/repository/repoGetAllCommits">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllCommits
Parameters
- sha: The SHA of the commit to start listing commits from.
- path: filepath of a file/dir.
- stat: Include the number of additions and deletions in the response. Disable for speedup.
Returns
A list of Commits.
866 def get_issues_state(self, state) -> List["Issue"]: 867 """ 868 DEPRECATED: Use get_issues() instead. 869 870 Get issues of state Issue.open or Issue.closed of a repository. 871 """ 872 873 assert state in [Issue.OPENED, Issue.CLOSED] 874 issues = [] 875 data = {"state": state} 876 results = self.allspice_client.requests_get_paginated( 877 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 878 params=data, 879 ) 880 for result in results: 881 issue = Issue.parse_response(self.allspice_client, result) 882 # adding data not contained in the issue response 883 # See Issue.request() 884 setattr(issue, "_repository", self) 885 Issue._add_read_property("repo", self, issue) 886 Issue._add_read_property("owner", self.owner, issue) 887 issues.append(issue) 888 return issues
DEPRECATED: Use get_issues() instead.
Get issues of state Issue.open or Issue.closed of a repository.
896 def get_user_time(self, username) -> float: 897 if isinstance(username, User): 898 username = username.username 899 results = self.allspice_client.requests_get( 900 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 901 ) 902 time = sum(r["time"] for r in results) 903 return time
908 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 909 data = { 910 "assignees": assignees, 911 "body": description, 912 "closed": False, 913 "title": title, 914 } 915 result = self.allspice_client.requests_post( 916 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 917 data=data, 918 ) 919 920 issue = Issue.parse_response(self.allspice_client, result) 921 setattr(issue, "_repository", self) 922 Issue._add_read_property("repo", self, issue) 923 return issue
925 def create_design_review( 926 self, 927 title: str, 928 head: Union[Branch, str], 929 base: Union[Branch, str], 930 assignees: Optional[Set[Union[User, str]]] = None, 931 body: Optional[str] = None, 932 due_date: Optional[datetime] = None, 933 milestone: Optional["Milestone"] = None, 934 ) -> "DesignReview": 935 """ 936 Create a new Design Review. 937 938 See https://hub.allspice.io/api/swagger#/repository/repoCreatePullRequest 939 940 :param title: Title of the Design Review 941 :param head: Branch or name of the branch to merge into the base branch 942 :param base: Branch or name of the branch to merge into 943 :param assignees: Optional. A list of users to assign this review. List can be of 944 User objects or of usernames. 945 :param body: An Optional Description for the Design Review. 946 :param due_date: An Optional Due date for the Design Review. 947 :param milestone: An Optional Milestone for the Design Review 948 :return: The created Design Review 949 """ 950 951 data: dict[str, Any] = { 952 "title": title, 953 } 954 955 if isinstance(head, Branch): 956 data["head"] = head.name 957 else: 958 data["head"] = head 959 if isinstance(base, Branch): 960 data["base"] = base.name 961 else: 962 data["base"] = base 963 if assignees: 964 data["assignees"] = [a.username if isinstance(a, User) else a for a in assignees] 965 if body: 966 data["body"] = body 967 if due_date: 968 data["due_date"] = Util.format_time(due_date) 969 if milestone: 970 data["milestone"] = milestone.id 971 972 result = self.allspice_client.requests_post( 973 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 974 data=data, 975 ) 976 977 return DesignReview.parse_response(self.allspice_client, result)
Create a new Design Review.
See allspice.allspice.io/api/swagger#/repository/repoCreatePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoCreatePullRequest
Parameters
- title: Title of the Design Review
- head: Branch or name of the branch to merge into the base branch
- base: Branch or name of the branch to merge into
- assignees: Optional. A list of users to assign this review. List can be of User objects or of usernames.
- body: An Optional Description for the Design Review.
- due_date: An Optional Due date for the Design Review.
- milestone: An Optional Milestone for the Design Review
Returns
The created Design Review
979 def create_milestone( 980 self, 981 title: str, 982 description: str, 983 due_date: Optional[str] = None, 984 state: str = "open", 985 ) -> "Milestone": 986 url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) 987 data = {"title": title, "description": description, "state": state} 988 if due_date: 989 data["due_date"] = due_date 990 result = self.allspice_client.requests_post(url, data=data) 991 return Milestone.parse_response(self.allspice_client, result)
993 def create_gitea_hook(self, hook_url: str, events: List[str]): 994 url = f"/repos/{self.owner.username}/{self.name}/hooks" 995 data = { 996 "type": "gitea", 997 "config": {"content_type": "json", "url": hook_url}, 998 "events": events, 999 "active": True, 1000 } 1001 return self.allspice_client.requests_post(url, data=data)
1011 def is_collaborator(self, username) -> bool: 1012 if isinstance(username, User): 1013 username = username.username 1014 try: 1015 # returns 204 if its ok, 404 if its not 1016 self.allspice_client.requests_get( 1017 Repository.REPO_IS_COLLABORATOR % (self.owner.username, self.name, username) 1018 ) 1019 return True 1020 except Exception: 1021 return False
1023 def get_users_with_access(self) -> Sequence[User]: 1024 url = f"/repos/{self.owner.username}/{self.name}/collaborators" 1025 response = self.allspice_client.requests_get(url) 1026 collabs = [User.parse_response(self.allspice_client, user) for user in response] 1027 if isinstance(self.owner, User): 1028 return [*collabs, self.owner] 1029 else: 1030 # owner must be org 1031 teams = self.owner.get_teams() 1032 for team in teams: 1033 team_repos = team.get_repos() 1034 if self.name in [n.name for n in team_repos]: 1035 collabs += team.get_members() 1036 return collabs
1042 def transfer_ownership( 1043 self, 1044 new_owner: Union[User, Organization], 1045 new_teams: Set[Team] | FrozenSet[Team] = frozenset(), 1046 ): 1047 url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) 1048 data: dict[str, Any] = {"new_owner": new_owner.username} 1049 if isinstance(new_owner, Organization): 1050 new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] 1051 data["team_ids"] = new_team_ids 1052 self.allspice_client.requests_post(url, data=data) 1053 # TODO: make sure this instance is either updated or discarded
1055 def get_git_content( 1056 self, 1057 ref: Optional["Ref"] = None, 1058 commit: "Optional[Commit]" = None, 1059 ) -> List[Content]: 1060 """ 1061 Get the metadata for all files in the root directory. 1062 1063 https://hub.allspice.io/api/swagger#/repository/repoGetContentsList 1064 1065 :param ref: branch or commit to get content from 1066 :param commit: commit to get content from (deprecated) 1067 """ 1068 url = f"/repos/{self.owner.username}/{self.name}/contents" 1069 data = Util.data_params_for_ref(ref or commit) 1070 1071 result = [ 1072 Content.parse_response(self.allspice_client, f) 1073 for f in self.allspice_client.requests_get(url, data) 1074 ] 1075 return result
Get the metadata for all files in the root directory.
allspice.allspice.io/api/swagger#/repository/repoGetContentsList">https://huballspice.allspice.io/api/swagger#/repository/repoGetContentsList
Parameters
- ref: branch or commit to get content from
- commit: commit to get content from (deprecated)
1077 def get_tree(self, ref: Optional[Ref] = None, recursive: bool = False) -> List[GitEntry]: 1078 """ 1079 Get the repository's tree on a given ref. 1080 1081 By default, this will only return the top-level entries in the tree. If you want 1082 to get the entire tree, set `recursive` to True. 1083 1084 :param ref: The ref to get the tree from. If not provided, the default branch is used. 1085 :param recursive: Whether to get the entire tree or just the top-level entries. 1086 """ 1087 1088 ref = Util.data_params_for_ref(ref).get("ref", self.default_branch) 1089 url = self.REPO_GET_TREE.format(owner=self.owner.username, repo=self.name, ref=ref) 1090 params = {"recursive": recursive} 1091 results = self.allspice_client.requests_get_paginated(url, params=params) 1092 return [GitEntry.parse_response(self.allspice_client, result) for result in results]
Get the repository's tree on a given ref.
By default, this will only return the top-level entries in the tree. If you want
to get the entire tree, set recursive
to True.
Parameters
- ref: The ref to get the tree from. If not provided, the default branch is used.
- recursive: Whether to get the entire tree or just the top-level entries.
1094 def get_file_content( 1095 self, 1096 content: Content, 1097 ref: Optional[Ref] = None, 1098 commit: Optional[Commit] = None, 1099 ) -> Union[str, List["Content"]]: 1100 """https://hub.allspice.io/api/swagger#/repository/repoGetContents""" 1101 url = f"/repos/{self.owner.username}/{self.name}/contents/{content.path}" 1102 data = Util.data_params_for_ref(ref or commit) 1103 1104 if content.type == Content.FILE: 1105 return self.allspice_client.requests_get(url, data)["content"] 1106 else: 1107 return [ 1108 Content.parse_response(self.allspice_client, f) 1109 for f in self.allspice_client.requests_get(url, data) 1110 ]
allspice.allspice.io/api/swagger#/repository/repoGetContents">https://huballspice.allspice.io/api/swagger#/repository/repoGetContents
1112 def get_raw_file( 1113 self, 1114 file_path: str, 1115 ref: Optional[Ref] = None, 1116 ) -> bytes: 1117 """ 1118 Get the raw, binary data of a single file. 1119 1120 Note 1: if the file you are requesting is a text file, you might want to 1121 use .decode() on the result to get a string. For example: 1122 1123 content = repo.get_raw_file("file.txt").decode("utf-8") 1124 1125 Note 2: this method will store the entire file in memory. If you want 1126 to download a large file, you might want to use `download_to_file` 1127 instead. 1128 1129 See https://hub.allspice.io/api/swagger#/repository/repoGetRawFile 1130 1131 :param file_path: The path to the file to get. 1132 :param ref: The branch or commit to get the file from. If not provided, 1133 the default branch is used. 1134 """ 1135 1136 url = self.REPO_GET_RAW_FILE.format( 1137 owner=self.owner.username, 1138 repo=self.name, 1139 path=file_path, 1140 ) 1141 params = Util.data_params_for_ref(ref) 1142 return self.allspice_client.requests_get_raw(url, params=params)
Get the raw, binary data of a single file.
Note 1: if the file you are requesting is a text file, you might want to use .decode() on the result to get a string. For example:
content = repo.get_raw_file("file.txt").decode("utf-8")
Note 2: this method will store the entire file in memory. If you want
to download a large file, you might want to use download_to_file
instead.
See allspice.allspice.io/api/swagger#/repository/repoGetRawFile">https://huballspice.allspice.io/api/swagger#/repository/repoGetRawFile
Parameters
- file_path: The path to the file to get.
- ref: The branch or commit to get the file from. If not provided, the default branch is used.
1144 def download_to_file( 1145 self, 1146 file_path: str, 1147 io: IO, 1148 ref: Optional[Ref] = None, 1149 ) -> None: 1150 """ 1151 Download the binary data of a file to a file-like object. 1152 1153 Example: 1154 1155 with open("schematic.DSN", "wb") as f: 1156 Repository.download_to_file("Schematics/my_schematic.DSN", f) 1157 1158 :param file_path: The path to the file in the repository from the root 1159 of the repository. 1160 :param io: The file-like object to write the data to. 1161 """ 1162 1163 url = self.allspice_client._AllSpice__get_url( 1164 self.REPO_GET_RAW_FILE.format( 1165 owner=self.owner.username, 1166 repo=self.name, 1167 path=file_path, 1168 ) 1169 ) 1170 params = Util.data_params_for_ref(ref) 1171 response = self.allspice_client.requests.get( 1172 url, 1173 params=params, 1174 headers=self.allspice_client.headers, 1175 stream=True, 1176 ) 1177 1178 for chunk in response.iter_content(chunk_size=4096): 1179 if chunk: 1180 io.write(chunk)
Download the binary data of a file to a file-like object.
Example:
with open("schematic.DSN", "wb") as f:
Repository.download_to_file("Schematics/my_schematic.DSN", f)
Parameters
- file_path: The path to the file in the repository from the root of the repository.
- io: The file-like object to write the data to.
1182 def get_generated_json(self, content: Union[Content, str], ref: Optional[Ref] = None) -> dict: 1183 """ 1184 Get the json blob for a cad file if it exists, otherwise enqueue 1185 a new job and return a 503 status. 1186 1187 WARNING: This is still experimental and not recommended for critical 1188 applications. The structure and content of the returned dictionary can 1189 change at any time. 1190 1191 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON 1192 """ 1193 1194 if isinstance(content, Content): 1195 content = content.path 1196 1197 url = self.REPO_GET_ALLSPICE_JSON.format( 1198 owner=self.owner.username, 1199 repo=self.name, 1200 content=content, 1201 ) 1202 data = Util.data_params_for_ref(ref) 1203 return self.allspice_client.requests_get(url, data)
Get the json blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not recommended for critical applications. The structure and content of the returned dictionary can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceJSON
1205 def get_generated_svg(self, content: Union[Content, str], ref: Optional[Ref] = None) -> bytes: 1206 """ 1207 Get the svg blob for a cad file if it exists, otherwise enqueue 1208 a new job and return a 503 status. 1209 1210 WARNING: This is still experimental and not yet recommended for 1211 critical applications. The content of the returned svg can change 1212 at any time. 1213 1214 See https://hub.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG 1215 """ 1216 1217 if isinstance(content, Content): 1218 content = content.path 1219 1220 url = self.REPO_GET_ALLSPICE_SVG.format( 1221 owner=self.owner.username, 1222 repo=self.name, 1223 content=content, 1224 ) 1225 data = Util.data_params_for_ref(ref) 1226 return self.allspice_client.requests_get_raw(url, data)
Get the svg blob for a cad file if it exists, otherwise enqueue a new job and return a 503 status.
WARNING: This is still experimental and not yet recommended for critical applications. The content of the returned svg can change at any time.
See allspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG">https://huballspice.allspice.io/api/swagger#/repository/repoGetAllSpiceSVG
1228 def create_file(self, file_path: str, content: str, data: Optional[dict] = None): 1229 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1230 if not data: 1231 data = {} 1232 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1233 data.update({"content": content}) 1234 return self.allspice_client.requests_post(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1236 def change_file(self, file_path: str, file_sha: str, content: str, data: Optional[dict] = None): 1237 """https://hub.allspice.io/api/swagger#/repository/repoCreateFile""" 1238 if not data: 1239 data = {} 1240 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1241 data.update({"sha": file_sha, "content": content}) 1242 return self.allspice_client.requests_put(url, data)
allspice.allspice.io/api/swagger#/repository/repoCreateFile">https://huballspice.allspice.io/api/swagger#/repository/repoCreateFile
1244 def delete_file(self, file_path: str, file_sha: str, data: Optional[dict] = None): 1245 """https://hub.allspice.io/api/swagger#/repository/repoDeleteFile""" 1246 if not data: 1247 data = {} 1248 url = f"/repos/{self.owner.username}/{self.name}/contents/{file_path}" 1249 data.update({"sha": file_sha}) 1250 return self.allspice_client.requests_delete(url, data)
allspice.allspice.io/api/swagger#/repository/repoDeleteFile">https://huballspice.allspice.io/api/swagger#/repository/repoDeleteFile
1252 def get_archive( 1253 self, 1254 ref: Ref = "main", 1255 archive_format: ArchiveFormat = ArchiveFormat.ZIP, 1256 ) -> bytes: 1257 """ 1258 Download all the files in a specific ref of a repository as a zip or tarball 1259 archive. 1260 1261 https://hub.allspice.io/api/swagger#/repository/repoGetArchive 1262 1263 :param ref: branch or commit to get content from, defaults to the "main" branch 1264 :param archive_format: zip or tar, defaults to zip 1265 """ 1266 1267 ref_string = Util.data_params_for_ref(ref)["ref"] 1268 url = self.REPO_GET_ARCHIVE.format( 1269 owner=self.owner.username, 1270 repo=self.name, 1271 ref=ref_string, 1272 format=archive_format.value, 1273 ) 1274 return self.allspice_client.requests_get_raw(url)
Download all the files in a specific ref of a repository as a zip or tarball archive.
allspice.allspice.io/api/swagger#/repository/repoGetArchive">https://huballspice.allspice.io/api/swagger#/repository/repoGetArchive
Parameters
- ref: branch or commit to get content from, defaults to the "main" branch
- archive_format: zip or tar, defaults to zip
1276 def get_topics(self) -> list[str]: 1277 """ 1278 Gets the list of topics on this repository. 1279 1280 See http://localhost:3000/api/swagger#/repository/repoListTopics 1281 """ 1282 1283 url = self.REPO_GET_TOPICS.format( 1284 owner=self.owner.username, 1285 repo=self.name, 1286 ) 1287 return self.allspice_client.requests_get(url)["topics"]
Gets the list of topics on this repository.
See http://localhost:3000/api/swagger#/repository/repoListTopics
1289 def add_topic(self, topic: str): 1290 """ 1291 Adds a topic to the repository. 1292 1293 See https://hub.allspice.io/api/swagger#/repository/repoAddTopic 1294 1295 :param topic: The topic to add. Topic names must consist only of 1296 lowercase letters, numnbers and dashes (-), and cannot start with 1297 dashes. Topic names also must be under 35 characters long. 1298 """ 1299 1300 url = self.REPO_ADD_TOPIC.format(owner=self.owner.username, repo=self.name, topic=topic) 1301 self.allspice_client.requests_put(url)
Adds a topic to the repository.
See allspice.allspice.io/api/swagger#/repository/repoAddTopic">https://huballspice.allspice.io/api/swagger#/repository/repoAddTopic
Parameters
- topic: The topic to add. Topic names must consist only of lowercase letters, numnbers and dashes (-), and cannot start with dashes. Topic names also must be under 35 characters long.
1303 def create_release( 1304 self, 1305 tag_name: str, 1306 name: Optional[str] = None, 1307 body: Optional[str] = None, 1308 draft: bool = False, 1309 ): 1310 """ 1311 Create a release for this repository. The release will be created for 1312 the tag with the given name. If there is no tag with this name, create 1313 the tag first. 1314 1315 See https://hub.allspice.io/api/swagger#/repository/repoCreateRelease 1316 """ 1317 1318 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1319 data = { 1320 "tag_name": tag_name, 1321 "draft": draft, 1322 } 1323 if name is not None: 1324 data["name"] = name 1325 if body is not None: 1326 data["body"] = body 1327 response = self.allspice_client.requests_post(url, data) 1328 return Release.parse_response(self.allspice_client, response, self)
Create a release for this repository. The release will be created for the tag with the given name. If there is no tag with this name, create the tag first.
See allspice.allspice.io/api/swagger#/repository/repoCreateRelease">https://huballspice.allspice.io/api/swagger#/repository/repoCreateRelease
1330 def get_releases( 1331 self, draft: Optional[bool] = None, pre_release: Optional[bool] = None 1332 ) -> List[Release]: 1333 """ 1334 Get the list of releases for this repository. 1335 1336 See https://hub.allspice.io/api/swagger#/repository/repoListReleases 1337 """ 1338 1339 data = {} 1340 1341 if draft is not None: 1342 data["draft"] = draft 1343 if pre_release is not None: 1344 data["pre-release"] = pre_release 1345 1346 url = self.REPO_GET_RELEASES.format(owner=self.owner.username, repo=self.name) 1347 responses = self.allspice_client.requests_get_paginated(url, params=data) 1348 1349 return [ 1350 Release.parse_response(self.allspice_client, response, self) for response in responses 1351 ]
Get the list of releases for this repository.
See allspice.allspice.io/api/swagger#/repository/repoListReleases">https://huballspice.allspice.io/api/swagger#/repository/repoListReleases
1353 def get_latest_release(self) -> Release: 1354 """ 1355 Get the latest release for this repository. 1356 1357 See https://hub.allspice.io/api/swagger#/repository/repoGetLatestRelease 1358 """ 1359 1360 url = self.REPO_GET_LATEST_RELEASE.format(owner=self.owner.username, repo=self.name) 1361 response = self.allspice_client.requests_get(url) 1362 release = Release.parse_response(self.allspice_client, response, self) 1363 return release
Get the latest release for this repository.
See allspice.allspice.io/api/swagger#/repository/repoGetLatestRelease">https://huballspice.allspice.io/api/swagger#/repository/repoGetLatestRelease
1365 def get_release_by_tag(self, tag: str) -> Release: 1366 """ 1367 Get a release by its tag. 1368 1369 See https://hub.allspice.io/api/swagger#/repository/repoGetReleaseByTag 1370 """ 1371 1372 url = self.REPO_GET_RELEASE_BY_TAG.format( 1373 owner=self.owner.username, repo=self.name, tag=tag 1374 ) 1375 response = self.allspice_client.requests_get(url) 1376 release = Release.parse_response(self.allspice_client, response, self) 1377 return release
Get a release by its tag.
See allspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag">https://huballspice.allspice.io/api/swagger#/repository/repoGetReleaseByTag
1379 def get_commit_statuses( 1380 self, 1381 commit: Union[str, Commit], 1382 sort: Optional[CommitStatusSort] = None, 1383 state: Optional[CommitStatusState] = None, 1384 ) -> List[CommitStatus]: 1385 """ 1386 Get a list of statuses for a commit. 1387 1388 This is roughly equivalent to the Commit.get_statuses method, but this 1389 method allows you to sort and filter commits and is more convenient if 1390 you have a commit SHA and don't need to get the commit itself. 1391 1392 See https://hub.allspice.io/api/swagger#/repository/repoListStatuses 1393 """ 1394 1395 if isinstance(commit, Commit): 1396 commit = commit.sha 1397 1398 params = {} 1399 if sort is not None: 1400 params["sort"] = sort.value 1401 if state is not None: 1402 params["state"] = state.value 1403 1404 url = self.REPO_GET_COMMIT_STATUS.format( 1405 owner=self.owner.username, repo=self.name, sha=commit 1406 ) 1407 response = self.allspice_client.requests_get_paginated(url, params=params) 1408 return [CommitStatus.parse_response(self.allspice_client, status) for status in response]
Get a list of statuses for a commit.
This is roughly equivalent to the Commit.get_statuses method, but this method allows you to sort and filter commits and is more convenient if you have a commit SHA and don't need to get the commit itself.
See allspice.allspice.io/api/swagger#/repository/repoListStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListStatuses
1410 def create_commit_status( 1411 self, 1412 commit: Union[str, Commit], 1413 context: Optional[str] = None, 1414 description: Optional[str] = None, 1415 state: Optional[CommitStatusState] = None, 1416 target_url: Optional[str] = None, 1417 ) -> CommitStatus: 1418 """ 1419 Create a status on a commit. 1420 1421 See https://hub.allspice.io/api/swagger#/repository/repoCreateStatus 1422 """ 1423 1424 if isinstance(commit, Commit): 1425 commit = commit.sha 1426 1427 data = {} 1428 if context is not None: 1429 data["context"] = context 1430 if description is not None: 1431 data["description"] = description 1432 if state is not None: 1433 data["state"] = state.value 1434 if target_url is not None: 1435 data["target_url"] = target_url 1436 1437 url = self.REPO_GET_COMMIT_STATUS.format( 1438 owner=self.owner.username, repo=self.name, sha=commit 1439 ) 1440 response = self.allspice_client.requests_post(url, data=data) 1441 return CommitStatus.parse_response(self.allspice_client, response)
Create a status on a commit.
See allspice.allspice.io/api/swagger#/repository/repoCreateStatus">https://huballspice.allspice.io/api/swagger#/repository/repoCreateStatus
567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip"
Archive formats for Repository.get_archive
575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex"
Sort order for Repository.get_commit_status
2276class Team(ApiObject): 2277 can_create_org_repo: bool 2278 description: str 2279 id: int 2280 includes_all_repositories: bool 2281 name: str 2282 organization: Optional["Organization"] 2283 permission: str 2284 units: List[str] 2285 units_map: Dict[str, str] 2286 2287 API_OBJECT = """/teams/{id}""" # <id> 2288 ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo> 2289 TEAM_DELETE = """/teams/%s""" # <id> 2290 GET_MEMBERS = """/teams/%s/members""" # <id> 2291 GET_REPOS = """/teams/%s/repos""" # <id> 2292 2293 def __init__(self, allspice_client): 2294 super().__init__(allspice_client) 2295 2296 def __eq__(self, other): 2297 if not isinstance(other, Team): 2298 return False 2299 return self.organization == other.organization and self.id == other.id 2300 2301 def __hash__(self): 2302 return hash(self.organization) ^ hash(self.id) 2303 2304 _fields_to_parsers: ClassVar[dict] = { 2305 "organization": lambda allspice_client, o: Organization.parse_response(allspice_client, o) 2306 } 2307 2308 _patchable_fields: ClassVar[set[str]] = { 2309 "can_create_org_repo", 2310 "description", 2311 "includes_all_repositories", 2312 "name", 2313 "permission", 2314 "units", 2315 "units_map", 2316 } 2317 2318 @classmethod 2319 def request(cls, allspice_client, id: int): 2320 return cls._request(allspice_client, {"id": id}) 2321 2322 def commit(self): 2323 args = {"id": self.id} 2324 self._commit(args) 2325 2326 def add_user(self, user: User): 2327 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2328 url = f"/teams/{self.id}/members/{user.login}" 2329 self.allspice_client.requests_put(url) 2330 2331 def add_repo(self, org: Organization, repo: Union[Repository, str]): 2332 if isinstance(repo, Repository): 2333 repo_name = repo.name 2334 else: 2335 repo_name = repo 2336 self.allspice_client.requests_put(Team.ADD_REPO % (self.id, org.username, repo_name)) 2337 2338 def get_members(self): 2339 """Get all users assigned to the team.""" 2340 results = self.allspice_client.requests_get_paginated( 2341 Team.GET_MEMBERS % self.id, 2342 ) 2343 return [User.parse_response(self.allspice_client, result) for result in results] 2344 2345 def get_repos(self): 2346 """Get all repos of this Team.""" 2347 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2348 return [Repository.parse_response(self.allspice_client, result) for result in results] 2349 2350 def delete(self): 2351 self.allspice_client.requests_delete(Team.TEAM_DELETE % self.id) 2352 self.deleted = True 2353 2354 def remove_team_member(self, user_name: str): 2355 url = f"/teams/{self.id}/members/{user_name}" 2356 self.allspice_client.requests_delete(url)
2326 def add_user(self, user: User): 2327 """https://hub.allspice.io/api/swagger#/organization/orgAddTeamMember""" 2328 url = f"/teams/{self.id}/members/{user.login}" 2329 self.allspice_client.requests_put(url)
allspice.allspice.io/api/swagger#/organization/orgAddTeamMember">https://huballspice.allspice.io/api/swagger#/organization/orgAddTeamMember
2338 def get_members(self): 2339 """Get all users assigned to the team.""" 2340 results = self.allspice_client.requests_get_paginated( 2341 Team.GET_MEMBERS % self.id, 2342 ) 2343 return [User.parse_response(self.allspice_client, result) for result in results]
Get all users assigned to the team.
2345 def get_repos(self): 2346 """Get all repos of this Team.""" 2347 results = self.allspice_client.requests_get(Team.GET_REPOS % self.id) 2348 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all repos of this Team.
241class User(ApiObject): 242 active: bool 243 admin: Any 244 allow_create_organization: Any 245 allow_git_hook: Any 246 allow_import_local: Any 247 avatar_url: str 248 created: str 249 description: str 250 email: str 251 emails: List[Any] 252 followers_count: int 253 following_count: int 254 full_name: str 255 html_url: str 256 id: int 257 is_admin: bool 258 language: str 259 last_login: str 260 location: str 261 login: str 262 login_name: str 263 max_repo_creation: Any 264 must_change_password: Any 265 password: Any 266 prohibit_login: bool 267 restricted: bool 268 source_id: int 269 starred_repos_count: int 270 username: str 271 visibility: str 272 website: str 273 274 API_OBJECT = """/users/{name}""" # <org> 275 USER_MAIL = """/user/emails?sudo=%s""" # <name> 276 USER_PATCH = """/admin/users/%s""" # <username> 277 ADMIN_DELETE_USER = """/admin/users/%s""" # <username> 278 ADMIN_EDIT_USER = """/admin/users/{username}""" # <username> 279 USER_HEATMAP = """/users/%s/heatmap""" # <username> 280 281 def __init__(self, allspice_client): 282 super().__init__(allspice_client) 283 self._emails = [] 284 285 def __eq__(self, other): 286 if not isinstance(other, User): 287 return False 288 return self.allspice_client == other.allspice_client and self.id == other.id 289 290 def __hash__(self): 291 return hash(self.allspice_client) ^ hash(self.id) 292 293 @property 294 def emails(self): 295 self.__request_emails() 296 return self._emails 297 298 @classmethod 299 def request(cls, allspice_client, name: str) -> "User": 300 api_object = cls._request(allspice_client, {"name": name}) 301 return api_object 302 303 _patchable_fields: ClassVar[set[str]] = { 304 "active", 305 "admin", 306 "allow_create_organization", 307 "allow_git_hook", 308 "allow_import_local", 309 "email", 310 "full_name", 311 "location", 312 "login_name", 313 "max_repo_creation", 314 "must_change_password", 315 "password", 316 "prohibit_login", 317 "website", 318 } 319 320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {} 335 336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result) 374 375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results] 380 381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results] 386 387 def get_teams(self) -> List["Team"]: 388 url = "/user/teams" 389 results = self.allspice_client.requests_get_paginated(url, sudo=self) 390 return [Team.parse_response(self.allspice_client, result) for result in results] 391 392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results] 396 397 def __request_emails(self): 398 result = self.allspice_client.requests_get(User.USER_MAIL % self.login) 399 # report if the adress changed by this 400 for mail in result: 401 self._emails.append(mail["email"]) 402 if mail["primary"]: 403 self._email = mail["email"] 404 405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True 409 410 def get_heatmap(self) -> List[Tuple[datetime, int]]: 411 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 412 results = [ 413 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 414 for result in results 415 ] 416 return results
320 def commit(self, login_name: str, source_id: int = 0): 321 """ 322 Unfortunately it is necessary to require the login name 323 as well as the login source (that is not supplied when getting a user) for 324 changing a user. 325 Usually source_id is 0 and the login_name is equal to the username. 326 """ 327 values = self.get_dirty_fields() 328 values.update( 329 # api-doc says that the "source_id" is necessary; works without though 330 {"login_name": login_name, "source_id": source_id} 331 ) 332 args = {"username": self.username} 333 self.allspice_client.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) 334 self._dirty_fields = {}
Unfortunately it is necessary to require the login name as well as the login source (that is not supplied when getting a user) for changing a user. Usually source_id is 0 and the login_name is equal to the username.
336 def create_repo( 337 self, 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a user Repository 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 """ 354 result = self.allspice_client.requests_post( 355 "/user/repos", 356 data={ 357 "name": repoName, 358 "description": description, 359 "private": private, 360 "auto_init": autoInit, 361 "gitignores": gitignores, 362 "license": license, 363 "issue_labels": issue_labels, 364 "readme": readme, 365 "default_branch": default_branch, 366 }, 367 ) 368 if "id" in result: 369 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 370 else: 371 self.allspice_client.logger.error(result["message"]) 372 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 373 return Repository.parse_response(self.allspice_client, result)
Create a user Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
375 def get_repositories(self) -> List["Repository"]: 376 """Get all Repositories owned by this User.""" 377 url = f"/users/{self.username}/repos" 378 results = self.allspice_client.requests_get_paginated(url) 379 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories owned by this User.
381 def get_orgs(self) -> List[Organization]: 382 """Get all Organizations this user is a member of.""" 383 url = f"/users/{self.username}/orgs" 384 results = self.allspice_client.requests_get_paginated(url) 385 return [Organization.parse_response(self.allspice_client, result) for result in results]
Get all Organizations this user is a member of.
392 def get_accessible_repos(self) -> List["Repository"]: 393 """Get all Repositories accessible by the logged in User.""" 394 results = self.allspice_client.requests_get("/user/repos", sudo=self) 395 return [Repository.parse_response(self.allspice_client, result) for result in results]
Get all Repositories accessible by the logged in User.
405 def delete(self): 406 """Deletes this User. Also deletes all Repositories he owns.""" 407 self.allspice_client.requests_delete(User.ADMIN_DELETE_USER % self.username) 408 self.deleted = True
Deletes this User. Also deletes all Repositories he owns.