allspice.allspice
1import json 2import logging 3import sys 4from typing import Any, Dict, List, Mapping, Optional, Union 5 6import requests 7import urllib3 8from frozendict import frozendict 9 10from .apiobject import Organization, Repository, Team, User 11from .exceptions import ( 12 AlreadyExistsException, 13 ConflictException, 14 NotFoundException, 15 NotYetGeneratedException, 16) 17from .ratelimiter import RateLimitedSession 18 19 20class AllSpice: 21 """Object to establish a session with AllSpice Hub.""" 22 23 ADMIN_CREATE_USER = """/admin/users""" 24 GET_USERS_ADMIN = """/admin/users""" 25 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 26 ALLSPICE_HUB_VERSION = """/version""" 27 GET_USER = """/user""" 28 GET_REPOSITORY = """/repos/{owner}/{name}""" 29 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 30 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 31 32 def __init__( 33 self, 34 allspice_hub_url="https://hub.allspice.io", 35 token_text=None, 36 auth=None, 37 verify=True, 38 log_level="INFO", 39 ratelimiting=(100, 60), 40 ): 41 """Initializing an instance of the AllSpice Hub Client 42 43 Args: 44 allspice_hub_url (str): The URL for the AllSpice Hub instance. 45 Defaults to `https://hub.allspice.io`. 46 47 token_text (str, None): The access token, by default None. 48 49 auth (tuple, None): The user credentials 50 `(username, password)`, by default None. 51 52 verify (bool): If True, allow insecure server connections 53 when using SSL. 54 55 log_level (str): The log level, by default `INFO`. 56 57 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 58 If None, no rate limiting is applied. By default, 100 calls 59 per minute are allowed. 60 """ 61 62 self.logger = logging.getLogger(__name__) 63 handler = logging.StreamHandler(sys.stderr) 64 handler.setFormatter( 65 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 66 ) 67 self.logger.addHandler(handler) 68 self.logger.setLevel(log_level) 69 self.headers = { 70 "Content-type": "application/json", 71 } 72 self.url = allspice_hub_url 73 74 if ratelimiting is None: 75 self.requests = requests.Session() 76 else: 77 (max_calls, period) = ratelimiting 78 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 79 80 # Manage authentification 81 if not token_text and not auth: 82 raise ValueError("Please provide auth or token_text, but not both") 83 if token_text: 84 self.headers["Authorization"] = "token " + token_text 85 if auth: 86 self.logger.warning( 87 "Using basic auth is not recommended. Prefer using a token instead." 88 ) 89 self.requests.auth = auth 90 91 # Manage SSL certification verification 92 self.requests.verify = verify 93 if not verify: 94 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 95 96 def __get_url(self, endpoint): 97 url = self.url + "/api/v1" + endpoint 98 self.logger.debug("Url: %s" % url) 99 return url 100 101 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 102 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 103 if request.status_code not in [200, 201]: 104 message = f"Received status code: {request.status_code} ({request.url})" 105 if request.status_code in [404]: 106 raise NotFoundException(message) 107 if request.status_code in [403]: 108 raise Exception( 109 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 110 ) 111 if request.status_code in [409]: 112 raise ConflictException(message) 113 if request.status_code in [503]: 114 raise NotYetGeneratedException(message) 115 raise Exception(message) 116 return request 117 118 @staticmethod 119 def parse_result(result) -> Dict: 120 """Parses the result-JSON to a dict.""" 121 if result.text and len(result.text) > 3: 122 return json.loads(result.text) 123 return {} 124 125 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 126 combined_params = {} 127 combined_params.update(params) 128 if sudo: 129 combined_params["sudo"] = sudo.username 130 return self.parse_result(self.__get(endpoint, combined_params)) 131 132 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 133 combined_params = {} 134 combined_params.update(params) 135 if sudo: 136 combined_params["sudo"] = sudo.username 137 return self.__get(endpoint, combined_params).content 138 139 def requests_get_paginated( 140 self, 141 endpoint: str, 142 params=frozendict(), 143 sudo=None, 144 page_key: str = "page", 145 first_page: int = 1, 146 ): 147 page = first_page 148 combined_params = {} 149 combined_params.update(params) 150 aggregated_result = [] 151 while True: 152 combined_params[page_key] = page 153 result = self.requests_get(endpoint, combined_params, sudo) 154 155 if not result: 156 return aggregated_result 157 158 if isinstance(result, dict): 159 if "data" in result: 160 data = result["data"] 161 if len(data) == 0: 162 return aggregated_result 163 aggregated_result.extend(data) 164 elif "tree" in result: 165 data = result["tree"] 166 if data is None or len(data) == 0: 167 return aggregated_result 168 aggregated_result.extend(data) 169 else: 170 raise NotImplementedError( 171 "requests_get_paginated does not know how to handle responses of this type." 172 ) 173 else: 174 aggregated_result.extend(result) 175 176 page += 1 177 178 def requests_put(self, endpoint: str, data: Optional[dict] = None): 179 if not data: 180 data = {} 181 request = self.requests.put( 182 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 183 ) 184 if request.status_code not in [200, 204]: 185 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 186 self.logger.error(message) 187 raise Exception(message) 188 189 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 190 request = self.requests.delete( 191 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 192 ) 193 if request.status_code not in [200, 204]: 194 message = f"Received status code: {request.status_code} ({request.url})" 195 self.logger.error(message) 196 raise Exception(message) 197 198 def requests_post( 199 self, 200 endpoint: str, 201 data: Optional[dict] = None, 202 params: Optional[dict] = None, 203 files: Optional[dict] = None, 204 ): 205 """ 206 Make a POST call to the endpoint. 207 208 :param endpoint: The path to the endpoint 209 :param data: A dictionary for JSON data 210 :param params: A dictionary of query params 211 :param files: A dictionary of files, see requests.post. Using both files and data 212 can lead to unexpected results! 213 :return: The JSON response parsed as a dict 214 """ 215 216 # This should ideally be a TypedDict of the type of arguments taken by 217 # `requests.post`. 218 args: dict[str, Any] = { 219 "headers": self.headers.copy(), 220 } 221 if data is not None: 222 args["data"] = json.dumps(data) 223 if params is not None: 224 args["params"] = params 225 if files is not None: 226 args["headers"].pop("Content-type") 227 args["files"] = files 228 229 request = self.requests.post(self.__get_url(endpoint), **args) 230 231 if request.status_code not in [200, 201, 202]: 232 if "already exists" in request.text or "e-mail already in use" in request.text: 233 self.logger.warning(request.text) 234 raise AlreadyExistsException() 235 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 236 self.logger.error(f"With info: {data} ({self.headers})") 237 self.logger.error(f"Answer: {request.text}") 238 raise Exception( 239 f"Received status code: {request.status_code} ({request.url}), {request.text}" 240 ) 241 return self.parse_result(request) 242 243 def requests_patch(self, endpoint: str, data: dict): 244 request = self.requests.patch( 245 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 246 ) 247 if request.status_code not in [200, 201]: 248 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 249 self.logger.error(error_message) 250 raise Exception(error_message) 251 return self.parse_result(request) 252 253 def get_orgs_public_members_all(self, orgname): 254 path = "/orgs/" + orgname + "/public_members" 255 return self.requests_get(path) 256 257 def get_orgs(self): 258 path = "/admin/orgs" 259 results = self.requests_get(path) 260 return [Organization.parse_response(self, result) for result in results] 261 262 def get_user(self): 263 result = self.requests_get(AllSpice.GET_USER) 264 return User.parse_response(self, result) 265 266 def get_version(self) -> str: 267 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 268 return result["version"] 269 270 def get_users(self) -> List[User]: 271 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 272 return [User.parse_response(self, result) for result in results] 273 274 def get_user_by_email(self, email: str) -> Optional[User]: 275 users = self.get_users() 276 for user in users: 277 if user.email == email or email in user.emails: 278 return user 279 return None 280 281 def get_user_by_name(self, username: str) -> Optional[User]: 282 users = self.get_users() 283 for user in users: 284 if user.username == username: 285 return user 286 return None 287 288 def get_repository(self, owner: str, name: str) -> Repository: 289 path = self.GET_REPOSITORY.format(owner=owner, name=name) 290 result = self.requests_get(path) 291 return Repository.parse_response(self, result) 292 293 def create_user( 294 self, 295 user_name: str, 296 email: str, 297 password: str, 298 full_name: Optional[str] = None, 299 login_name: Optional[str] = None, 300 change_pw=True, 301 send_notify=True, 302 source_id=0, 303 ): 304 """Create User. 305 Throws: 306 AlreadyExistsException, if the User exists already 307 Exception, if something else went wrong. 308 """ 309 if not login_name: 310 login_name = user_name 311 if not full_name: 312 full_name = user_name 313 request_data = { 314 "source_id": source_id, 315 "login_name": login_name, 316 "full_name": full_name, 317 "username": user_name, 318 "email": email, 319 "password": password, 320 "send_notify": send_notify, 321 "must_change_password": change_pw, 322 } 323 324 self.logger.debug("Gitea post payload: %s", request_data) 325 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 326 if "id" in result: 327 self.logger.info( 328 "Successfully created User %s <%s> (id %s)", 329 result["login"], 330 result["email"], 331 result["id"], 332 ) 333 self.logger.debug("Gitea response: %s", result) 334 else: 335 self.logger.error(result["message"]) 336 raise Exception("User not created... (gitea: %s)" % result["message"]) 337 user = User.parse_response(self, result) 338 return user 339 340 def create_repo( 341 self, 342 repoOwner: Union[User, Organization], 343 repoName: str, 344 description: str = "", 345 private: bool = False, 346 autoInit=True, 347 gitignores: Optional[str] = None, 348 license: Optional[str] = None, 349 readme: str = "Default", 350 issue_labels: Optional[str] = None, 351 default_branch="master", 352 ): 353 """Create a Repository as the administrator 354 355 Throws: 356 AlreadyExistsException: If the Repository exists already. 357 Exception: If something else went wrong. 358 359 Note: 360 Non-admin users can not use this method. Please use instead 361 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 362 """ 363 # although this only says user in the api, this also works for 364 # organizations 365 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 366 result = self.requests_post( 367 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 368 data={ 369 "name": repoName, 370 "description": description, 371 "private": private, 372 "auto_init": autoInit, 373 "gitignores": gitignores, 374 "license": license, 375 "issue_labels": issue_labels, 376 "readme": readme, 377 "default_branch": default_branch, 378 }, 379 ) 380 if "id" in result: 381 self.logger.info("Successfully created Repository %s " % result["name"]) 382 else: 383 self.logger.error(result["message"]) 384 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 385 return Repository.parse_response(self, result) 386 387 def create_org( 388 self, 389 owner: User, 390 orgName: str, 391 description: str, 392 location="", 393 website="", 394 full_name="", 395 ): 396 assert isinstance(owner, User) 397 result = self.requests_post( 398 AllSpice.CREATE_ORG % owner.username, 399 data={ 400 "username": orgName, 401 "description": description, 402 "location": location, 403 "website": website, 404 "full_name": full_name, 405 }, 406 ) 407 if "id" in result: 408 self.logger.info("Successfully created Organization %s" % result["username"]) 409 else: 410 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 411 self.logger.error(result["message"]) 412 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 413 return Organization.parse_response(self, result) 414 415 def create_team( 416 self, 417 org: Organization, 418 name: str, 419 description: str = "", 420 permission: str = "read", 421 can_create_org_repo: bool = False, 422 includes_all_repositories: bool = False, 423 units=( 424 "repo.code", 425 "repo.issues", 426 "repo.ext_issues", 427 "repo.wiki", 428 "repo.pulls", 429 "repo.releases", 430 "repo.ext_wiki", 431 ), 432 units_map={}, 433 ): 434 """Creates a Team. 435 436 Args: 437 org (Organization): Organization the Team will be part of. 438 name (str): The Name of the Team to be created. 439 description (str): Optional, None, short description of the new Team. 440 permission (str): Optional, 'read', What permissions the members 441 units_map (dict): Optional, {}, a mapping of units to their 442 permissions. If None or empty, the `permission` permission will 443 be applied to all units. Note: When both `units` and `units_map` 444 are given, `units_map` will be preferred. 445 """ 446 447 result = self.requests_post( 448 AllSpice.CREATE_TEAM % org.username, 449 data={ 450 "name": name, 451 "description": description, 452 "permission": permission, 453 "can_create_org_repo": can_create_org_repo, 454 "includes_all_repositories": includes_all_repositories, 455 "units": units, 456 "units_map": units_map, 457 }, 458 ) 459 460 if "id" in result: 461 self.logger.info("Successfully created Team %s" % result["name"]) 462 else: 463 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 464 self.logger.error(result["message"]) 465 raise Exception("Team not created... (gitea: %s)" % result["message"]) 466 api_object = Team.parse_response(self, result) 467 setattr( 468 api_object, "_organization", org 469 ) # fixes strange behaviour of gitea not returning a valid organization here. 470 return api_object
21class AllSpice: 22 """Object to establish a session with AllSpice Hub.""" 23 24 ADMIN_CREATE_USER = """/admin/users""" 25 GET_USERS_ADMIN = """/admin/users""" 26 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 27 ALLSPICE_HUB_VERSION = """/version""" 28 GET_USER = """/user""" 29 GET_REPOSITORY = """/repos/{owner}/{name}""" 30 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 31 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 32 33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 """ 62 63 self.logger = logging.getLogger(__name__) 64 handler = logging.StreamHandler(sys.stderr) 65 handler.setFormatter( 66 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 67 ) 68 self.logger.addHandler(handler) 69 self.logger.setLevel(log_level) 70 self.headers = { 71 "Content-type": "application/json", 72 } 73 self.url = allspice_hub_url 74 75 if ratelimiting is None: 76 self.requests = requests.Session() 77 else: 78 (max_calls, period) = ratelimiting 79 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 80 81 # Manage authentification 82 if not token_text and not auth: 83 raise ValueError("Please provide auth or token_text, but not both") 84 if token_text: 85 self.headers["Authorization"] = "token " + token_text 86 if auth: 87 self.logger.warning( 88 "Using basic auth is not recommended. Prefer using a token instead." 89 ) 90 self.requests.auth = auth 91 92 # Manage SSL certification verification 93 self.requests.verify = verify 94 if not verify: 95 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 96 97 def __get_url(self, endpoint): 98 url = self.url + "/api/v1" + endpoint 99 self.logger.debug("Url: %s" % url) 100 return url 101 102 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 103 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 104 if request.status_code not in [200, 201]: 105 message = f"Received status code: {request.status_code} ({request.url})" 106 if request.status_code in [404]: 107 raise NotFoundException(message) 108 if request.status_code in [403]: 109 raise Exception( 110 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 111 ) 112 if request.status_code in [409]: 113 raise ConflictException(message) 114 if request.status_code in [503]: 115 raise NotYetGeneratedException(message) 116 raise Exception(message) 117 return request 118 119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {} 125 126 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 127 combined_params = {} 128 combined_params.update(params) 129 if sudo: 130 combined_params["sudo"] = sudo.username 131 return self.parse_result(self.__get(endpoint, combined_params)) 132 133 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 134 combined_params = {} 135 combined_params.update(params) 136 if sudo: 137 combined_params["sudo"] = sudo.username 138 return self.__get(endpoint, combined_params).content 139 140 def requests_get_paginated( 141 self, 142 endpoint: str, 143 params=frozendict(), 144 sudo=None, 145 page_key: str = "page", 146 first_page: int = 1, 147 ): 148 page = first_page 149 combined_params = {} 150 combined_params.update(params) 151 aggregated_result = [] 152 while True: 153 combined_params[page_key] = page 154 result = self.requests_get(endpoint, combined_params, sudo) 155 156 if not result: 157 return aggregated_result 158 159 if isinstance(result, dict): 160 if "data" in result: 161 data = result["data"] 162 if len(data) == 0: 163 return aggregated_result 164 aggregated_result.extend(data) 165 elif "tree" in result: 166 data = result["tree"] 167 if data is None or len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 else: 171 raise NotImplementedError( 172 "requests_get_paginated does not know how to handle responses of this type." 173 ) 174 else: 175 aggregated_result.extend(result) 176 177 page += 1 178 179 def requests_put(self, endpoint: str, data: Optional[dict] = None): 180 if not data: 181 data = {} 182 request = self.requests.put( 183 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 184 ) 185 if request.status_code not in [200, 204]: 186 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 187 self.logger.error(message) 188 raise Exception(message) 189 190 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 191 request = self.requests.delete( 192 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 193 ) 194 if request.status_code not in [200, 204]: 195 message = f"Received status code: {request.status_code} ({request.url})" 196 self.logger.error(message) 197 raise Exception(message) 198 199 def requests_post( 200 self, 201 endpoint: str, 202 data: Optional[dict] = None, 203 params: Optional[dict] = None, 204 files: Optional[dict] = None, 205 ): 206 """ 207 Make a POST call to the endpoint. 208 209 :param endpoint: The path to the endpoint 210 :param data: A dictionary for JSON data 211 :param params: A dictionary of query params 212 :param files: A dictionary of files, see requests.post. Using both files and data 213 can lead to unexpected results! 214 :return: The JSON response parsed as a dict 215 """ 216 217 # This should ideally be a TypedDict of the type of arguments taken by 218 # `requests.post`. 219 args: dict[str, Any] = { 220 "headers": self.headers.copy(), 221 } 222 if data is not None: 223 args["data"] = json.dumps(data) 224 if params is not None: 225 args["params"] = params 226 if files is not None: 227 args["headers"].pop("Content-type") 228 args["files"] = files 229 230 request = self.requests.post(self.__get_url(endpoint), **args) 231 232 if request.status_code not in [200, 201, 202]: 233 if "already exists" in request.text or "e-mail already in use" in request.text: 234 self.logger.warning(request.text) 235 raise AlreadyExistsException() 236 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 237 self.logger.error(f"With info: {data} ({self.headers})") 238 self.logger.error(f"Answer: {request.text}") 239 raise Exception( 240 f"Received status code: {request.status_code} ({request.url}), {request.text}" 241 ) 242 return self.parse_result(request) 243 244 def requests_patch(self, endpoint: str, data: dict): 245 request = self.requests.patch( 246 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 247 ) 248 if request.status_code not in [200, 201]: 249 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 250 self.logger.error(error_message) 251 raise Exception(error_message) 252 return self.parse_result(request) 253 254 def get_orgs_public_members_all(self, orgname): 255 path = "/orgs/" + orgname + "/public_members" 256 return self.requests_get(path) 257 258 def get_orgs(self): 259 path = "/admin/orgs" 260 results = self.requests_get(path) 261 return [Organization.parse_response(self, result) for result in results] 262 263 def get_user(self): 264 result = self.requests_get(AllSpice.GET_USER) 265 return User.parse_response(self, result) 266 267 def get_version(self) -> str: 268 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 269 return result["version"] 270 271 def get_users(self) -> List[User]: 272 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 273 return [User.parse_response(self, result) for result in results] 274 275 def get_user_by_email(self, email: str) -> Optional[User]: 276 users = self.get_users() 277 for user in users: 278 if user.email == email or email in user.emails: 279 return user 280 return None 281 282 def get_user_by_name(self, username: str) -> Optional[User]: 283 users = self.get_users() 284 for user in users: 285 if user.username == username: 286 return user 287 return None 288 289 def get_repository(self, owner: str, name: str) -> Repository: 290 path = self.GET_REPOSITORY.format(owner=owner, name=name) 291 result = self.requests_get(path) 292 return Repository.parse_response(self, result) 293 294 def create_user( 295 self, 296 user_name: str, 297 email: str, 298 password: str, 299 full_name: Optional[str] = None, 300 login_name: Optional[str] = None, 301 change_pw=True, 302 send_notify=True, 303 source_id=0, 304 ): 305 """Create User. 306 Throws: 307 AlreadyExistsException, if the User exists already 308 Exception, if something else went wrong. 309 """ 310 if not login_name: 311 login_name = user_name 312 if not full_name: 313 full_name = user_name 314 request_data = { 315 "source_id": source_id, 316 "login_name": login_name, 317 "full_name": full_name, 318 "username": user_name, 319 "email": email, 320 "password": password, 321 "send_notify": send_notify, 322 "must_change_password": change_pw, 323 } 324 325 self.logger.debug("Gitea post payload: %s", request_data) 326 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 327 if "id" in result: 328 self.logger.info( 329 "Successfully created User %s <%s> (id %s)", 330 result["login"], 331 result["email"], 332 result["id"], 333 ) 334 self.logger.debug("Gitea response: %s", result) 335 else: 336 self.logger.error(result["message"]) 337 raise Exception("User not created... (gitea: %s)" % result["message"]) 338 user = User.parse_response(self, result) 339 return user 340 341 def create_repo( 342 self, 343 repoOwner: Union[User, Organization], 344 repoName: str, 345 description: str = "", 346 private: bool = False, 347 autoInit=True, 348 gitignores: Optional[str] = None, 349 license: Optional[str] = None, 350 readme: str = "Default", 351 issue_labels: Optional[str] = None, 352 default_branch="master", 353 ): 354 """Create a Repository as the administrator 355 356 Throws: 357 AlreadyExistsException: If the Repository exists already. 358 Exception: If something else went wrong. 359 360 Note: 361 Non-admin users can not use this method. Please use instead 362 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 363 """ 364 # although this only says user in the api, this also works for 365 # organizations 366 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 367 result = self.requests_post( 368 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 369 data={ 370 "name": repoName, 371 "description": description, 372 "private": private, 373 "auto_init": autoInit, 374 "gitignores": gitignores, 375 "license": license, 376 "issue_labels": issue_labels, 377 "readme": readme, 378 "default_branch": default_branch, 379 }, 380 ) 381 if "id" in result: 382 self.logger.info("Successfully created Repository %s " % result["name"]) 383 else: 384 self.logger.error(result["message"]) 385 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 386 return Repository.parse_response(self, result) 387 388 def create_org( 389 self, 390 owner: User, 391 orgName: str, 392 description: str, 393 location="", 394 website="", 395 full_name="", 396 ): 397 assert isinstance(owner, User) 398 result = self.requests_post( 399 AllSpice.CREATE_ORG % owner.username, 400 data={ 401 "username": orgName, 402 "description": description, 403 "location": location, 404 "website": website, 405 "full_name": full_name, 406 }, 407 ) 408 if "id" in result: 409 self.logger.info("Successfully created Organization %s" % result["username"]) 410 else: 411 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 412 self.logger.error(result["message"]) 413 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 414 return Organization.parse_response(self, result) 415 416 def create_team( 417 self, 418 org: Organization, 419 name: str, 420 description: str = "", 421 permission: str = "read", 422 can_create_org_repo: bool = False, 423 includes_all_repositories: bool = False, 424 units=( 425 "repo.code", 426 "repo.issues", 427 "repo.ext_issues", 428 "repo.wiki", 429 "repo.pulls", 430 "repo.releases", 431 "repo.ext_wiki", 432 ), 433 units_map={}, 434 ): 435 """Creates a Team. 436 437 Args: 438 org (Organization): Organization the Team will be part of. 439 name (str): The Name of the Team to be created. 440 description (str): Optional, None, short description of the new Team. 441 permission (str): Optional, 'read', What permissions the members 442 units_map (dict): Optional, {}, a mapping of units to their 443 permissions. If None or empty, the `permission` permission will 444 be applied to all units. Note: When both `units` and `units_map` 445 are given, `units_map` will be preferred. 446 """ 447 448 result = self.requests_post( 449 AllSpice.CREATE_TEAM % org.username, 450 data={ 451 "name": name, 452 "description": description, 453 "permission": permission, 454 "can_create_org_repo": can_create_org_repo, 455 "includes_all_repositories": includes_all_repositories, 456 "units": units, 457 "units_map": units_map, 458 }, 459 ) 460 461 if "id" in result: 462 self.logger.info("Successfully created Team %s" % result["name"]) 463 else: 464 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 465 self.logger.error(result["message"]) 466 raise Exception("Team not created... (gitea: %s)" % result["message"]) 467 api_object = Team.parse_response(self, result) 468 setattr( 469 api_object, "_organization", org 470 ) # fixes strange behaviour of gitea not returning a valid organization here. 471 return api_object
Object to establish a session with AllSpice Hub.
33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 """ 62 63 self.logger = logging.getLogger(__name__) 64 handler = logging.StreamHandler(sys.stderr) 65 handler.setFormatter( 66 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 67 ) 68 self.logger.addHandler(handler) 69 self.logger.setLevel(log_level) 70 self.headers = { 71 "Content-type": "application/json", 72 } 73 self.url = allspice_hub_url 74 75 if ratelimiting is None: 76 self.requests = requests.Session() 77 else: 78 (max_calls, period) = ratelimiting 79 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 80 81 # Manage authentification 82 if not token_text and not auth: 83 raise ValueError("Please provide auth or token_text, but not both") 84 if token_text: 85 self.headers["Authorization"] = "token " + token_text 86 if auth: 87 self.logger.warning( 88 "Using basic auth is not recommended. Prefer using a token instead." 89 ) 90 self.requests.auth = auth 91 92 # Manage SSL certification verification 93 self.requests.verify = verify 94 if not verify: 95 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io
.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, allow insecure server connections
when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
119 @staticmethod 120 def parse_result(result) -> Dict: 121 """Parses the result-JSON to a dict.""" 122 if result.text and len(result.text) > 3: 123 return json.loads(result.text) 124 return {}
Parses the result-JSON to a dict.
140 def requests_get_paginated( 141 self, 142 endpoint: str, 143 params=frozendict(), 144 sudo=None, 145 page_key: str = "page", 146 first_page: int = 1, 147 ): 148 page = first_page 149 combined_params = {} 150 combined_params.update(params) 151 aggregated_result = [] 152 while True: 153 combined_params[page_key] = page 154 result = self.requests_get(endpoint, combined_params, sudo) 155 156 if not result: 157 return aggregated_result 158 159 if isinstance(result, dict): 160 if "data" in result: 161 data = result["data"] 162 if len(data) == 0: 163 return aggregated_result 164 aggregated_result.extend(data) 165 elif "tree" in result: 166 data = result["tree"] 167 if data is None or len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 else: 171 raise NotImplementedError( 172 "requests_get_paginated does not know how to handle responses of this type." 173 ) 174 else: 175 aggregated_result.extend(result) 176 177 page += 1
179 def requests_put(self, endpoint: str, data: Optional[dict] = None): 180 if not data: 181 data = {} 182 request = self.requests.put( 183 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 184 ) 185 if request.status_code not in [200, 204]: 186 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 187 self.logger.error(message) 188 raise Exception(message)
190 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 191 request = self.requests.delete( 192 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 193 ) 194 if request.status_code not in [200, 204]: 195 message = f"Received status code: {request.status_code} ({request.url})" 196 self.logger.error(message) 197 raise Exception(message)
199 def requests_post( 200 self, 201 endpoint: str, 202 data: Optional[dict] = None, 203 params: Optional[dict] = None, 204 files: Optional[dict] = None, 205 ): 206 """ 207 Make a POST call to the endpoint. 208 209 :param endpoint: The path to the endpoint 210 :param data: A dictionary for JSON data 211 :param params: A dictionary of query params 212 :param files: A dictionary of files, see requests.post. Using both files and data 213 can lead to unexpected results! 214 :return: The JSON response parsed as a dict 215 """ 216 217 # This should ideally be a TypedDict of the type of arguments taken by 218 # `requests.post`. 219 args: dict[str, Any] = { 220 "headers": self.headers.copy(), 221 } 222 if data is not None: 223 args["data"] = json.dumps(data) 224 if params is not None: 225 args["params"] = params 226 if files is not None: 227 args["headers"].pop("Content-type") 228 args["files"] = files 229 230 request = self.requests.post(self.__get_url(endpoint), **args) 231 232 if request.status_code not in [200, 201, 202]: 233 if "already exists" in request.text or "e-mail already in use" in request.text: 234 self.logger.warning(request.text) 235 raise AlreadyExistsException() 236 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 237 self.logger.error(f"With info: {data} ({self.headers})") 238 self.logger.error(f"Answer: {request.text}") 239 raise Exception( 240 f"Received status code: {request.status_code} ({request.url}), {request.text}" 241 ) 242 return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
244 def requests_patch(self, endpoint: str, data: dict): 245 request = self.requests.patch( 246 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 247 ) 248 if request.status_code not in [200, 201]: 249 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 250 self.logger.error(error_message) 251 raise Exception(error_message) 252 return self.parse_result(request)
294 def create_user( 295 self, 296 user_name: str, 297 email: str, 298 password: str, 299 full_name: Optional[str] = None, 300 login_name: Optional[str] = None, 301 change_pw=True, 302 send_notify=True, 303 source_id=0, 304 ): 305 """Create User. 306 Throws: 307 AlreadyExistsException, if the User exists already 308 Exception, if something else went wrong. 309 """ 310 if not login_name: 311 login_name = user_name 312 if not full_name: 313 full_name = user_name 314 request_data = { 315 "source_id": source_id, 316 "login_name": login_name, 317 "full_name": full_name, 318 "username": user_name, 319 "email": email, 320 "password": password, 321 "send_notify": send_notify, 322 "must_change_password": change_pw, 323 } 324 325 self.logger.debug("Gitea post payload: %s", request_data) 326 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 327 if "id" in result: 328 self.logger.info( 329 "Successfully created User %s <%s> (id %s)", 330 result["login"], 331 result["email"], 332 result["id"], 333 ) 334 self.logger.debug("Gitea response: %s", result) 335 else: 336 self.logger.error(result["message"]) 337 raise Exception("User not created... (gitea: %s)" % result["message"]) 338 user = User.parse_response(self, result) 339 return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
341 def create_repo( 342 self, 343 repoOwner: Union[User, Organization], 344 repoName: str, 345 description: str = "", 346 private: bool = False, 347 autoInit=True, 348 gitignores: Optional[str] = None, 349 license: Optional[str] = None, 350 readme: str = "Default", 351 issue_labels: Optional[str] = None, 352 default_branch="master", 353 ): 354 """Create a Repository as the administrator 355 356 Throws: 357 AlreadyExistsException: If the Repository exists already. 358 Exception: If something else went wrong. 359 360 Note: 361 Non-admin users can not use this method. Please use instead 362 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 363 """ 364 # although this only says user in the api, this also works for 365 # organizations 366 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 367 result = self.requests_post( 368 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 369 data={ 370 "name": repoName, 371 "description": description, 372 "private": private, 373 "auto_init": autoInit, 374 "gitignores": gitignores, 375 "license": license, 376 "issue_labels": issue_labels, 377 "readme": readme, 378 "default_branch": default_branch, 379 }, 380 ) 381 if "id" in result: 382 self.logger.info("Successfully created Repository %s " % result["name"]) 383 else: 384 self.logger.error(result["message"]) 385 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 386 return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo
or allspice.Organization.create_repo
.
388 def create_org( 389 self, 390 owner: User, 391 orgName: str, 392 description: str, 393 location="", 394 website="", 395 full_name="", 396 ): 397 assert isinstance(owner, User) 398 result = self.requests_post( 399 AllSpice.CREATE_ORG % owner.username, 400 data={ 401 "username": orgName, 402 "description": description, 403 "location": location, 404 "website": website, 405 "full_name": full_name, 406 }, 407 ) 408 if "id" in result: 409 self.logger.info("Successfully created Organization %s" % result["username"]) 410 else: 411 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 412 self.logger.error(result["message"]) 413 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 414 return Organization.parse_response(self, result)
416 def create_team( 417 self, 418 org: Organization, 419 name: str, 420 description: str = "", 421 permission: str = "read", 422 can_create_org_repo: bool = False, 423 includes_all_repositories: bool = False, 424 units=( 425 "repo.code", 426 "repo.issues", 427 "repo.ext_issues", 428 "repo.wiki", 429 "repo.pulls", 430 "repo.releases", 431 "repo.ext_wiki", 432 ), 433 units_map={}, 434 ): 435 """Creates a Team. 436 437 Args: 438 org (Organization): Organization the Team will be part of. 439 name (str): The Name of the Team to be created. 440 description (str): Optional, None, short description of the new Team. 441 permission (str): Optional, 'read', What permissions the members 442 units_map (dict): Optional, {}, a mapping of units to their 443 permissions. If None or empty, the `permission` permission will 444 be applied to all units. Note: When both `units` and `units_map` 445 are given, `units_map` will be preferred. 446 """ 447 448 result = self.requests_post( 449 AllSpice.CREATE_TEAM % org.username, 450 data={ 451 "name": name, 452 "description": description, 453 "permission": permission, 454 "can_create_org_repo": can_create_org_repo, 455 "includes_all_repositories": includes_all_repositories, 456 "units": units, 457 "units_map": units_map, 458 }, 459 ) 460 461 if "id" in result: 462 self.logger.info("Successfully created Team %s" % result["name"]) 463 else: 464 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 465 self.logger.error(result["message"]) 466 raise Exception("Team not created... (gitea: %s)" % result["message"]) 467 api_object = Team.parse_response(self, result) 468 setattr( 469 api_object, "_organization", org 470 ) # fixes strange behaviour of gitea not returning a valid organization here. 471 return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission
permission will
be applied to all units. Note: When both units
and units_map
are given, units_map
will be preferred.