allspice
A very simple API client for AllSpice Hub
Note that not the full Swagger-API is accessible. The whole implementation is focused on making access and working with Organizations, Teams, Repositories and Users as pain free as possible.
Forked from https://github.com/Langenfeld/py-gitea.
Usage
Docs
See the documentation site.
Examples
Check the examples directory for full, working example scripts that you can adapt or refer to for your own needs.
Quickstart
First get an allspice_client
object wrapping access and authentication (via an api token) for your instance of AllSpice Hub.
from allspice import *
# By default, points to hub.allspice.io.
allspice_client = AllSpice(token_text=TOKEN)
# If you are self-hosting:
allspice_client = AllSpice(allspice_hub_url=URL, token_text=TOKEN)
Operations like requesting the AllSpice version or authentication user can be requested directly from the allspice_client
object:
print("AllSpice Version: " + allspice_client.get_version())
print("API-Token belongs to user: " + allspice_client.get_user().username)
Adding entities like Users, Organizations, ... also is done via the allspice_client object.
user = allspice_client.create_user("Test Testson", "test@test.test", "password")
All operations on entities in allspice are then accomplished via the according wrapper objects for those entities.
Each of those objects has a .request
method that creates an entity according to your allspice_client instance.
other_user = User.request(allspice_client, "OtherUserName")
print(other_user.username)
Note that the fields of the User, Organization,... classes are dynamically created at runtime, and thus not visible during divelopment. Refer to the AllSpice API documentation for the fields names.
Fields that can not be altered via allspice-api, are read only. After altering a field, the .commit
method of the according object must be called to synchronize the changed fields with your allspice_client instance.
org = Organization.request(allspice_client, test_org)
org.description = "some new description"
org.location = "some new location"
org.commit()
An entity in allspice can be deleted by calling delete.
org.delete()
All entity objects do have methods to execute some of the requests possible though the AllSpice api:
org = Organization.request(allspice_client, ORGNAME)
teams = org.get_teams()
for team in teams:
repos = team.get_repos()
for repo in repos:
print(repo.name)
1""" 2.. include:: ../README.md 3 :start-line: 1 4 :end-before: Installation 5""" 6 7from .allspice import ( 8 AllSpice, 9) 10from .apiobject import ( 11 Branch, 12 Comment, 13 Commit, 14 Content, 15 DesignReview, 16 Issue, 17 Milestone, 18 Organization, 19 Release, 20 Repository, 21 Team, 22 User, 23) 24from .exceptions import AlreadyExistsException, NotFoundException 25 26__version__ = "3.7.0" 27 28__all__ = [ 29 "AllSpice", 30 "AlreadyExistsException", 31 "Branch", 32 "Comment", 33 "Commit", 34 "Content", 35 "DesignReview", 36 "Issue", 37 "Milestone", 38 "NotFoundException", 39 "Organization", 40 "Release", 41 "Repository", 42 "Team", 43 "User", 44]
20class AllSpice: 21 """Object to establish a session with AllSpice Hub.""" 22 23 ADMIN_CREATE_USER = """/admin/users""" 24 GET_USERS_ADMIN = """/admin/users""" 25 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 26 ALLSPICE_HUB_VERSION = """/version""" 27 GET_USER = """/user""" 28 GET_REPOSITORY = """/repos/{owner}/{name}""" 29 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 30 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 31 32 def __init__( 33 self, 34 allspice_hub_url="https://hub.allspice.io", 35 token_text=None, 36 auth=None, 37 verify=True, 38 log_level="INFO", 39 ratelimiting=(100, 60), 40 ): 41 """Initializing an instance of the AllSpice Hub Client 42 43 Args: 44 allspice_hub_url (str): The URL for the AllSpice Hub instance. 45 Defaults to `https://hub.allspice.io`. 46 47 token_text (str, None): The access token, by default None. 48 49 auth (tuple, None): The user credentials 50 `(username, password)`, by default None. 51 52 verify (bool): If True, allow insecure server connections 53 when using SSL. 54 55 log_level (str): The log level, by default `INFO`. 56 57 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 58 If None, no rate limiting is applied. By default, 100 calls 59 per minute are allowed. 60 """ 61 62 self.logger = logging.getLogger(__name__) 63 self.logger.setLevel(log_level) 64 self.headers = { 65 "Content-type": "application/json", 66 } 67 self.url = allspice_hub_url 68 69 if ratelimiting is None: 70 self.requests = requests.Session() 71 else: 72 (max_calls, period) = ratelimiting 73 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 74 75 # Manage authentification 76 if not token_text and not auth: 77 raise ValueError("Please provide auth or token_text, but not both") 78 if token_text: 79 self.headers["Authorization"] = "token " + token_text 80 if auth: 81 self.logger.warning( 82 "Using basic auth is not recommended. Prefer using a token instead." 83 ) 84 self.requests.auth = auth 85 86 # Manage SSL certification verification 87 self.requests.verify = verify 88 if not verify: 89 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 90 91 def __get_url(self, endpoint): 92 url = self.url + "/api/v1" + endpoint 93 self.logger.debug("Url: %s" % url) 94 return url 95 96 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 97 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 98 if request.status_code not in [200, 201]: 99 message = f"Received status code: {request.status_code} ({request.url})" 100 if request.status_code in [404]: 101 raise NotFoundException(message) 102 if request.status_code in [403]: 103 raise Exception( 104 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 105 ) 106 if request.status_code in [409]: 107 raise ConflictException(message) 108 if request.status_code in [503]: 109 raise NotYetGeneratedException(message) 110 raise Exception(message) 111 return request 112 113 @staticmethod 114 def parse_result(result) -> Dict: 115 """Parses the result-JSON to a dict.""" 116 if result.text and len(result.text) > 3: 117 return json.loads(result.text) 118 return {} 119 120 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 121 combined_params = {} 122 combined_params.update(params) 123 if sudo: 124 combined_params["sudo"] = sudo.username 125 return self.parse_result(self.__get(endpoint, combined_params)) 126 127 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 128 combined_params = {} 129 combined_params.update(params) 130 if sudo: 131 combined_params["sudo"] = sudo.username 132 return self.__get(endpoint, combined_params).content 133 134 def requests_get_paginated( 135 self, 136 endpoint: str, 137 params=frozendict(), 138 sudo=None, 139 page_key: str = "page", 140 first_page: int = 1, 141 ): 142 page = first_page 143 combined_params = {} 144 combined_params.update(params) 145 aggregated_result = [] 146 while True: 147 combined_params[page_key] = page 148 result = self.requests_get(endpoint, combined_params, sudo) 149 150 if not result: 151 return aggregated_result 152 153 if isinstance(result, dict): 154 if "data" in result: 155 data = result["data"] 156 if len(data) == 0: 157 return aggregated_result 158 aggregated_result.extend(data) 159 elif "tree" in result: 160 data = result["tree"] 161 if data is None or len(data) == 0: 162 return aggregated_result 163 aggregated_result.extend(data) 164 else: 165 raise NotImplementedError( 166 "requests_get_paginated does not know how to handle responses of this type." 167 ) 168 else: 169 aggregated_result.extend(result) 170 171 page += 1 172 173 def requests_put(self, endpoint: str, data: Optional[dict] = None): 174 if not data: 175 data = {} 176 request = self.requests.put( 177 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 178 ) 179 if request.status_code not in [200, 204]: 180 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 181 self.logger.error(message) 182 raise Exception(message) 183 184 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 185 request = self.requests.delete( 186 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 187 ) 188 if request.status_code not in [200, 204]: 189 message = f"Received status code: {request.status_code} ({request.url})" 190 self.logger.error(message) 191 raise Exception(message) 192 193 def requests_post( 194 self, 195 endpoint: str, 196 data: Optional[dict] = None, 197 params: Optional[dict] = None, 198 files: Optional[dict] = None, 199 ): 200 """ 201 Make a POST call to the endpoint. 202 203 :param endpoint: The path to the endpoint 204 :param data: A dictionary for JSON data 205 :param params: A dictionary of query params 206 :param files: A dictionary of files, see requests.post. Using both files and data 207 can lead to unexpected results! 208 :return: The JSON response parsed as a dict 209 """ 210 211 # This should ideally be a TypedDict of the type of arguments taken by 212 # `requests.post`. 213 args: dict[str, Any] = { 214 "headers": self.headers.copy(), 215 } 216 if data is not None: 217 args["data"] = json.dumps(data) 218 if params is not None: 219 args["params"] = params 220 if files is not None: 221 args["headers"].pop("Content-type") 222 args["files"] = files 223 224 request = self.requests.post(self.__get_url(endpoint), **args) 225 226 if request.status_code not in [200, 201, 202]: 227 if "already exists" in request.text or "e-mail already in use" in request.text: 228 self.logger.warning(request.text) 229 raise AlreadyExistsException() 230 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 231 self.logger.error(f"With info: {data} ({self.headers})") 232 self.logger.error(f"Answer: {request.text}") 233 raise Exception( 234 f"Received status code: {request.status_code} ({request.url}), {request.text}" 235 ) 236 return self.parse_result(request) 237 238 def requests_patch(self, endpoint: str, data: dict): 239 request = self.requests.patch( 240 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 241 ) 242 if request.status_code not in [200, 201]: 243 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 244 self.logger.error(error_message) 245 raise Exception(error_message) 246 return self.parse_result(request) 247 248 def get_orgs_public_members_all(self, orgname): 249 path = "/orgs/" + orgname + "/public_members" 250 return self.requests_get(path) 251 252 def get_orgs(self): 253 path = "/admin/orgs" 254 results = self.requests_get(path) 255 return [Organization.parse_response(self, result) for result in results] 256 257 def get_user(self): 258 result = self.requests_get(AllSpice.GET_USER) 259 return User.parse_response(self, result) 260 261 def get_version(self) -> str: 262 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 263 return result["version"] 264 265 def get_users(self) -> List[User]: 266 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 267 return [User.parse_response(self, result) for result in results] 268 269 def get_user_by_email(self, email: str) -> Optional[User]: 270 users = self.get_users() 271 for user in users: 272 if user.email == email or email in user.emails: 273 return user 274 return None 275 276 def get_user_by_name(self, username: str) -> Optional[User]: 277 users = self.get_users() 278 for user in users: 279 if user.username == username: 280 return user 281 return None 282 283 def get_repository(self, owner: str, name: str) -> Repository: 284 path = self.GET_REPOSITORY.format(owner=owner, name=name) 285 result = self.requests_get(path) 286 return Repository.parse_response(self, result) 287 288 def create_user( 289 self, 290 user_name: str, 291 email: str, 292 password: str, 293 full_name: Optional[str] = None, 294 login_name: Optional[str] = None, 295 change_pw=True, 296 send_notify=True, 297 source_id=0, 298 ): 299 """Create User. 300 Throws: 301 AlreadyExistsException, if the User exists already 302 Exception, if something else went wrong. 303 """ 304 if not login_name: 305 login_name = user_name 306 if not full_name: 307 full_name = user_name 308 request_data = { 309 "source_id": source_id, 310 "login_name": login_name, 311 "full_name": full_name, 312 "username": user_name, 313 "email": email, 314 "password": password, 315 "send_notify": send_notify, 316 "must_change_password": change_pw, 317 } 318 319 self.logger.debug("Gitea post payload: %s", request_data) 320 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 321 if "id" in result: 322 self.logger.info( 323 "Successfully created User %s <%s> (id %s)", 324 result["login"], 325 result["email"], 326 result["id"], 327 ) 328 self.logger.debug("Gitea response: %s", result) 329 else: 330 self.logger.error(result["message"]) 331 raise Exception("User not created... (gitea: %s)" % result["message"]) 332 user = User.parse_response(self, result) 333 return user 334 335 def create_repo( 336 self, 337 repoOwner: Union[User, Organization], 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a Repository as the administrator 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 354 Note: 355 Non-admin users can not use this method. Please use instead 356 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 357 """ 358 # although this only says user in the api, this also works for 359 # organizations 360 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 361 result = self.requests_post( 362 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 363 data={ 364 "name": repoName, 365 "description": description, 366 "private": private, 367 "auto_init": autoInit, 368 "gitignores": gitignores, 369 "license": license, 370 "issue_labels": issue_labels, 371 "readme": readme, 372 "default_branch": default_branch, 373 }, 374 ) 375 if "id" in result: 376 self.logger.info("Successfully created Repository %s " % result["name"]) 377 else: 378 self.logger.error(result["message"]) 379 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 380 return Repository.parse_response(self, result) 381 382 def create_org( 383 self, 384 owner: User, 385 orgName: str, 386 description: str, 387 location="", 388 website="", 389 full_name="", 390 ): 391 assert isinstance(owner, User) 392 result = self.requests_post( 393 AllSpice.CREATE_ORG % owner.username, 394 data={ 395 "username": orgName, 396 "description": description, 397 "location": location, 398 "website": website, 399 "full_name": full_name, 400 }, 401 ) 402 if "id" in result: 403 self.logger.info("Successfully created Organization %s" % result["username"]) 404 else: 405 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 406 self.logger.error(result["message"]) 407 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 408 return Organization.parse_response(self, result) 409 410 def create_team( 411 self, 412 org: Organization, 413 name: str, 414 description: str = "", 415 permission: str = "read", 416 can_create_org_repo: bool = False, 417 includes_all_repositories: bool = False, 418 units=( 419 "repo.code", 420 "repo.issues", 421 "repo.ext_issues", 422 "repo.wiki", 423 "repo.pulls", 424 "repo.releases", 425 "repo.ext_wiki", 426 ), 427 units_map={}, 428 ): 429 """Creates a Team. 430 431 Args: 432 org (Organization): Organization the Team will be part of. 433 name (str): The Name of the Team to be created. 434 description (str): Optional, None, short description of the new Team. 435 permission (str): Optional, 'read', What permissions the members 436 units_map (dict): Optional, {}, a mapping of units to their 437 permissions. If None or empty, the `permission` permission will 438 be applied to all units. Note: When both `units` and `units_map` 439 are given, `units_map` will be preferred. 440 """ 441 442 result = self.requests_post( 443 AllSpice.CREATE_TEAM % org.username, 444 data={ 445 "name": name, 446 "description": description, 447 "permission": permission, 448 "can_create_org_repo": can_create_org_repo, 449 "includes_all_repositories": includes_all_repositories, 450 "units": units, 451 "units_map": units_map, 452 }, 453 ) 454 455 if "id" in result: 456 self.logger.info("Successfully created Team %s" % result["name"]) 457 else: 458 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 459 self.logger.error(result["message"]) 460 raise Exception("Team not created... (gitea: %s)" % result["message"]) 461 api_object = Team.parse_response(self, result) 462 setattr( 463 api_object, "_organization", org 464 ) # fixes strange behaviour of gitea not returning a valid organization here. 465 return api_object
Object to establish a session with AllSpice Hub.
32 def __init__( 33 self, 34 allspice_hub_url="https://hub.allspice.io", 35 token_text=None, 36 auth=None, 37 verify=True, 38 log_level="INFO", 39 ratelimiting=(100, 60), 40 ): 41 """Initializing an instance of the AllSpice Hub Client 42 43 Args: 44 allspice_hub_url (str): The URL for the AllSpice Hub instance. 45 Defaults to `https://hub.allspice.io`. 46 47 token_text (str, None): The access token, by default None. 48 49 auth (tuple, None): The user credentials 50 `(username, password)`, by default None. 51 52 verify (bool): If True, allow insecure server connections 53 when using SSL. 54 55 log_level (str): The log level, by default `INFO`. 56 57 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 58 If None, no rate limiting is applied. By default, 100 calls 59 per minute are allowed. 60 """ 61 62 self.logger = logging.getLogger(__name__) 63 self.logger.setLevel(log_level) 64 self.headers = { 65 "Content-type": "application/json", 66 } 67 self.url = allspice_hub_url 68 69 if ratelimiting is None: 70 self.requests = requests.Session() 71 else: 72 (max_calls, period) = ratelimiting 73 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 74 75 # Manage authentification 76 if not token_text and not auth: 77 raise ValueError("Please provide auth or token_text, but not both") 78 if token_text: 79 self.headers["Authorization"] = "token " + token_text 80 if auth: 81 self.logger.warning( 82 "Using basic auth is not recommended. Prefer using a token instead." 83 ) 84 self.requests.auth = auth 85 86 # Manage SSL certification verification 87 self.requests.verify = verify 88 if not verify: 89 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, allow insecure server connections
when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
113 @staticmethod 114 def parse_result(result) -> Dict: 115 """Parses the result-JSON to a dict.""" 116 if result.text and len(result.text) > 3: 117 return json.loads(result.text) 118 return {}
Parses the result-JSON to a dict.
134 def requests_get_paginated( 135 self, 136 endpoint: str, 137 params=frozendict(), 138 sudo=None, 139 page_key: str = "page", 140 first_page: int = 1, 141 ): 142 page = first_page 143 combined_params = {} 144 combined_params.update(params) 145 aggregated_result = [] 146 while True: 147 combined_params[page_key] = page 148 result = self.requests_get(endpoint, combined_params, sudo) 149 150 if not result: 151 return aggregated_result 152 153 if isinstance(result, dict): 154 if "data" in result: 155 data = result["data"] 156 if len(data) == 0: 157 return aggregated_result 158 aggregated_result.extend(data) 159 elif "tree" in result: 160 data = result["tree"] 161 if data is None or len(data) == 0: 162 return aggregated_result 163 aggregated_result.extend(data) 164 else: 165 raise NotImplementedError( 166 "requests_get_paginated does not know how to handle responses of this type." 167 ) 168 else: 169 aggregated_result.extend(result) 170 171 page += 1
173 def requests_put(self, endpoint: str, data: Optional[dict] = None): 174 if not data: 175 data = {} 176 request = self.requests.put( 177 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 178 ) 179 if request.status_code not in [200, 204]: 180 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 181 self.logger.error(message) 182 raise Exception(message)
184 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 185 request = self.requests.delete( 186 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 187 ) 188 if request.status_code not in [200, 204]: 189 message = f"Received status code: {request.status_code} ({request.url})" 190 self.logger.error(message) 191 raise Exception(message)
193 def requests_post( 194 self, 195 endpoint: str, 196 data: Optional[dict] = None, 197 params: Optional[dict] = None, 198 files: Optional[dict] = None, 199 ): 200 """ 201 Make a POST call to the endpoint. 202 203 :param endpoint: The path to the endpoint 204 :param data: A dictionary for JSON data 205 :param params: A dictionary of query params 206 :param files: A dictionary of files, see requests.post. Using both files and data 207 can lead to unexpected results! 208 :return: The JSON response parsed as a dict 209 """ 210 211 # This should ideally be a TypedDict of the type of arguments taken by 212 # `requests.post`. 213 args: dict[str, Any] = { 214 "headers": self.headers.copy(), 215 } 216 if data is not None: 217 args["data"] = json.dumps(data) 218 if params is not None: 219 args["params"] = params 220 if files is not None: 221 args["headers"].pop("Content-type") 222 args["files"] = files 223 224 request = self.requests.post(self.__get_url(endpoint), **args) 225 226 if request.status_code not in [200, 201, 202]: 227 if "already exists" in request.text or "e-mail already in use" in request.text: 228 self.logger.warning(request.text) 229 raise AlreadyExistsException() 230 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 231 self.logger.error(f"With info: {data} ({self.headers})") 232 self.logger.error(f"Answer: {request.text}") 233 raise Exception( 234 f"Received status code: {request.status_code} ({request.url}), {request.text}" 235 ) 236 return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
238 def requests_patch(self, endpoint: str, data: dict): 239 request = self.requests.patch( 240 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 241 ) 242 if request.status_code not in [200, 201]: 243 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 244 self.logger.error(error_message) 245 raise Exception(error_message) 246 return self.parse_result(request)
288 def create_user( 289 self, 290 user_name: str, 291 email: str, 292 password: str, 293 full_name: Optional[str] = None, 294 login_name: Optional[str] = None, 295 change_pw=True, 296 send_notify=True, 297 source_id=0, 298 ): 299 """Create User. 300 Throws: 301 AlreadyExistsException, if the User exists already 302 Exception, if something else went wrong. 303 """ 304 if not login_name: 305 login_name = user_name 306 if not full_name: 307 full_name = user_name 308 request_data = { 309 "source_id": source_id, 310 "login_name": login_name, 311 "full_name": full_name, 312 "username": user_name, 313 "email": email, 314 "password": password, 315 "send_notify": send_notify, 316 "must_change_password": change_pw, 317 } 318 319 self.logger.debug("Gitea post payload: %s", request_data) 320 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 321 if "id" in result: 322 self.logger.info( 323 "Successfully created User %s <%s> (id %s)", 324 result["login"], 325 result["email"], 326 result["id"], 327 ) 328 self.logger.debug("Gitea response: %s", result) 329 else: 330 self.logger.error(result["message"]) 331 raise Exception("User not created... (gitea: %s)" % result["message"]) 332 user = User.parse_response(self, result) 333 return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
335 def create_repo( 336 self, 337 repoOwner: Union[User, Organization], 338 repoName: str, 339 description: str = "", 340 private: bool = False, 341 autoInit=True, 342 gitignores: Optional[str] = None, 343 license: Optional[str] = None, 344 readme: str = "Default", 345 issue_labels: Optional[str] = None, 346 default_branch="master", 347 ): 348 """Create a Repository as the administrator 349 350 Throws: 351 AlreadyExistsException: If the Repository exists already. 352 Exception: If something else went wrong. 353 354 Note: 355 Non-admin users can not use this method. Please use instead 356 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 357 """ 358 # although this only says user in the api, this also works for 359 # organizations 360 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 361 result = self.requests_post( 362 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 363 data={ 364 "name": repoName, 365 "description": description, 366 "private": private, 367 "auto_init": autoInit, 368 "gitignores": gitignores, 369 "license": license, 370 "issue_labels": issue_labels, 371 "readme": readme, 372 "default_branch": default_branch, 373 }, 374 ) 375 if "id" in result: 376 self.logger.info("Successfully created Repository %s " % result["name"]) 377 else: 378 self.logger.error(result["message"]) 379 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 380 return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
382 def create_org( 383 self, 384 owner: User, 385 orgName: str, 386 description: str, 387 location="", 388 website="", 389 full_name="", 390 ): 391 assert isinstance(owner, User) 392 result = self.requests_post( 393 AllSpice.CREATE_ORG % owner.username, 394 data={ 395 "username": orgName, 396 "description": description, 397 "location": location, 398 "website": website, 399 "full_name": full_name, 400 }, 401 ) 402 if "id" in result: 403 self.logger.info("Successfully created Organization %s" % result["username"]) 404 else: 405 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 406 self.logger.error(result["message"]) 407 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 408 return Organization.parse_response(self, result)
410 def create_team( 411 self, 412 org: Organization, 413 name: str, 414 description: str = "", 415 permission: str = "read", 416 can_create_org_repo: bool = False, 417 includes_all_repositories: bool = False, 418 units=( 419 "repo.code", 420 "repo.issues", 421 "repo.ext_issues", 422 "repo.wiki", 423 "repo.pulls", 424 "repo.releases", 425 "repo.ext_wiki", 426 ), 427 units_map={}, 428 ): 429 """Creates a Team. 430 431 Args: 432 org (Organization): Organization the Team will be part of. 433 name (str): The Name of the Team to be created. 434 description (str): Optional, None, short description of the new Team. 435 permission (str): Optional, 'read', What permissions the members 436 units_map (dict): Optional, {}, a mapping of units to their 437 permissions. If None or empty, the `permission` permission will 438 be applied to all units. Note: When both `units` and `units_map` 439 are given, `units_map` will be preferred. 440 """ 441 442 result = self.requests_post( 443 AllSpice.CREATE_TEAM % org.username, 444 data={ 445 "name": name, 446 "description": description, 447 "permission": permission, 448 "can_create_org_repo": can_create_org_repo, 449 "includes_all_repositories": includes_all_repositories, 450 "units": units, 451 "units_map": units_map, 452 }, 453 ) 454 455 if "id" in result: 456 self.logger.info("Successfully created Team %s" % result["name"]) 457 else: 458 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 459 self.logger.error(result["message"]) 460 raise Exception("Team not created... (gitea: %s)" % result["message"]) 461 api_object = Team.parse_response(self, result) 462 setattr( 463 api_object, "_organization", org 464 ) # fixes strange behaviour of gitea not returning a valid organization here. 465 return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.
Common base class for all non-exit exceptions.
419class Branch(ReadonlyApiObject): 420 commit: Dict[str, Optional[Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]]] 421 effective_branch_protection_name: str 422 enable_status_check: bool 423 name: str 424 protected: bool 425 required_approvals: int 426 status_check_contexts: List[Any] 427 user_can_merge: bool 428 user_can_push: bool 429 430 API_OBJECT = """/repos/{owner}/{repo}/branches/{branch}""" 431 432 def __init__(self, allspice_client): 433 super().__init__(allspice_client) 434 435 def __eq__(self, other): 436 if not isinstance(other, Branch): 437 return False 438 return self.commit == other.commit and self.name == other.name 439 440 def __hash__(self): 441 return hash(self.commit["id"]) ^ hash(self.name) 442 443 _fields_to_parsers: ClassVar[dict] = { 444 # This is not a commit object 445 # "commit": lambda allspice_client, c: Commit.parse_response(allspice_client, c) 446 } 447 448 @classmethod 449 def request(cls, allspice_client, owner: str, repo: str, branch: str): 450 return cls._request(allspice_client, {"owner": owner, "repo": repo, "branch": branch})
Inherited Members
1565class Comment(ApiObject): 1566 assets: List[Union[Any, Dict[str, Union[int, str]]]] 1567 body: str 1568 created_at: datetime 1569 html_url: str 1570 id: int 1571 issue_url: str 1572 original_author: str 1573 original_author_id: int 1574 pull_request_url: str 1575 updated_at: datetime 1576 user: User 1577 1578 API_OBJECT = """/repos/{owner}/{repo}/issues/comments/{id}""" 1579 GET_ATTACHMENTS_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets""" 1580 ATTACHMENT_PATH = """/repos/{owner}/{repo}/issues/comments/{id}/assets/{attachment_id}""" 1581 1582 def __init__(self, allspice_client): 1583 super().__init__(allspice_client) 1584 1585 def __eq__(self, other): 1586 if not isinstance(other, Comment): 1587 return False 1588 return self.repository == other.repository and self.id == other.id 1589 1590 def __hash__(self): 1591 return hash(self.repository) ^ hash(self.id) 1592 1593 @classmethod 1594 def request(cls, allspice_client, owner: str, repo: str, id: str) -> "Comment": 1595 return cls._request(allspice_client, {"owner": owner, "repo": repo, "id": id}) 1596 1597 _fields_to_parsers: ClassVar[dict] = { 1598 "user": lambda allspice_client, r: User.parse_response(allspice_client, r), 1599 "created_at": lambda _, t: Util.convert_time(t), 1600 "updated_at": lambda _, t: Util.convert_time(t), 1601 } 1602 1603 _patchable_fields: ClassVar[set[str]] = {"body"} 1604 1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url 1613 1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name) 1620 1621 def __fields_for_path(self): 1622 return { 1623 "owner": self.repository.owner.username, 1624 "repo": self.repository.name, 1625 "id": self.id, 1626 } 1627 1628 def commit(self): 1629 self._commit(self.__fields_for_path()) 1630 1631 def delete(self): 1632 self.allspice_client.requests_delete(self.API_OBJECT.format(**self.__fields_for_path())) 1633 self.deleted = True 1634 1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results] 1647 1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result) 1671 1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result) 1693 1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True
1605 @property 1606 def parent_url(self) -> str: 1607 """URL of the parent of this comment (the issue or the pull request)""" 1608 1609 if self.issue_url is not None and self.issue_url != "": 1610 return self.issue_url 1611 else: 1612 return self.pull_request_url
URL of the parent of this comment (the issue or the pull request)
1614 @cached_property 1615 def repository(self) -> Repository: 1616 """The repository this comment was posted on.""" 1617 1618 owner_name, repo_name = self.parent_url.split("/")[-4:-2] 1619 return Repository.request(self.allspice_client, owner_name, repo_name)
The repository this comment was posted on.
1635 def get_attachments(self) -> List[Attachment]: 1636 """ 1637 Get all attachments on this comment. This returns Attachment objects, which 1638 contain a link to download the attachment. 1639 1640 https://hub.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments 1641 """ 1642 1643 results = self.allspice_client.requests_get( 1644 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()) 1645 ) 1646 return [Attachment.parse_response(self.allspice_client, result) for result in results]
Get all attachments on this comment. This returns Attachment objects, which contain a link to download the attachment.
allspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments">https://huballspice.allspice.io/api/swagger#/issue/issueListIssueCommentAttachments
1648 def create_attachment(self, file: IO, name: Optional[str] = None) -> Attachment: 1649 """ 1650 Create an attachment on this comment. 1651 1652 https://hub.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment 1653 1654 :param file: The file to attach. This should be a file-like object. 1655 :param name: The name of the file. If not provided, the name of the file will be 1656 used. 1657 :return: The created attachment. 1658 """ 1659 1660 args: dict[str, Any] = { 1661 "files": {"attachment": file}, 1662 } 1663 if name is not None: 1664 args["params"] = {"name": name} 1665 1666 result = self.allspice_client.requests_post( 1667 self.GET_ATTACHMENTS_PATH.format(**self.__fields_for_path()), 1668 **args, 1669 ) 1670 return Attachment.parse_response(self.allspice_client, result)
Create an attachment on this comment.
allspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateIssueCommentAttachment
Parameters
- file: The file to attach. This should be a file-like object.
- name: The name of the file. If not provided, the name of the file will be used.
Returns
The created attachment.
1672 def edit_attachment(self, attachment: Attachment, data: dict) -> Attachment: 1673 """ 1674 Edit an attachment. 1675 1676 The list of params that can be edited is available at 1677 https://hub.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment 1678 1679 :param attachment: The attachment to be edited 1680 :param data: The data parameter should be a dictionary of the fields to edit. 1681 :return: The edited attachment 1682 """ 1683 1684 args = { 1685 **self.__fields_for_path(), 1686 "attachment_id": attachment.id, 1687 } 1688 result = self.allspice_client.requests_patch( 1689 self.ATTACHMENT_PATH.format(**args), 1690 data=data, 1691 ) 1692 return Attachment.parse_response(self.allspice_client, result)
Edit an attachment.
The list of params that can be edited is available at allspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueEditIssueCommentAttachment
Parameters
- attachment: The attachment to be edited
- data: The data parameter should be a dictionary of the fields to edit.
Returns
The edited attachment
1694 def delete_attachment(self, attachment: Attachment): 1695 """https://hub.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment""" 1696 1697 args = { 1698 **self.__fields_for_path(), 1699 "attachment_id": attachment.id, 1700 } 1701 self.allspice_client.requests_delete(self.ATTACHMENT_PATH.format(**args)) 1702 attachment.deleted = True
allspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment">https://huballspice.allspice.io/api/swagger#/issue/issueDeleteIssueCommentAttachment
1705class Commit(ReadonlyApiObject): 1706 author: User 1707 commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1708 committer: Dict[str, Union[int, str, bool]] 1709 created: str 1710 files: List[Dict[str, str]] 1711 html_url: str 1712 inner_commit: Dict[str, Union[str, Dict[str, str], Dict[str, Optional[Union[bool, str]]]]] 1713 parents: List[Union[Dict[str, str], Any]] 1714 sha: str 1715 stats: Dict[str, int] 1716 url: str 1717 1718 API_OBJECT = """/repos/{owner}/{repo}/commits/{sha}""" 1719 COMMIT_GET_STATUS = """/repos/{owner}/{repo}/commits/{sha}/status""" 1720 COMMIT_GET_STATUSES = """/repos/{owner}/{repo}/commits/{sha}/statuses""" 1721 1722 # Regex to extract owner and repo names from the url property 1723 URL_REGEXP = re.compile(r"/repos/([^/]+)/([^/]+)/git/commits") 1724 1725 def __init__(self, allspice_client): 1726 super().__init__(allspice_client) 1727 1728 _fields_to_parsers: ClassVar[dict] = { 1729 # NOTE: api may return None for commiters that are no allspice users 1730 "author": lambda allspice_client, u: ( 1731 User.parse_response(allspice_client, u) if u else None 1732 ) 1733 } 1734 1735 def __eq__(self, other): 1736 if not isinstance(other, Commit): 1737 return False 1738 return self.sha == other.sha 1739 1740 def __hash__(self): 1741 return hash(self.sha) 1742 1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object 1751 1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result) 1766 1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results] 1778 1779 @cached_property 1780 def _fields_for_path(self) -> dict[str, str]: 1781 matches = self.URL_REGEXP.search(self.url) 1782 if not matches: 1783 raise ValueError(f"Invalid commit URL: {self.url}") 1784 1785 return { 1786 "owner": matches.group(1), 1787 "repo": matches.group(2), 1788 "sha": self.sha, 1789 }
1743 @classmethod 1744 def parse_response(cls, allspice_client, result) -> "Commit": 1745 commit_cache = result["commit"] 1746 api_object = cls(allspice_client) 1747 cls._initialize(allspice_client, api_object, result) 1748 # inner_commit for legacy reasons 1749 Commit._add_read_property("inner_commit", commit_cache, api_object) 1750 return api_object
1752 def get_status(self) -> CommitCombinedStatus: 1753 """ 1754 Get a combined status consisting of all statues on this commit. 1755 1756 Note that the returned object is a CommitCombinedStatus object, which 1757 also contains a list of all statuses on the commit. 1758 1759 https://hub.allspice.io/api/swagger#/repository/repoGetCommitStatus 1760 """ 1761 1762 result = self.allspice_client.requests_get( 1763 self.COMMIT_GET_STATUS.format(**self._fields_for_path) 1764 ) 1765 return CommitCombinedStatus.parse_response(self.allspice_client, result)
Get a combined status consisting of all statues on this commit.
Note that the returned object is a CommitCombinedStatus object, which also contains a list of all statuses on the commit.
allspice.allspice.io/api/swagger#/repository/repoGetCommitStatus">https://huballspice.allspice.io/api/swagger#/repository/repoGetCommitStatus
1767 def get_statuses(self) -> List[CommitStatus]: 1768 """ 1769 Get a list of all statuses on this commit. 1770 1771 https://hub.allspice.io/api/swagger#/repository/repoListCommitStatuses 1772 """ 1773 1774 results = self.allspice_client.requests_get( 1775 self.COMMIT_GET_STATUSES.format(**self._fields_for_path) 1776 ) 1777 return [CommitStatus.parse_response(self.allspice_client, result) for result in results]
Get a list of all statuses on this commit.
allspice.allspice.io/api/swagger#/repository/repoListCommitStatuses">https://huballspice.allspice.io/api/swagger#/repository/repoListCommitStatuses
Inherited Members
2527class Content(ReadonlyApiObject): 2528 content: Any 2529 download_url: str 2530 encoding: Any 2531 git_url: str 2532 html_url: str 2533 last_commit_sha: str 2534 name: str 2535 path: str 2536 sha: str 2537 size: int 2538 submodule_git_url: Any 2539 target: Any 2540 type: str 2541 url: str 2542 2543 FILE = "file" 2544 2545 def __init__(self, allspice_client): 2546 super().__init__(allspice_client) 2547 2548 def __eq__(self, other): 2549 if not isinstance(other, Content): 2550 return False 2551 2552 return self.sha == other.sha and self.name == other.name 2553 2554 def __hash__(self): 2555 return hash(self.sha) ^ hash(self.name)
Inherited Members
2026class DesignReview(ApiObject): 2027 """ 2028 A Design Review. See 2029 https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest. 2030 2031 Note: The base and head fields are not `Branch` objects - they are plain strings 2032 referring to the branch names. This is because DRs can exist for branches that have 2033 been deleted, which don't have an associated `Branch` object from the API. You can use 2034 the `Repository.get_branch` method to get a `Branch` object for a branch if you know 2035 it exists. 2036 """ 2037 2038 additions: int 2039 allow_maintainer_edit: bool 2040 allow_maintainer_edits: Any 2041 assignee: User 2042 assignees: List["User"] 2043 base: str 2044 body: str 2045 changed_files: int 2046 closed_at: Any 2047 comments: int 2048 created_at: str 2049 deletions: int 2050 diff_url: str 2051 draft: bool 2052 due_date: Optional[str] 2053 head: str 2054 html_url: str 2055 id: int 2056 is_locked: bool 2057 labels: List[Any] 2058 merge_base: str 2059 merge_commit_sha: Any 2060 mergeable: bool 2061 merged: bool 2062 merged_at: Any 2063 merged_by: Any 2064 milestone: Any 2065 number: int 2066 patch_url: str 2067 pin_order: int 2068 repository: Optional["Repository"] 2069 requested_reviewers: Any 2070 review_comments: int 2071 state: str 2072 title: str 2073 updated_at: str 2074 url: str 2075 user: User 2076 2077 API_OBJECT = "/repos/{owner}/{repo}/pulls/{index}" 2078 MERGE_DESIGN_REVIEW = "/repos/{owner}/{repo}/pulls/{index}/merge" 2079 GET_COMMENTS = "/repos/{owner}/{repo}/issues/{index}/comments" 2080 2081 OPEN = "open" 2082 CLOSED = "closed" 2083 2084 class MergeType(Enum): 2085 MERGE = "merge" 2086 REBASE = "rebase" 2087 REBASE_MERGE = "rebase-merge" 2088 SQUASH = "squash" 2089 MANUALLY_MERGED = "manually-merged" 2090 2091 def __init__(self, allspice_client): 2092 super().__init__(allspice_client) 2093 2094 def __eq__(self, other): 2095 if not isinstance(other, DesignReview): 2096 return False 2097 return self.repository == other.repository and self.id == other.id 2098 2099 def __hash__(self): 2100 return hash(self.repository) ^ hash(self.id) 2101 2102 @classmethod 2103 def parse_response(cls, allspice_client, result) -> "DesignReview": 2104 api_object = super().parse_response(allspice_client, result) 2105 cls._add_read_property( 2106 "repository", 2107 Repository.parse_response(allspice_client, result["base"]["repo"]), 2108 api_object, 2109 ) 2110 2111 return api_object 2112 2113 @classmethod 2114 def request(cls, allspice_client, owner: str, repo: str, number: str): 2115 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2116 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 2117 2118 _fields_to_parsers: ClassVar[dict] = { 2119 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 2120 "assignees": lambda allspice_client, us: [ 2121 User.parse_response(allspice_client, u) for u in us 2122 ], 2123 "base": lambda _, b: b["ref"], 2124 "head": lambda _, h: h["ref"], 2125 "merged_by": lambda allspice_client, u: User.parse_response(allspice_client, u), 2126 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 2127 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 2128 } 2129 2130 _patchable_fields: ClassVar[set[str]] = { 2131 "allow_maintainer_edits", 2132 "assignee", 2133 "assignees", 2134 "base", 2135 "body", 2136 "due_date", 2137 "milestone", 2138 "state", 2139 "title", 2140 } 2141 2142 _parsers_to_fields: ClassVar[dict] = { 2143 "assignee": lambda u: u.username, 2144 "assignees": lambda us: [u.username for u in us], 2145 "base": lambda b: b.name if isinstance(b, Branch) else b, 2146 "milestone": lambda m: m.id, 2147 } 2148 2149 def commit(self): 2150 data = self.get_dirty_fields() 2151 if "due_date" in data and data["due_date"] is None: 2152 data["unset_due_date"] = True 2153 2154 args = { 2155 "owner": self.repository.owner.username, 2156 "repo": self.repository.name, 2157 "index": self.number, 2158 } 2159 self._commit(args, data) 2160 2161 def merge(self, merge_type: MergeType): 2162 """ 2163 Merge the pull request. See 2164 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2165 2166 :param merge_type: The type of merge to perform. See the MergeType enum. 2167 """ 2168 2169 self.allspice_client.requests_put( 2170 self.MERGE_DESIGN_REVIEW.format( 2171 owner=self.repository.owner.username, 2172 repo=self.repository.name, 2173 index=self.number, 2174 ), 2175 data={"Do": merge_type.value}, 2176 ) 2177 2178 def get_comments(self) -> List[Comment]: 2179 """ 2180 Get the comments on this pull request, but not specifically on a review. 2181 2182 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2183 2184 :return: A list of comments on this pull request. 2185 """ 2186 2187 results = self.allspice_client.requests_get( 2188 self.GET_COMMENTS.format( 2189 owner=self.repository.owner.username, 2190 repo=self.repository.name, 2191 index=self.number, 2192 ) 2193 ) 2194 return [Comment.parse_response(self.allspice_client, result) for result in results] 2195 2196 def create_comment(self, body: str): 2197 """ 2198 Create a comment on this pull request. This uses the same endpoint as the 2199 comments on issues, and will not be associated with any reviews. 2200 2201 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2202 2203 :param body: The body of the comment. 2204 :return: The comment that was created. 2205 """ 2206 2207 result = self.allspice_client.requests_post( 2208 self.GET_COMMENTS.format( 2209 owner=self.repository.owner.username, 2210 repo=self.repository.name, 2211 index=self.number, 2212 ), 2213 data={"body": body}, 2214 ) 2215 return Comment.parse_response(self.allspice_client, result)
A Design Review. See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest.
Note: The base and head fields are not Branch
objects - they are plain strings
referring to the branch names. This is because DRs can exist for branches that have
been deleted, which don't have an associated Branch
object from the API. You can use
the Repository.get_branch
method to get a Branch
object for a branch if you know
it exists.
2102 @classmethod 2103 def parse_response(cls, allspice_client, result) -> "DesignReview": 2104 api_object = super().parse_response(allspice_client, result) 2105 cls._add_read_property( 2106 "repository", 2107 Repository.parse_response(allspice_client, result["base"]["repo"]), 2108 api_object, 2109 ) 2110 2111 return api_object
2113 @classmethod 2114 def request(cls, allspice_client, owner: str, repo: str, number: str): 2115 """See https://hub.allspice.io/api/swagger#/repository/repoGetPullRequest""" 2116 return cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number})
See allspice.allspice.io/api/swagger#/repository/repoGetPullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoGetPullRequest
2149 def commit(self): 2150 data = self.get_dirty_fields() 2151 if "due_date" in data and data["due_date"] is None: 2152 data["unset_due_date"] = True 2153 2154 args = { 2155 "owner": self.repository.owner.username, 2156 "repo": self.repository.name, 2157 "index": self.number, 2158 } 2159 self._commit(args, data)
2161 def merge(self, merge_type: MergeType): 2162 """ 2163 Merge the pull request. See 2164 https://hub.allspice.io/api/swagger#/repository/repoMergePullRequest 2165 2166 :param merge_type: The type of merge to perform. See the MergeType enum. 2167 """ 2168 2169 self.allspice_client.requests_put( 2170 self.MERGE_DESIGN_REVIEW.format( 2171 owner=self.repository.owner.username, 2172 repo=self.repository.name, 2173 index=self.number, 2174 ), 2175 data={"Do": merge_type.value}, 2176 )
Merge the pull request. See allspice.allspice.io/api/swagger#/repository/repoMergePullRequest">https://huballspice.allspice.io/api/swagger#/repository/repoMergePullRequest
Parameters
- merge_type: The type of merge to perform. See the MergeType enum.
2178 def get_comments(self) -> List[Comment]: 2179 """ 2180 Get the comments on this pull request, but not specifically on a review. 2181 2182 https://hub.allspice.io/api/swagger#/issue/issueGetComments 2183 2184 :return: A list of comments on this pull request. 2185 """ 2186 2187 results = self.allspice_client.requests_get( 2188 self.GET_COMMENTS.format( 2189 owner=self.repository.owner.username, 2190 repo=self.repository.name, 2191 index=self.number, 2192 ) 2193 ) 2194 return [Comment.parse_response(self.allspice_client, result) for result in results]
Get the comments on this pull request, but not specifically on a review.
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
Returns
A list of comments on this pull request.
2196 def create_comment(self, body: str): 2197 """ 2198 Create a comment on this pull request. This uses the same endpoint as the 2199 comments on issues, and will not be associated with any reviews. 2200 2201 https://hub.allspice.io/api/swagger#/issue/issueCreateComment 2202 2203 :param body: The body of the comment. 2204 :return: The comment that was created. 2205 """ 2206 2207 result = self.allspice_client.requests_post( 2208 self.GET_COMMENTS.format( 2209 owner=self.repository.owner.username, 2210 repo=self.repository.name, 2211 index=self.number, 2212 ), 2213 data={"body": body}, 2214 ) 2215 return Comment.parse_response(self.allspice_client, result)
Create a comment on this pull request. This uses the same endpoint as the comments on issues, and will not be associated with any reviews.
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
Parameters
- body: The body of the comment.
Returns
The comment that was created.
1880class Issue(ApiObject): 1881 assets: List[Any] 1882 assignee: Any 1883 assignees: Any 1884 body: str 1885 closed_at: Any 1886 comments: int 1887 created_at: str 1888 due_date: Any 1889 html_url: str 1890 id: int 1891 is_locked: bool 1892 labels: List[Any] 1893 milestone: Optional["Milestone"] 1894 number: int 1895 original_author: str 1896 original_author_id: int 1897 pin_order: int 1898 pull_request: Any 1899 ref: str 1900 repository: Dict[str, Union[int, str]] 1901 state: str 1902 title: str 1903 updated_at: str 1904 url: str 1905 user: User 1906 1907 API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index> 1908 GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index> 1909 GET_COMMENTS = """/repos/{owner}/{repo}/issues/{index}/comments""" 1910 CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" 1911 1912 OPENED = "open" 1913 CLOSED = "closed" 1914 1915 def __init__(self, allspice_client): 1916 super().__init__(allspice_client) 1917 1918 def __eq__(self, other): 1919 if not isinstance(other, Issue): 1920 return False 1921 return self.repository == other.repository and self.id == other.id 1922 1923 def __hash__(self): 1924 return hash(self.repository) ^ hash(self.id) 1925 1926 _fields_to_parsers: ClassVar[dict] = { 1927 "milestone": lambda allspice_client, m: Milestone.parse_response(allspice_client, m), 1928 "user": lambda allspice_client, u: User.parse_response(allspice_client, u), 1929 "assignee": lambda allspice_client, u: User.parse_response(allspice_client, u), 1930 "assignees": lambda allspice_client, us: [ 1931 User.parse_response(allspice_client, u) for u in us 1932 ], 1933 "state": lambda _, s: (Issue.CLOSED if s == "closed" else Issue.OPENED), 1934 } 1935 1936 _parsers_to_fields: ClassVar[dict] = { 1937 "milestone": lambda m: m.id, 1938 } 1939 1940 _patchable_fields: ClassVar[set[str]] = { 1941 "assignee", 1942 "assignees", 1943 "body", 1944 "due_date", 1945 "milestone", 1946 "state", 1947 "title", 1948 } 1949 1950 def commit(self): 1951 args = { 1952 "owner": self.repository.owner.username, 1953 "repo": self.repository.name, 1954 "index": self.number, 1955 } 1956 self._commit(args) 1957 1958 @classmethod 1959 def request(cls, allspice_client, owner: str, repo: str, number: str): 1960 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1961 # The repository in the response is a RepositoryMeta object, so request 1962 # the full repository object and add it to the issue object. 1963 repository = Repository.request(allspice_client, owner, repo) 1964 setattr(api_object, "_repository", repository) 1965 # For legacy reasons 1966 cls._add_read_property("repo", repository, api_object) 1967 return api_object 1968 1969 @classmethod 1970 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1971 args = {"owner": repo.owner.username, "repo": repo.name} 1972 data = {"title": title, "body": body} 1973 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1974 issue = Issue.parse_response(allspice_client, result) 1975 setattr(issue, "_repository", repo) 1976 cls._add_read_property("repo", repo, issue) 1977 return issue 1978 1979 @property 1980 def owner(self) -> Organization | User: 1981 return self.repository.owner 1982 1983 def get_time_sum(self, user: User) -> int: 1984 results = self.allspice_client.requests_get( 1985 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1986 ) 1987 return sum(result["time"] for result in results if result and result["user_id"] == user.id) 1988 1989 def get_times(self) -> Optional[Dict]: 1990 return self.allspice_client.requests_get( 1991 Issue.GET_TIME % (self.owner.username, self.repository.name, self.number) 1992 ) 1993 1994 def delete_time(self, time_id: str): 1995 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}" 1996 self.allspice_client.requests_delete(path) 1997 1998 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1999 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2000 self.allspice_client.requests_post( 2001 path, data={"created": created, "time": int(time), "user_name": user_name} 2002 ) 2003 2004 def get_comments(self) -> List[Comment]: 2005 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2006 2007 results = self.allspice_client.requests_get( 2008 self.GET_COMMENTS.format( 2009 owner=self.owner.username, repo=self.repository.name, index=self.number 2010 ) 2011 ) 2012 2013 return [Comment.parse_response(self.allspice_client, result) for result in results] 2014 2015 def create_comment(self, body: str) -> Comment: 2016 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2017 2018 path = self.GET_COMMENTS.format( 2019 owner=self.owner.username, repo=self.repository.name, index=self.number 2020 ) 2021 2022 response = self.allspice_client.requests_post(path, data={"body": body}) 2023 return Comment.parse_response(self.allspice_client, response)
1958 @classmethod 1959 def request(cls, allspice_client, owner: str, repo: str, number: str): 1960 api_object = cls._request(allspice_client, {"owner": owner, "repo": repo, "index": number}) 1961 # The repository in the response is a RepositoryMeta object, so request 1962 # the full repository object and add it to the issue object. 1963 repository = Repository.request(allspice_client, owner, repo) 1964 setattr(api_object, "_repository", repository) 1965 # For legacy reasons 1966 cls._add_read_property("repo", repository, api_object) 1967 return api_object
1969 @classmethod 1970 def create_issue(cls, allspice_client, repo: Repository, title: str, body: str = ""): 1971 args = {"owner": repo.owner.username, "repo": repo.name} 1972 data = {"title": title, "body": body} 1973 result = allspice_client.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) 1974 issue = Issue.parse_response(allspice_client, result) 1975 setattr(issue, "_repository", repo) 1976 cls._add_read_property("repo", repo, issue) 1977 return issue
1998 def add_time(self, time: int, created: Optional[str] = None, user_name: Optional[User] = None): 1999 path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times" 2000 self.allspice_client.requests_post( 2001 path, data={"created": created, "time": int(time), "user_name": user_name} 2002 )
2004 def get_comments(self) -> List[Comment]: 2005 """https://hub.allspice.io/api/swagger#/issue/issueGetComments""" 2006 2007 results = self.allspice_client.requests_get( 2008 self.GET_COMMENTS.format( 2009 owner=self.owner.username, repo=self.repository.name, index=self.number 2010 ) 2011 ) 2012 2013 return [Comment.parse_response(self.allspice_client, result) for result in results]
allspice.allspice.io/api/swagger#/issue/issueGetComments">https://huballspice.allspice.io/api/swagger#/issue/issueGetComments
2015 def create_comment(self, body: str) -> Comment: 2016 """https://hub.allspice.io/api/swagger#/issue/issueCreateComment""" 2017 2018 path = self.GET_COMMENTS.format( 2019 owner=self.owner.username, repo=self.repository.name, index=self.number 2020 ) 2021 2022 response = self.allspice_client.requests_post(path, data={"body": body}) 2023 return Comment.parse_response(self.allspice_client, response)
allspice.allspice.io/api/swagger#/issue/issueCreateComment">https://huballspice.allspice.io/api/swagger#/issue/issueCreateComment
1449class Milestone(ApiObject): 1450 allow_merge_commits: Any 1451 allow_rebase: Any 1452 allow_rebase_explicit: Any 1453 allow_squash_merge: Any 1454 archived: Any 1455 closed_at: Any 1456 closed_issues: int 1457 created_at: str 1458 default_branch: Any 1459 description: str 1460 due_on: Any 1461 has_issues: Any 1462 has_pull_requests: Any 1463 has_wiki: Any 1464 id: int 1465 ignore_whitespace_conflicts: Any 1466 name: Any 1467 open_issues: int 1468 private: Any 1469 state: str 1470 title: str 1471 updated_at: str 1472 website: Any 1473 1474 API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo> 1475 1476 def __init__(self, allspice_client): 1477 super().__init__(allspice_client) 1478 1479 def __eq__(self, other): 1480 if not isinstance(other, Milestone): 1481 return False 1482 return self.allspice_client == other.allspice_client and self.id == other.id 1483 1484 def __hash__(self): 1485 return hash(self.allspice_client) ^ hash(self.id) 1486 1487 _fields_to_parsers: ClassVar[dict] = { 1488 "closed_at": lambda _, t: Util.convert_time(t), 1489 "due_on": lambda _, t: Util.convert_time(t), 1490 } 1491 1492 _patchable_fields: ClassVar[set[str]] = { 1493 "allow_merge_commits", 1494 "allow_rebase", 1495 "allow_rebase_explicit", 1496 "allow_squash_merge", 1497 "archived", 1498 "default_branch", 1499 "description", 1500 "has_issues", 1501 "has_pull_requests", 1502 "has_wiki", 1503 "ignore_whitespace_conflicts", 1504 "name", 1505 "private", 1506 "website", 1507 } 1508 1509 @classmethod 1510 def request(cls, allspice_client, owner: str, repo: str, number: str): 1511 return cls._request(allspice_client, {"owner": owner, "repo": repo, "number": number})
Common base class for all non-exit exceptions.
33class Organization(ApiObject): 34 """see https://hub.allspice.io/api/swagger#/organization/orgGetAll""" 35 36 active: Optional[bool] 37 avatar_url: str 38 created: Optional[str] 39 description: str 40 email: str 41 followers_count: Optional[int] 42 following_count: Optional[int] 43 full_name: str 44 html_url: Optional[str] 45 id: int 46 is_admin: Optional[bool] 47 language: Optional[str] 48 last_login: Optional[str] 49 location: str 50 login: Optional[str] 51 login_name: Optional[str] 52 name: Optional[str] 53 prohibit_login: Optional[bool] 54 repo_admin_change_team_access: Optional[bool] 55 restricted: Optional[bool] 56 source_id: Optional[int] 57 starred_repos_count: Optional[int] 58 username: str 59 visibility: str 60 website: str 61 62 API_OBJECT = """/orgs/{name}""" # <org> 63 ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org> 64 ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org> 65 ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org> 66 ORG_GET_MEMBERS = """/orgs/%s/members""" # <org> 67 ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username> 68 ORG_HEATMAP = """/users/%s/heatmap""" # <username> 69 70 def __init__(self, allspice_client): 71 super().__init__(allspice_client) 72 73 def __eq__(self, other): 74 if not isinstance(other, Organization): 75 return False 76 return self.allspice_client == other.allspice_client and self.name == other.name 77 78 def __hash__(self): 79 return hash(self.allspice_client) ^ hash(self.name) 80 81 @classmethod 82 def request(cls, allspice_client, name: str) -> Self: 83 return cls._request(allspice_client, {"name": name}) 84 85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object 93 94 _patchable_fields: ClassVar[set[str]] = { 95 "description", 96 "full_name", 97 "location", 98 "visibility", 99 "website", 100 } 101 102 def commit(self): 103 args = {"name": self.name} 104 self._commit(args) 105 106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result) 144 145 def get_repositories(self) -> List["Repository"]: 146 results = self.allspice_client.requests_get_paginated( 147 Organization.ORG_REPOS_REQUEST % self.username 148 ) 149 return [Repository.parse_response(self.allspice_client, result) for result in results] 150 151 def get_repository(self, name) -> "Repository": 152 repos = self.get_repositories() 153 for repo in repos: 154 if repo.name == name: 155 return repo 156 raise NotFoundException("Repository %s not existent in organization." % name) 157 158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams 165 166 def get_team(self, name) -> "Team": 167 teams = self.get_teams() 168 for team in teams: 169 if team.name == name: 170 return team 171 raise NotFoundException("Team not existent in organization.") 172 173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 ) 204 205 def get_members(self) -> List["User"]: 206 results = self.allspice_client.requests_get(Organization.ORG_GET_MEMBERS % self.username) 207 return [User.parse_response(self.allspice_client, result) for result in results] 208 209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False 220 221 def remove_member(self, user: "User"): 222 path = f"/orgs/{self.username}/members/{user.username}" 223 self.allspice_client.requests_delete(path) 224 225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True 231 232 def get_heatmap(self) -> List[Tuple[datetime, int]]: 233 results = self.allspice_client.requests_get(User.USER_HEATMAP % self.username) 234 results = [ 235 (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) 236 for result in results 237 ] 238 return results
see allspice.allspice.io/api/swagger#/organization/orgGetAll">https://huballspice.allspice.io/api/swagger#/organization/orgGetAll
85 @classmethod 86 def parse_response(cls, allspice_client, result) -> "Organization": 87 api_object = super().parse_response(allspice_client, result) 88 # add "name" field to make this behave similar to users for gitea < 1.18 89 # also necessary for repository-owner when org is repo owner 90 if not hasattr(api_object, "name"): 91 Organization._add_read_property("name", result["username"], api_object) 92 return api_object
106 def create_repo( 107 self, 108 repoName: str, 109 description: str = "", 110 private: bool = False, 111 autoInit=True, 112 gitignores: Optional[str] = None, 113 license: Optional[str] = None, 114 readme: str = "Default", 115 issue_labels: Optional[str] = None, 116 default_branch="master", 117 ): 118 """Create an organization Repository 119 120 Throws: 121 AlreadyExistsException: If the Repository exists already. 122 Exception: If something else went wrong. 123 """ 124 result = self.allspice_client.requests_post( 125 f"/orgs/{self.name}/repos", 126 data={ 127 "name": repoName, 128 "description": description, 129 "private": private, 130 "auto_init": autoInit, 131 "gitignores": gitignores, 132 "license": license, 133 "issue_labels": issue_labels, 134 "readme": readme, 135 "default_branch": default_branch, 136 }, 137 ) 138 if "id" in result: 139 self.allspice_client.logger.info("Successfully created Repository %s " % result["name"]) 140 else: 141 self.allspice_client.logger.error(result["message"]) 142 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 143 return Repository.parse_response(self.allspice_client, result)
Create an organization Repository
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
158 def get_teams(self) -> List["Team"]: 159 results = self.allspice_client.requests_get(Organization.ORG_TEAMS_REQUEST % self.username) 160 teams = [Team.parse_response(self.allspice_client, result) for result in results] 161 # organisation seems to be missing using this request, so we add org manually 162 for t in teams: 163 setattr(t, "_organization", self) 164 return teams
173 def create_team( 174 self, 175 name: str, 176 description: str = "", 177 permission: str = "read", 178 can_create_org_repo: bool = False, 179 includes_all_repositories: bool = False, 180 units=( 181 "repo.code", 182 "repo.issues", 183 "repo.ext_issues", 184 "repo.wiki", 185 "repo.pulls", 186 "repo.releases", 187 "repo.ext_wiki", 188 ), 189 units_map={}, 190 ) -> "Team": 191 """Alias for AllSpice#create_team""" 192 # TODO: Move AllSpice#create_team to Organization#create_team and 193 # deprecate AllSpice#create_team. 194 return self.allspice_client.create_team( 195 org=self, 196 name=name, 197 description=description, 198 permission=permission, 199 can_create_org_repo=can_create_org_repo, 200 includes_all_repositories=includes_all_repositories, 201 units=units, 202 units_map=units_map, 203 )
Alias for AllSpice#create_team
209 def is_member(self, username) -> bool: 210 if isinstance(username, User): 211 username = username.username 212 try: 213 # returns 204 if its ok, 404 if its not 214 self.allspice_client.requests_get( 215 Organization.ORG_IS_MEMBER % (self.username, username) 216 ) 217 return True 218 except Exception: 219 return False
225 def delete(self): 226 """Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" 227 for repo in self.get_repositories(): 228 repo.delete() 229 self.allspice_client.requests_delete(Organization.API_OBJECT.format(name=self.username)) 230 self.deleted = True
Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User
2301class Release(ApiObject): 2302 """ 2303 A release on a repo. 2304 """ 2305 2306 assets: List[Union[Any, Dict[str, Union[int, str]], "ReleaseAsset"]] 2307 author: User 2308 body: str 2309 created_at: str 2310 draft: bool 2311 html_url: str 2312 id: int 2313 name: str 2314 prerelease: bool 2315 published_at: str 2316 repo: Optional["Repository"] 2317 repository: Optional["Repository"] 2318 tag_name: str 2319 tarball_url: str 2320 target_commitish: str 2321 upload_url: str 2322 url: str 2323 zipball_url: str 2324 2325 API_OBJECT = "/repos/{owner}/{repo}/releases/{id}" 2326 RELEASE_CREATE_ASSET = "/repos/{owner}/{repo}/releases/{id}/assets" 2327 # Note that we don't strictly need the get_assets route, as the release 2328 # object already contains the assets. 2329 2330 def __init__(self, allspice_client): 2331 super().__init__(allspice_client) 2332 2333 def __eq__(self, other): 2334 if not isinstance(other, Release): 2335 return False 2336 return self.repo == other.repo and self.id == other.id 2337 2338 def __hash__(self): 2339 return hash(self.repo) ^ hash(self.id) 2340 2341 _fields_to_parsers: ClassVar[dict] = { 2342 "author": lambda allspice_client, author: User.parse_response(allspice_client, author), 2343 } 2344 _patchable_fields: ClassVar[set[str]] = { 2345 "body", 2346 "draft", 2347 "name", 2348 "prerelease", 2349 "tag_name", 2350 "target_commitish", 2351 } 2352 2353 @classmethod 2354 def parse_response(cls, allspice_client, result, repo) -> Release: 2355 release = super().parse_response(allspice_client, result) 2356 Release._add_read_property("repository", repo, release) 2357 # For legacy reasons 2358 Release._add_read_property("repo", repo, release) 2359 setattr( 2360 release, 2361 "_assets", 2362 [ 2363 ReleaseAsset.parse_response(allspice_client, asset, release) 2364 for asset in result["assets"] 2365 ], 2366 ) 2367 return release 2368 2369 @classmethod 2370 def request( 2371 cls, 2372 allspice_client, 2373 owner: str, 2374 repo: str, 2375 id: Optional[int] = None, 2376 ) -> Release: 2377 args = {"owner": owner, "repo": repo, "id": id} 2378 release_response = cls._get_gitea_api_object(allspice_client, args) 2379 repository = Repository.request(allspice_client, owner, repo) 2380 release = cls.parse_response(allspice_client, release_response, repository) 2381 return release 2382 2383 def commit(self): 2384 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2385 self._commit(args) 2386 2387 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2388 """ 2389 Create an asset for this release. 2390 2391 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2392 2393 :param file: The file to upload. This should be a file-like object. 2394 :param name: The name of the file. 2395 :return: The created asset. 2396 """ 2397 2398 args: dict[str, Any] = {"files": {"attachment": file}} 2399 if name is not None: 2400 args["params"] = {"name": name} 2401 2402 result = self.allspice_client.requests_post( 2403 self.RELEASE_CREATE_ASSET.format( 2404 owner=self.repo.owner.username, 2405 repo=self.repo.name, 2406 id=self.id, 2407 ), 2408 **args, 2409 ) 2410 return ReleaseAsset.parse_response(self.allspice_client, result, self) 2411 2412 def delete(self): 2413 args = {"owner": self.repo.owner.name, "repo": self.repo.name, "id": self.id} 2414 self.allspice_client.requests_delete(self.API_OBJECT.format(**args)) 2415 self.deleted = True
A release on a repo.
2353 @classmethod 2354 def parse_response(cls, allspice_client, result, repo) -> Release: 2355 release = super().parse_response(allspice_client, result) 2356 Release._add_read_property("repository", repo, release) 2357 # For legacy reasons 2358 Release._add_read_property("repo", repo, release) 2359 setattr( 2360 release, 2361 "_assets", 2362 [ 2363 ReleaseAsset.parse_response(allspice_client, asset, release) 2364 for asset in result["assets"] 2365 ], 2366 ) 2367 return release
2369 @classmethod 2370 def request( 2371 cls, 2372 allspice_client, 2373 owner: str, 2374 repo: str, 2375 id: Optional[int] = None, 2376 ) -> Release: 2377 args = {"owner": owner, "repo": repo, "id": id} 2378 release_response = cls._get_gitea_api_object(allspice_client, args) 2379 repository = Repository.request(allspice_client, owner, repo) 2380 release = cls.parse_response(allspice_client, release_response, repository) 2381 return release
2387 def create_asset(self, file: IO, name: Optional[str] = None) -> ReleaseAsset: 2388 """ 2389 Create an asset for this release. 2390 2391 https://hub.allspice.io/api/swagger#/repository/repoCreateReleaseAsset 2392 2393 :param file: The file to upload. This should be a file-like object. 2394 :param name: The name of the file. 2395 :return: The created asset. 2396 """ 2397 2398 args: dict[str, Any] = {"files": {"attachment": file}} 2399 if name is not None: 2400 args["params"] = {"name": name} 2401 2402 result = self.allspice_client.requests_post( 2403 self.RELEASE_CREATE_ASSET.format( 2404 owner=self.repo.owner.username, 2405 repo=self.repo.name, 2406 id=self.id, 2407 ), 2408 **args, 2409 ) 2410 return ReleaseAsset.parse_response(self.allspice_client, result, self)
Create an asset for this release.
allspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset">https://huballspice.allspice.io/api/swagger#/repository/repoCreateReleaseAsset
Parameters
- file: The file to upload. This should be a file-like object.
- name: The name of the file.
Returns
The created asset.
477class Repository(ApiObject): 478 allow_fast_forward_only_merge: bool 479 allow_manual_merge: Any 480 allow_merge_commits: bool 481 allow_rebase: bool 482 allow_rebase_explicit: bool 483 allow_rebase_update: bool 484 allow_squash_merge: bool 485 archived: bool 486 archived_at: str 487 autodetect_manual_merge: Any 488 avatar_url: str 489 clone_url: str 490 created_at: str 491 default_allow_maintainer_edit: bool 492 default_branch: str 493 default_delete_branch_after_merge: bool 494 default_merge_style: str 495 description: str 496 empty: bool 497 enable_prune: Any 498 external_tracker: Any 499 external_wiki: Any 500 fork: bool 501 forks_count: int 502 full_name: str 503 has_actions: bool 504 has_issues: bool 505 has_packages: bool 506 has_projects: bool 507 has_pull_requests: bool 508 has_releases: bool 509 has_wiki: bool 510 html_url: str 511 id: int 512 ignore_whitespace_conflicts: bool 513 internal: bool 514 internal_tracker: Dict[str, bool] 515 language: str 516 languages_url: str 517 link: str 518 mirror: bool 519 mirror_interval: str 520 mirror_updated: str 521 name: str 522 object_format_name: str 523 open_issues_count: int 524 open_pr_counter: int 525 original_url: str 526 owner: Union["User", "Organization"] 527 parent: Any 528 permissions: Dict[str, bool] 529 private: bool 530 projects_mode: str 531 release_counter: int 532 repo_transfer: Any 533 size: int 534 ssh_url: str 535 stars_count: int 536 template: bool 537 updated_at: datetime 538 url: str 539 watchers_count: int 540 website: str 541 542 API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame> 543 REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username> 544 REPO_SEARCH = """/repos/search/""" 545 REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame> 546 REPO_BRANCH = """/repos/{owner}/{repo}/branches/{branch}""" 547 REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame> 548 REPO_DESIGN_REVIEWS = """/repos/{owner}/{repo}/pulls""" 549 REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame> 550 REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame> 551 REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username> 552 REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame> 553 REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" 554 REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" 555 REPO_GET_ARCHIVE = "/repos/{owner}/{repo}/archive/{ref}.{format}" 556 REPO_GET_ALLSPICE_JSON = "/repos/{owner}/{repo}/allspice_generated/json/{content}" 557 REPO_GET_ALLSPICE_SVG = "/repos/{owner}/{repo}/allspice_generated/svg/{content}" 558 REPO_GET_TOPICS = "/repos/{owner}/{repo}/topics" 559 REPO_ADD_TOPIC = "/repos/{owner}/{repo}/topics/{topic}" 560 REPO_GET_RELEASES = "/repos/{owner}/{repo}/releases" 561 REPO_GET_LATEST_RELEASE = "/repos/{owner}/{repo}/releases/latest" 562 REPO_GET_RELEASE_BY_TAG = "/repos/{owner}/{repo}/releases/tags/{tag}" 563 REPO_GET_COMMIT_STATUS = "/repos/{owner}/{repo}/statuses/{sha}" 564 REPO_GET_RAW_FILE = "/repos/{owner}/{repo}/raw/{path}" 565 REPO_GET_TREE = "/repos/{owner}/{repo}/git/trees/{ref}" 566 567 class ArchiveFormat(Enum): 568 """ 569 Archive formats for Repository.get_archive 570 """ 571 572 TAR = "tar.gz" 573 ZIP = "zip" 574 575 class CommitStatusSort(Enum): 576 """ 577 Sort order for Repository.get_commit_status 578 """ 579 580 OLDEST = "oldest" 581 RECENT_UPDATE = "recentupdate" 582 LEAST_UPDATE = "leastupdate" 583 LEAST_INDEX = "leastindex" 584 HIGHEST_INDEX = "highestindex" 585 586 def __init__(self, allspice_client): 587 super().__init__(allspice_client) 588 589 def __eq__(self, other): 590 if not isinstance(other, Repository): 591 return False 592 return self.owner == other.owner and self.name == other.name 593 594 def __hash__(self): 595 return hash(self.owner) ^ hash(self.name) 596 597 _fields_to_parsers: ClassVar[dict] = { 598 # dont know how to tell apart user and org as owner except form email being empty. 599 "owner": lambda allspice_client, r: ( 600 Organization.parse_response(allspice_client, r) 601 if r["email"] == "" 602 else User.parse_response(allspice_client, r) 603 ), 604 "updated_at": lambda _, t: Util.convert_time(t), 605 } 606 607 @classmethod 608 def request( 609 cls, 610 allspice_client, 611 owner: str, 612 name: str, 613 ) -> Repository: 614 return cls._request(allspice_client, {"owner": owner, "name": name}) 615 616 @classmethod 617 def search( 618 cls, 619 allspice_client, 620 query: Optional[str] = None, 621 topic: bool = False, 622 include_description: bool = False, 623 user: Optional[User] = None, 624 owner_to_prioritize: Union[User, Organization, None] = None, 625 ) -> list[Repository]: 626 """ 627 Search for repositories. 628 629 See https://hub.allspice.io/api/swagger#/repository/repoSearch 630 631 :param query: The query string to search for 632 :param topic: If true, the query string will only be matched against the 633 repository's topic. 634 :param include_description: If true, the query string will be matched 635 against the repository's description as well. 636 :param user: If specified, only repositories that this user owns or 637 contributes to will be searched. 638 :param owner_to_prioritize: If specified, repositories owned by the 639 given entity will be prioritized in the search. 640 :returns: All repositories matching the query. If there are many 641 repositories matching this query, this may take some time. 642 """ 643 644 params = {} 645 646 if query is not None: 647 params["q"] = query 648 if topic: 649 params["topic"] = topic 650 if include_description: 651 params["include_description"] = include_description 652 if user is not None: 653 params["user"] = user.id 654 if owner_to_prioritize is not None: 655 params["owner_to_prioritize"] = owner_to_prioritize.id 656 657 responses = allspice_client.requests_get_paginated(cls.REPO_SEARCH, params=params) 658 659 return [Repository.parse_response(allspice_client, response) for response in responses] 660 661 _patchable_fields: ClassVar[set[str]] = { 662 "allow_manual_merge", 663 "allow_merge_commits", 664 "allow_rebase", 665 "allow_rebase_explicit", 666 "allow_rebase_update", 667 "allow_squash_merge", 668 "archived", 669 "autodetect_manual_merge", 670 "default_branch", 671 "default_delete_branch_after_merge", 672 "default_merge_style", 673 "description", 674 "enable_prune", 675 "external_tracker", 676 "external_wiki", 677 "has_issues", 678 "has_projects", 679 "has_pull_requests", 680 "has_wiki", 681 "ignore_whitespace_conflicts", 682 "internal_tracker", 683 "mirror_interval", 684 "name", 685 "private", 686 "template", 687 "website", 688 } 689 690 def commit(self): 691 args = {"owner": self.owner.username, "name": self.name} 692 self._commit(args) 693 694 def get_branches(self) -> List["Branch"]: 695 """Get all the Branches of this Repository.""" 696 697 results = self.allspice_client.requests_get_paginated( 698 Repository.REPO_BRANCHES % (self.owner.username, self.name) 699 ) 700 return [Branch.parse_response(self.allspice_client, result) for result in results] 701 702 def get_branch(self, name: str) -> "Branch": 703 """Get a specific Branch of this Repository.""" 704 result = self.allspice_client.requests_get( 705 Repository.REPO_BRANCH.format(owner=self.owner.username, repo=self.name, branch=name) 706 ) 707 return Branch.parse_response(self.allspice_client, result) 708 709 def add_branch(self, create_from: Ref, newname: str) -> "Branch": 710 """Add a branch to the repository""" 711 # Note: will only work with gitea 1.13 or higher! 712 713 ref_name = Util.data_params_for_ref(create_from) 714 if "ref" not in ref_name: 715 raise ValueError("create_from must be a Branch, Commit or string") 716 ref_name = ref_name["ref"] 717 718 data = {"new_branch_name": newname, "old_ref_name": ref_name} 719 result = self.allspice_client.requests_post( 720 Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data 721 ) 722 return Branch.parse_response(self.allspice_client, result) 723 724 def get_issues( 725 self, 726 state: Literal["open", "closed", "all"] = "all", 727 search_query: Optional[str] = None, 728 labels: Optional[List[str]] = None, 729 milestones: Optional[List[Union[Milestone, str]]] = None, 730 assignee: Optional[Union[User, str]] = None, 731 since: Optional[datetime] = None, 732 before: Optional[datetime] = None, 733 ) -> List["Issue"]: 734 """ 735 Get all Issues of this Repository (open and closed) 736 737 https://hub.allspice.io/api/swagger#/repository/repoListIssues 738 739 All params of this method are optional filters. If you don't specify a filter, it 740 will not be applied. 741 742 :param state: The state of the Issues to get. If None, all Issues are returned. 743 :param search_query: Filter issues by text. This is equivalent to searching for 744 `search_query` in the Issues on the web interface. 745 :param labels: Filter issues by labels. 746 :param milestones: Filter issues by milestones. 747 :param assignee: Filter issues by the assigned user. 748 :param since: Filter issues by the date they were created. 749 :param before: Filter issues by the date they were created. 750 :return: A list of Issues. 751 """ 752 753 data = { 754 "state": state, 755 } 756 if search_query: 757 data["q"] = search_query 758 if labels: 759 data["labels"] = ",".join(labels) 760 if milestones: 761 data["milestone"] = ",".join( 762 [ 763 milestone.name if isinstance(milestone, Milestone) else milestone 764 for milestone in milestones 765 ] 766 ) 767 if assignee: 768 if isinstance(assignee, User): 769 data["assignee"] = assignee.username 770 else: 771 data["assignee"] = assignee 772 if since: 773 data["since"] = Util.format_time(since) 774 if before: 775 data["before"] = Util.format_time(before) 776 777 results = self.allspice_client.requests_get_paginated( 778 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 779 params=data, 780 ) 781 782 issues = [] 783 for result in results: 784 issue = Issue.parse_response(self.allspice_client, result) 785 # See Issue.request 786 setattr(issue, "_repository", self) 787 # This is mostly for compatibility with an older implementation 788 Issue._add_read_property("repo", self, issue) 789 issues.append(issue) 790 791 return issues 792 793 def get_design_reviews( 794 self, 795 state: Literal["open", "closed", "all"] = "all", 796 milestone: Optional[Union[Milestone, str]] = None, 797 labels: Optional[List[str]] = None, 798 ) -> List["DesignReview"]: 799 """ 800 Get all Design Reviews of this Repository. 801 802 https://hub.allspice.io/api/swagger#/repository/repoListPullRequests 803 804 :param state: The state of the Design Reviews to get. If None, all Design Reviews 805 are returned. 806 :param milestone: The milestone of the Design Reviews to get. 807 :param labels: A list of label IDs to filter DRs by. 808 :return: A list of Design Reviews. 809 """ 810 811 params = { 812 "state": state, 813 } 814 if milestone: 815 if isinstance(milestone, Milestone): 816 params["milestone"] = milestone.name 817 else: 818 params["milestone"] = milestone 819 if labels: 820 params["labels"] = ",".join(labels) 821 822 results = self.allspice_client.requests_get_paginated( 823 self.REPO_DESIGN_REVIEWS.format(owner=self.owner.username, repo=self.name), 824 params=params, 825 ) 826 return [DesignReview.parse_response(self.allspice_client, result) for result in results] 827 828 def get_commits( 829 self, 830 sha: Optional[str] = None, 831 path: Optional[str] = None, 832 stat: bool = True, 833 ) -> List["Commit"]: 834 """ 835 Get all the Commits of this Repository. 836 837 https://hub.allspice.io/api/swagger#/repository/repoGetAllCommits 838 839 :param sha: The SHA of the commit to start listing commits from. 840 :param path: filepath of a file/dir. 841 :param stat: Include the number of additions and deletions in the response. 842 Disable for speedup. 843 :return: A list of Commits. 844 """ 845 846 data = {} 847 if sha: 848 data["sha"] = sha 849 if path: 850 data["path"] = path 851 if not stat: 852 data["stat"] = False 853 854 try: 855 results = self.allspice_client.requests_get_paginated( 856 Repository.REPO_COMMITS % (self.owner.username, self.name), 857 params=data, 858 ) 859 except ConflictException as err: 860 logging.warning(err) 861 logging.warning("Repository %s/%s is Empty" % (self.owner.username, self.name)) 862 results = [] 863 return [Commit.parse_response(self.allspice_client, result) for result in results] 864 865 def get_issues_state(self, state) -> List["Issue"]: 866 """ 867 DEPRECATED: Use get_issues() instead. 868 869 Get issues of state Issue.open or Issue.closed of a repository. 870 """ 871 872 assert state in [Issue.OPENED, Issue.CLOSED] 873 issues = [] 874 data = {"state": state} 875 results = self.allspice_client.requests_get_paginated( 876 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 877 params=data, 878 ) 879 for result in results: 880 issue = Issue.parse_response(self.allspice_client, result) 881 # adding data not contained in the issue response 882 # See Issue.request() 883 setattr(issue, "_repository", self) 884 Issue._add_read_property("repo", self, issue) 885 Issue._add_read_property("owner", self.owner, issue) 886 issues.append(issue) 887 return issues 888 889 def get_times(self): 890 results = self.allspice_client.requests_get( 891 Repository.REPO_TIMES % (self.owner.username, self.name) 892 ) 893 return results 894 895 def get_user_time(self, username) -> float: 896 if isinstance(username, User): 897 username = username.username 898 results = self.allspice_client.requests_get( 899 Repository.REPO_USER_TIME % (self.owner.username, self.name, username) 900 ) 901 time = sum(r["time"] for r in results) 902 return time 903 904 def get_full_name(self) -> str: 905 return self.owner.username + "/" + self.name 906 907 def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject: 908 data = { 909 "assignees": assignees, 910 "body": description, 911 "closed": False, 912 "title": title, 913 } 914 result = self.allspice_client.requests_post( 915 Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), 916 data=data, 917 ) 918 919 issue = Issue.parse_response(self.allspice_client, result) 920 setattr(issue, "_repository", self) 921 Issue._add_read_property("repo", self, issue) 922 return issue 923 924 def create_design_review( 925 self, 926