allspice.allspice
import json
import logging
from typing import Any, Dict, List, Mapping, Optional, Union

import requests
import urllib3
from frozendict import frozendict

from .apiobject import Organization, Repository, Team, User
from .exceptions import (
    AlreadyExistsException,
    ConflictException,
    NotFoundException,
    NotYetGeneratedException,
)
from .ratelimiter import RateLimitedSession


class AllSpice:
    """Object to establish a session with AllSpice Hub.

    Every ``requests_*`` helper sends its request through the shared session
    stored in ``self.requests`` with the headers in ``self.headers``, against
    ``self.url + "/api/v1" + endpoint``.
    """

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
    ):
        """Initialize an instance of the AllSpice Hub client.

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`. Trailing slashes are
                removed.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): If True (the default), verify the server's TLS
                certificate. If False, insecure connections are allowed and
                urllib3's `InsecureRequestWarning` is silenced.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

        Raises:
            ValueError: If neither `token_text` nor `auth` is given.
        """

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        self.headers = {
            "Content-type": "application/json",
        }
        # Endpoints are appended as "/api/v1/...", so a trailing slash here
        # would produce "//api/v1".
        self.url = allspice_hub_url.rstrip("/")

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication
        if not token_text and not auth:
            raise ValueError("Please provide either token_text or auth; neither was given")
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certification verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def __get_url(self, endpoint):
        """Return the absolute API URL for `endpoint` (prefixed with `/api/v1`)."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s", url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """Send a GET request and translate error status codes into exceptions.

        Raises:
            NotFoundException: On status 404.
            ConflictException: On status 409.
            NotYetGeneratedException: On status 503.
            Exception: On status 403 and on any other status besides 200/201.
        """
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code == 404:
                raise NotFoundException(message)
            if request.status_code == 403:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
                )
            if request.status_code == 409:
                raise ConflictException(message)
            if request.status_code == 503:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parse the JSON body of a response.

        Args:
            result: A response-like object exposing the body as `.text`.

        Returns:
            The decoded JSON value (usually a dict, but list endpoints yield a
            list). An empty or whitespace-only body yields `{}`.

        Raises:
            json.JSONDecodeError: If a body longer than three characters is
                not valid JSON.
        """
        raw = result.text or ""
        text = raw.strip()
        if not text:
            return {}
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Bodies of at most three characters used to be ignored without
            # parsing; keep ignoring the ones that are not JSON.
            if len(raw) <= 3:
                return {}
            raise

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body.

        Args:
            endpoint: Path below `/api/v1`.
            params: Query parameters; copied, never mutated.
            sudo: Optional object whose `.username` is sent as the `sudo`
                query parameter.
        """
        combined_params = dict(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.parse_result(self.__get(endpoint, combined_params))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the raw response body as bytes."""
        combined_params = dict(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.__get(endpoint, combined_params).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """GET every page of `endpoint` and return the concatenated items.

        Pages are requested with increasing `page_key` values starting at
        `first_page` until a page comes back empty.

        Args:
            endpoint: Path below `/api/v1`.
            params: Extra query parameters; copied, never mutated.
            sudo: Forwarded to `requests_get`.
            page_key: Name of the query parameter carrying the page number.
            first_page: Number of the first page to request.

        Returns:
            A list with the items of all pages. For dict responses the items
            are taken from the `"data"` or `"tree"` key.

        Raises:
            NotImplementedError: If a dict response has neither a `"data"`
                nor a `"tree"` key.
        """
        page = first_page
        combined_params = dict(params)
        aggregated_result = []
        while True:
            combined_params[page_key] = page
            result = self.requests_get(endpoint, combined_params, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    items = result["data"]
                elif "tree" in result:
                    items = result["tree"]
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
                # A null or empty item list marks the end for both keys.
                if not items:
                    return aggregated_result
            else:
                items = result

            aggregated_result.extend(items)
            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` (JSON-encoded, `{}` if omitted) to `endpoint`.

        Raises:
            Exception: If the response status is neither 200 nor 204.
        """
        if not data:
            data = {}
        request = self.requests.put(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
            self.logger.error(message)
            raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE request with `data` (JSON-encoded, `{}` if omitted).

        Raises:
            Exception: If the response status is neither 200 nor 204.
        """
        # Without this, json.dumps(None) would send the literal body "null".
        if not data:
            data = {}
        request = self.requests.delete(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url})"
            self.logger.error(message)
            raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
            can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If a failed response mentions
            "already exists" or "e-mail already in use"
        :raises Exception: On any other status besides 200, 201 and 202
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # The JSON content type is dropped from the copied headers for file uploads.
            args["headers"].pop("Content-type", None)
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            # Never write credentials to the log.
            safe_headers = {
                key: "<redacted>" if str(key).lower() == "authorization" else value
                for key, value in self.headers.items()
            }
            safe_data = data
            if isinstance(data, dict):
                safe_data = {
                    key: "<redacted>" if "password" in str(key).lower() else value
                    for key, value in data.items()
                }
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            self.logger.error(f"With info: {safe_data} ({safe_headers})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` (JSON-encoded) to `endpoint` and return the parsed response.

        Raises:
            Exception: If the response status is neither 200 nor 201.
        """
        request = self.requests.patch(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
            self.logger.error(error_message)
            raise Exception(error_message)
        return self.parse_result(request)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed response of `/orgs/<orgname>/public_members`."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return the organizations listed by `/admin/orgs` as `Organization` objects."""
        path = "/admin/orgs"
        results = self.requests_get(path)
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the `User` parsed from the `/user` endpoint."""
        result = self.requests_get(AllSpice.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the `version` field reported by the `/version` endpoint."""
        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return the users listed by `/admin/users` as `User` objects.

        Note:
            Only a single, un-paginated request is made.
        """
        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first user from `get_users` matching `email`, or None."""
        for user in self.get_users():
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the first user from `get_users` named `username`, or None."""
        for user in self.get_users():
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Fetch `/repos/<owner>/<name>` and return it as a `Repository`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` fall back to `user_name` when not given.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # The plaintext password must not end up in the debug log.
        self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            self.logger.error(result["message"])
            raise Exception("User not created... (gitea: %s)" % result["message"])
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
        result = self.requests_post(
            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization through the admin endpoint for `owner`.

        Throws:
            AlreadyExistsException: Raised by `requests_post` when the server
                reports that the name already exists.
            Exception: If something else went wrong.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            AllSpice.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
            self.logger.error(result["message"])
            raise Exception("Organization not created... (gitea: %s)" % result["message"])
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, "", short description of the new Team.
            permission (str): Optional, 'read', the permission sent for the Team.
            can_create_org_repo (bool): Optional, False, sent as-is to the API.
            includes_all_repositories (bool): Optional, False, sent as-is to the API.
            units (tuple): Optional, the repository units sent for the Team.
            units_map (dict): Optional, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.
        """
        # None stands in for an empty mapping so no mutable default is shared.
        if units_map is None:
            units_map = {}

        result = self.requests_post(
            AllSpice.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                "units_map": units_map,
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            self.logger.error("Team not created... (gitea: %s)" % result["message"])
            self.logger.error(result["message"])
            raise Exception("Team not created... (gitea: %s)" % result["message"])
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object
class AllSpice:
    """Object to establish a session with AllSpice Hub.

    Every ``requests_*`` helper sends its request through the shared session
    stored in ``self.requests`` with the headers in ``self.headers``, against
    ``self.url + "/api/v1" + endpoint``.
    """

    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    def __init__(
        self,
        allspice_hub_url="https://hub.allspice.io",
        token_text=None,
        auth=None,
        verify=True,
        log_level="INFO",
        ratelimiting=(100, 60),
    ):
        """Initialize an instance of the AllSpice Hub client.

        Args:
            allspice_hub_url (str): The URL for the AllSpice Hub instance.
                Defaults to `https://hub.allspice.io`. Trailing slashes are
                removed.

            token_text (str, None): The access token, by default None.

            auth (tuple, None): The user credentials
                `(username, password)`, by default None.

            verify (bool): If True (the default), verify the server's TLS
                certificate. If False, insecure connections are allowed and
                urllib3's `InsecureRequestWarning` is silenced.

            log_level (str): The log level, by default `INFO`.

            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
                If None, no rate limiting is applied. By default, 100 calls
                per minute are allowed.

        Raises:
            ValueError: If neither `token_text` nor `auth` is given.
        """

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        self.headers = {
            "Content-type": "application/json",
        }
        # Endpoints are appended as "/api/v1/...", so a trailing slash here
        # would produce "//api/v1".
        self.url = allspice_hub_url.rstrip("/")

        if ratelimiting is None:
            self.requests = requests.Session()
        else:
            (max_calls, period) = ratelimiting
            self.requests = RateLimitedSession(max_calls=max_calls, period=period)

        # Manage authentication
        if not token_text and not auth:
            raise ValueError("Please provide either token_text or auth; neither was given")
        if token_text:
            self.headers["Authorization"] = "token " + token_text
        if auth:
            self.logger.warning(
                "Using basic auth is not recommended. Prefer using a token instead."
            )
            self.requests.auth = auth

        # Manage SSL certification verification
        self.requests.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def __get_url(self, endpoint):
        """Return the absolute API URL for `endpoint` (prefixed with `/api/v1`)."""
        url = self.url + "/api/v1" + endpoint
        self.logger.debug("Url: %s", url)
        return url

    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
        """Send a GET request and translate error status codes into exceptions.

        Raises:
            NotFoundException: On status 404.
            ConflictException: On status 409.
            NotYetGeneratedException: On status 503.
            Exception: On status 403 and on any other status besides 200/201.
        """
        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
        if request.status_code not in [200, 201]:
            message = f"Received status code: {request.status_code} ({request.url})"
            if request.status_code == 404:
                raise NotFoundException(message)
            if request.status_code == 403:
                raise Exception(
                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
                )
            if request.status_code == 409:
                raise ConflictException(message)
            if request.status_code == 503:
                raise NotYetGeneratedException(message)
            raise Exception(message)
        return request

    @staticmethod
    def parse_result(result) -> Dict:
        """Parse the JSON body of a response.

        Args:
            result: A response-like object exposing the body as `.text`.

        Returns:
            The decoded JSON value (usually a dict, but list endpoints yield a
            list). An empty or whitespace-only body yields `{}`.

        Raises:
            json.JSONDecodeError: If a body longer than three characters is
                not valid JSON.
        """
        raw = result.text or ""
        text = raw.strip()
        if not text:
            return {}
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Bodies of at most three characters used to be ignored without
            # parsing; keep ignoring the ones that are not JSON.
            if len(raw) <= 3:
                return {}
            raise

    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
        """GET `endpoint` and return the parsed JSON body.

        Args:
            endpoint: Path below `/api/v1`.
            params: Query parameters; copied, never mutated.
            sudo: Optional object whose `.username` is sent as the `sudo`
                query parameter.
        """
        combined_params = dict(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.parse_result(self.__get(endpoint, combined_params))

    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
        """GET `endpoint` and return the raw response body as bytes."""
        combined_params = dict(params)
        if sudo:
            combined_params["sudo"] = sudo.username
        return self.__get(endpoint, combined_params).content

    def requests_get_paginated(
        self,
        endpoint: str,
        params=frozendict(),
        sudo=None,
        page_key: str = "page",
        first_page: int = 1,
    ):
        """GET every page of `endpoint` and return the concatenated items.

        Pages are requested with increasing `page_key` values starting at
        `first_page` until a page comes back empty.

        Args:
            endpoint: Path below `/api/v1`.
            params: Extra query parameters; copied, never mutated.
            sudo: Forwarded to `requests_get`.
            page_key: Name of the query parameter carrying the page number.
            first_page: Number of the first page to request.

        Returns:
            A list with the items of all pages. For dict responses the items
            are taken from the `"data"` or `"tree"` key.

        Raises:
            NotImplementedError: If a dict response has neither a `"data"`
                nor a `"tree"` key.
        """
        page = first_page
        combined_params = dict(params)
        aggregated_result = []
        while True:
            combined_params[page_key] = page
            result = self.requests_get(endpoint, combined_params, sudo)

            if not result:
                return aggregated_result

            if isinstance(result, dict):
                if "data" in result:
                    items = result["data"]
                elif "tree" in result:
                    items = result["tree"]
                else:
                    raise NotImplementedError(
                        "requests_get_paginated does not know how to handle responses of this type."
                    )
                # A null or empty item list marks the end for both keys.
                if not items:
                    return aggregated_result
            else:
                items = result

            aggregated_result.extend(items)
            page += 1

    def requests_put(self, endpoint: str, data: Optional[dict] = None):
        """PUT `data` (JSON-encoded, `{}` if omitted) to `endpoint`.

        Raises:
            Exception: If the response status is neither 200 nor 204.
        """
        if not data:
            data = {}
        request = self.requests.put(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
            self.logger.error(message)
            raise Exception(message)

    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
        """Send a DELETE request with `data` (JSON-encoded, `{}` if omitted).

        Raises:
            Exception: If the response status is neither 200 nor 204.
        """
        # Without this, json.dumps(None) would send the literal body "null".
        if not data:
            data = {}
        request = self.requests.delete(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 204]:
            message = f"Received status code: {request.status_code} ({request.url})"
            self.logger.error(message)
            raise Exception(message)

    def requests_post(
        self,
        endpoint: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ):
        """
        Make a POST call to the endpoint.

        :param endpoint: The path to the endpoint
        :param data: A dictionary for JSON data
        :param params: A dictionary of query params
        :param files: A dictionary of files, see requests.post. Using both files and data
            can lead to unexpected results!
        :return: The JSON response parsed as a dict
        :raises AlreadyExistsException: If a failed response mentions
            "already exists" or "e-mail already in use"
        :raises Exception: On any other status besides 200, 201 and 202
        """

        # This should ideally be a TypedDict of the type of arguments taken by
        # `requests.post`.
        args: dict[str, Any] = {
            "headers": self.headers.copy(),
        }
        if data is not None:
            args["data"] = json.dumps(data)
        if params is not None:
            args["params"] = params
        if files is not None:
            # The JSON content type is dropped from the copied headers for file uploads.
            args["headers"].pop("Content-type", None)
            args["files"] = files

        request = self.requests.post(self.__get_url(endpoint), **args)

        if request.status_code not in [200, 201, 202]:
            if "already exists" in request.text or "e-mail already in use" in request.text:
                self.logger.warning(request.text)
                raise AlreadyExistsException()
            # Never write credentials to the log.
            safe_headers = {
                key: "<redacted>" if str(key).lower() == "authorization" else value
                for key, value in self.headers.items()
            }
            safe_data = data
            if isinstance(data, dict):
                safe_data = {
                    key: "<redacted>" if "password" in str(key).lower() else value
                    for key, value in data.items()
                }
            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
            self.logger.error(f"With info: {safe_data} ({safe_headers})")
            self.logger.error(f"Answer: {request.text}")
            raise Exception(
                f"Received status code: {request.status_code} ({request.url}), {request.text}"
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint: str, data: dict):
        """PATCH `data` (JSON-encoded) to `endpoint` and return the parsed response.

        Raises:
            Exception: If the response status is neither 200 nor 201.
        """
        request = self.requests.patch(
            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
            self.logger.error(error_message)
            raise Exception(error_message)
        return self.parse_result(request)

    def get_orgs_public_members_all(self, orgname):
        """Return the parsed response of `/orgs/<orgname>/public_members`."""
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def get_orgs(self):
        """Return the organizations listed by `/admin/orgs` as `Organization` objects."""
        path = "/admin/orgs"
        results = self.requests_get(path)
        return [Organization.parse_response(self, result) for result in results]

    def get_user(self):
        """Return the `User` parsed from the `/user` endpoint."""
        result = self.requests_get(AllSpice.GET_USER)
        return User.parse_response(self, result)

    def get_version(self) -> str:
        """Return the `version` field reported by the `/version` endpoint."""
        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
        return result["version"]

    def get_users(self) -> List[User]:
        """Return the users listed by `/admin/users` as `User` objects.

        Note:
            Only a single, un-paginated request is made.
        """
        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
        return [User.parse_response(self, result) for result in results]

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the first user from `get_users` matching `email`, or None."""
        for user in self.get_users():
            if user.email == email or email in user.emails:
                return user
        return None

    def get_user_by_name(self, username: str) -> Optional[User]:
        """Return the first user from `get_users` named `username`, or None."""
        for user in self.get_users():
            if user.username == username:
                return user
        return None

    def get_repository(self, owner: str, name: str) -> Repository:
        """Fetch `/repos/<owner>/<name>` and return it as a `Repository`."""
        path = self.GET_REPOSITORY.format(owner=owner, name=name)
        result = self.requests_get(path)
        return Repository.parse_response(self, result)

    def create_user(
        self,
        user_name: str,
        email: str,
        password: str,
        full_name: Optional[str] = None,
        login_name: Optional[str] = None,
        change_pw=True,
        send_notify=True,
        source_id=0,
    ):
        """Create User.

        `login_name` and `full_name` fall back to `user_name` when not given.

        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        if not login_name:
            login_name = user_name
        if not full_name:
            full_name = user_name
        request_data = {
            "source_id": source_id,
            "login_name": login_name,
            "full_name": full_name,
            "username": user_name,
            "email": email,
            "password": password,
            "send_notify": send_notify,
            "must_change_password": change_pw,
        }

        # The plaintext password must not end up in the debug log.
        self.logger.debug("Gitea post payload: %s", {**request_data, "password": "<redacted>"})
        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
        if "id" in result:
            self.logger.info(
                "Successfully created User %s <%s> (id %s)",
                result["login"],
                result["email"],
                result["id"],
            )
            self.logger.debug("Gitea response: %s", result)
        else:
            self.logger.error(result["message"])
            raise Exception("User not created... (gitea: %s)" % result["message"])
        user = User.parse_response(self, result)
        return user

    def create_repo(
        self,
        repoOwner: Union[User, Organization],
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores: Optional[str] = None,
        license: Optional[str] = None,
        readme: str = "Default",
        issue_labels: Optional[str] = None,
        default_branch="master",
    ):
        """Create a Repository as the administrator

        Throws:
            AlreadyExistsException: If the Repository exists already.
            Exception: If something else went wrong.

        Note:
            Non-admin users can not use this method. Please use instead
            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
        result = self.requests_post(
            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "issue_labels": issue_labels,
                "readme": readme,
                "default_branch": default_branch,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Repository %s " % result["name"])
        else:
            self.logger.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_response(self, result)

    def create_org(
        self,
        owner: User,
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """Create an Organization through the admin endpoint for `owner`.

        Throws:
            AlreadyExistsException: Raised by `requests_post` when the server
                reports that the name already exists.
            Exception: If something else went wrong.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            AllSpice.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            self.logger.info("Successfully created Organization %s" % result["username"])
        else:
            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
            self.logger.error(result["message"])
            raise Exception("Organization not created... (gitea: %s)" % result["message"])
        return Organization.parse_response(self, result)

    def create_team(
        self,
        org: Organization,
        name: str,
        description: str = "",
        permission: str = "read",
        can_create_org_repo: bool = False,
        includes_all_repositories: bool = False,
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
        units_map=None,
    ):
        """Creates a Team.

        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, "", short description of the new Team.
            permission (str): Optional, 'read', the permission sent for the Team.
            can_create_org_repo (bool): Optional, False, sent as-is to the API.
            includes_all_repositories (bool): Optional, False, sent as-is to the API.
            units (tuple): Optional, the repository units sent for the Team.
            units_map (dict): Optional, a mapping of units to their
                permissions. If None or empty, the `permission` permission will
                be applied to all units. Note: When both `units` and `units_map`
                are given, `units_map` will be preferred.
        """
        # None stands in for an empty mapping so no mutable default is shared.
        if units_map is None:
            units_map = {}

        result = self.requests_post(
            AllSpice.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                "can_create_org_repo": can_create_org_repo,
                "includes_all_repositories": includes_all_repositories,
                "units": units,
                "units_map": units_map,
            },
        )

        if "id" in result:
            self.logger.info("Successfully created Team %s" % result["name"])
        else:
            self.logger.error("Team not created... (gitea: %s)" % result["message"])
            self.logger.error(result["message"])
            raise Exception("Team not created... (gitea: %s)" % result["message"])
        api_object = Team.parse_response(self, result)
        setattr(
            api_object, "_organization", org
        )  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object
Object to establish a session with AllSpice Hub.
def __init__(
    self,
    allspice_hub_url="https://hub.allspice.io",
    token_text=None,
    auth=None,
    verify=True,
    log_level="INFO",
    ratelimiting=(100, 60),
):
    """Initialize an instance of the AllSpice Hub client.

    Args:
        allspice_hub_url (str): The URL for the AllSpice Hub instance.
            Defaults to `https://hub.allspice.io`. Trailing slashes are
            removed.

        token_text (str, None): The access token, by default None.

        auth (tuple, None): The user credentials
            `(username, password)`, by default None.

        verify (bool): If True (the default), verify the server's TLS
            certificate. If False, insecure connections are allowed and
            urllib3's `InsecureRequestWarning` is silenced.

        log_level (str): The log level, by default `INFO`.

        ratelimiting (tuple[int, int], None): `(max_calls, period)`,
            If None, no rate limiting is applied. By default, 100 calls
            per minute are allowed.

    Raises:
        ValueError: If neither `token_text` nor `auth` is given.
    """

    self.logger = logging.getLogger(__name__)
    self.logger.setLevel(log_level)
    self.headers = {
        "Content-type": "application/json",
    }
    # The API prefix "/api/v1" is appended to this URL later, so a trailing
    # slash here would produce "//api/v1".
    self.url = allspice_hub_url.rstrip("/")

    if ratelimiting is None:
        self.requests = requests.Session()
    else:
        (max_calls, period) = ratelimiting
        self.requests = RateLimitedSession(max_calls=max_calls, period=period)

    # Manage authentication
    if not token_text and not auth:
        raise ValueError("Please provide either token_text or auth; neither was given")
    if token_text:
        self.headers["Authorization"] = "token " + token_text
    if auth:
        self.logger.warning(
            "Using basic auth is not recommended. Prefer using a token instead."
        )
        self.requests.auth = auth

    # Manage SSL certification verification
    self.requests.verify = verify
    if not verify:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to `https://hub.allspice.io`.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, verify the server's TLS certificate.
If False, allow insecure server connections when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
@staticmethod
def parse_result(result) -> Dict:
    """Parse the JSON body of a response.

    Args:
        result: A response-like object exposing the body as `.text`.

    Returns:
        The decoded JSON value (usually a dict, but list endpoints yield a
        list). An empty or whitespace-only body yields `{}`.

    Raises:
        json.JSONDecodeError: If a body longer than three characters is
            not valid JSON.
    """
    raw = result.text or ""
    text = raw.strip()
    if not text:
        return {}
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Bodies of at most three characters used to be ignored without
        # parsing; keep ignoring the ones that are not JSON.
        if len(raw) <= 3:
            return {}
        raise
Parses the result-JSON to a dict.
def requests_get_paginated(
    self,
    endpoint: str,
    params=frozendict(),
    sudo=None,
    page_key: str = "page",
    first_page: int = 1,
):
    """GET every page of `endpoint` and return the concatenated items.

    Pages are requested through `self.requests_get` with increasing
    `page_key` values starting at `first_page` until a page comes back empty.

    Args:
        endpoint: Endpoint path, passed through to `requests_get`.
        params: Extra query parameters; copied, never mutated.
        sudo: Forwarded to `requests_get`.
        page_key: Name of the query parameter carrying the page number.
        first_page: Number of the first page to request.

    Returns:
        A list with the items of all pages. For dict responses the items are
        taken from the `"data"` or `"tree"` key.

    Raises:
        NotImplementedError: If a dict response has neither a `"data"` nor a
            `"tree"` key.
    """
    page = first_page
    combined_params = dict(params)
    aggregated_result = []
    while True:
        combined_params[page_key] = page
        result = self.requests_get(endpoint, combined_params, sudo)

        if not result:
            return aggregated_result

        if isinstance(result, dict):
            if "data" in result:
                items = result["data"]
            elif "tree" in result:
                items = result["tree"]
            else:
                raise NotImplementedError(
                    "requests_get_paginated does not know how to handle responses of this type."
                )
            # A null or empty item list marks the end for both keys.
            if not items:
                return aggregated_result
        else:
            items = result

        aggregated_result.extend(items)
        page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
    """Send a PUT request with a JSON body to the endpoint.

    A missing or empty `data` is sent as an empty JSON object. Nothing is
    returned on success.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :raises Exception: If the response status is neither 200 nor 204; the
        same message is logged as an error first.
    """
    url = self.__get_url(endpoint)
    payload = json.dumps(data if data else {})
    response = self.requests.put(url, headers=self.headers, data=payload)
    if response.status_code in (200, 204):
        return
    message = f"Received status code: {response.status_code} ({response.url}) {response.text}"
    self.logger.error(message)
    raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
    """Send a DELETE request to the endpoint.

    Nothing is returned on success.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data; when None, no request body is sent
    :raises Exception: If the response status is neither 200 nor 204; the
        same message is logged as an error first.
    """
    # Only attach a body when the caller supplied one: serializing None
    # would send the literal JSON text "null".
    body = None if data is None else json.dumps(data)
    request = self.requests.delete(self.__get_url(endpoint), headers=self.headers, data=body)
    if request.status_code not in [200, 204]:
        message = f"Received status code: {request.status_code} ({request.url})"
        self.logger.error(message)
        raise Exception(message)
def requests_post(
    self,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
    files: Optional[dict] = None,
):
    """
    Make a POST call to the endpoint.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :param params: A dictionary of query params
    :param files: A dictionary of files, see requests.post. Using both files and data
        can lead to unexpected results!
    :return: The JSON response parsed as a dict
    :raises AlreadyExistsException: If the status is not 200/201/202 and the
        response text contains "already exists" or "e-mail already in use"
    :raises Exception: For any other status that is not 200/201/202
    """

    # This should ideally be a TypedDict of the type of arguments taken by
    # `requests.post`.
    args: dict[str, Any] = {
        "headers": self.headers.copy(),
    }
    if data is not None:
        args["data"] = json.dumps(data)
    if params is not None:
        args["params"] = params
    if files is not None:
        # Drop the JSON content type for file uploads (presumably so that
        # requests can set the multipart one itself). Only the per-call copy
        # of the headers is changed.
        args["headers"].pop("Content-type", None)
        args["files"] = files

    request = self.requests.post(self.__get_url(endpoint), **args)

    if request.status_code not in [200, 201, 202]:
        if "already exists" in request.text or "e-mail already in use" in request.text:
            self.logger.warning(request.text)
            raise AlreadyExistsException()

        # Never write credentials to the log: mask the Authorization header
        # and a top-level "password" field before formatting the message.
        safe_headers = {
            key: "<redacted>" if key.lower() == "authorization" else value
            for key, value in self.headers.items()
        }
        safe_data = data
        if isinstance(data, dict) and "password" in data:
            safe_data = {**data, "password": "<redacted>"}

        self.logger.error(f"Received status code: {request.status_code} ({request.url})")
        self.logger.error(f"With info: {safe_data} ({safe_headers})")
        self.logger.error(f"Answer: {request.text}")
        raise Exception(
            f"Received status code: {request.status_code} ({request.url}), {request.text}"
        )
    return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
def requests_patch(self, endpoint: str, data: dict):
    """Send a PATCH request with a JSON body to the endpoint.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :return: The response parsed by `parse_result`
    :raises Exception: If the response status is neither 200 nor 201; the
        same message is logged as an error first.
    """
    url = self.__get_url(endpoint)
    response = self.requests.patch(url, headers=self.headers, data=json.dumps(data))
    if response.status_code in (200, 201):
        return self.parse_result(response)
    error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
    self.logger.error(error_message)
    raise Exception(error_message)
def create_user(
    self,
    user_name: str,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    login_name: Optional[str] = None,
    change_pw=True,
    send_notify=True,
    source_id=0,
):
    """Create User.

    `login_name` and `full_name` fall back to `user_name` when not given.
    `change_pw` is sent as `must_change_password`; `send_notify` and
    `source_id` are sent under their own names.

    Returns:
        The created user, as parsed by `User.parse_response`.

    Throws:
        AlreadyExistsException, if the User exists already
        Exception, if something else went wrong.
    """
    if not login_name:
        login_name = user_name
    if not full_name:
        full_name = user_name
    request_data = {
        "source_id": source_id,
        "login_name": login_name,
        "full_name": full_name,
        "username": user_name,
        "email": email,
        "password": password,
        "send_notify": send_notify,
        "must_change_password": change_pw,
    }

    # Keep the plaintext password out of the log; the real payload is
    # still what gets posted.
    self.logger.debug(
        "Gitea post payload: %s", {**request_data, "password": "<redacted>"}
    )
    result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
    if "id" in result:
        self.logger.info(
            "Successfully created User %s <%s> (id %s)",
            result["login"],
            result["email"],
            result["id"],
        )
        self.logger.debug("Gitea response: %s", result)
    else:
        # .get() so that a response without "message" still raises the
        # descriptive exception below instead of a KeyError.
        message = result.get("message")
        self.logger.error(message)
        raise Exception("User not created... (gitea: %s)" % message)
    user = User.parse_response(self, result)
    return user
Create User.
Throws:
AlreadyExistsException, if the User exists already.
Exception, if something else went wrong.
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository as the administrator

    Returns:
        The created repository, as parsed by `Repository.parse_response`.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # although this only says user in the api, this also works for
    # organizations
    assert isinstance(repoOwner, (User, Organization))
    endpoint = AllSpice.ADMIN_REPO_CREATE % repoOwner.username
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    result = self.requests_post(endpoint, data=payload)
    if "id" not in result:
        self.logger.error(result["message"])
        raise Exception("Repository not created... (gitea: %s)" % result["message"])
    self.logger.info("Successfully created Repository %s " % result["name"])
    return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use
`allspice.User.create_repo` or `allspice.Organization.create_repo` instead.
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner`.

    `orgName` is sent as the organization's `username`; the remaining
    arguments are sent under their own names.

    Returns:
        The created organization, as parsed by `Organization.parse_response`.

    Throws:
        Exception: If the response does not contain an `id`.
    """
    assert isinstance(owner, User)
    endpoint = AllSpice.CREATE_ORG % owner.username
    payload = {
        "username": orgName,
        "description": description,
        "location": location,
        "website": website,
        "full_name": full_name,
    }
    result = self.requests_post(endpoint, data=payload)
    if "id" not in result:
        self.logger.error("Organization not created... (gitea: %s)" % result["message"])
        self.logger.error(result["message"])
        raise Exception("Organization not created... (gitea: %s)" % result["message"])
    self.logger.info("Successfully created Organization %s" % result["username"])
    return Organization.parse_response(self, result)
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, '', short description of the new Team.
        permission (str): Optional, 'read', What permissions the members
            of the Team are given.
        can_create_org_repo (bool): Optional, False, sent as the
            `can_create_org_repo` field (presumably whether members may
            create repositories in the organization).
        includes_all_repositories (bool): Optional, False, sent as the
            `includes_all_repositories` field (presumably whether the
            Team covers every repository of the organization).
        units (tuple): Optional, the repository units sent as the `units`
            field.
        units_map (dict): Optional, empty by default, a mapping of units
            to their permissions. If None or empty, the `permission`
            permission will be applied to all units. Note: When both
            `units` and `units_map` are given, `units_map` will be
            preferred.

    Returns:
        The created team, as parsed by `Team.parse_response`, with its
        `_organization` attribute set to `org`.

    Throws:
        Exception: If the response does not contain an `id`.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    if units_map is None:
        units_map = {}

    result = self.requests_post(
        AllSpice.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            "units_map": units_map,
        },
    )

    if "id" in result:
        self.logger.info("Successfully created Team %s" % result["name"])
    else:
        # .get() so that a response without "message" still raises the
        # descriptive exception below instead of a KeyError.
        message = result.get("message")
        self.logger.error("Team not created... (gitea: %s)" % message)
        self.logger.error(message)
        raise Exception("Team not created... (gitea: %s)" % message)
    api_object = Team.parse_response(self, result)
    setattr(
        api_object, "_organization", org
    )  # fixes strange behaviour of gitea not returning a valid organization here.
    return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, '', short description of the new Team.
permission (str): Optional, 'read', What permissions the members of the Team are given.
units_map (dict): Optional, empty by default, a mapping of units to their
permissions. If None or empty, the `permission` permission will
be applied to all units. Note: When both `units` and `units_map`
are given, `units_map` will be preferred.