allspice.allspice
1import json 2import logging 3import sys 4from typing import Any, Dict, List, Mapping, Optional, Union 5 6import requests 7import urllib3 8from frozendict import frozendict 9 10from .apiobject import Organization, Repository, Team, User 11from .exceptions import ( 12 AlreadyExistsException, 13 ConflictException, 14 NotFoundException, 15 NotYetGeneratedException, 16) 17from .ratelimiter import RateLimitedSession 18 19 20class AllSpice: 21 """Object to establish a session with AllSpice Hub.""" 22 23 ADMIN_CREATE_USER = """/admin/users""" 24 GET_USERS_ADMIN = """/admin/users""" 25 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 26 ALLSPICE_HUB_VERSION = """/version""" 27 GET_USER = """/user""" 28 GET_REPOSITORY = """/repos/{owner}/{name}""" 29 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 30 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 31 32 def __init__( 33 self, 34 allspice_hub_url="https://hub.allspice.io", 35 token_text=None, 36 auth=None, 37 verify=True, 38 log_level="INFO", 39 ratelimiting=(100, 60), 40 use_new_schdoc_renderer: Optional[bool] = None, 41 ): 42 """Initializing an instance of the AllSpice Hub Client 43 44 Args: 45 allspice_hub_url (str): The URL for the AllSpice Hub instance. 46 Defaults to `https://hub.allspice.io`. 47 48 token_text (str, None): The access token, by default None. 49 50 auth (tuple, None): The user credentials 51 `(username, password)`, by default None. 52 53 verify (bool): If True, allow insecure server connections 54 when using SSL. 55 56 log_level (str): The log level, by default `INFO`. 57 58 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 59 If None, no rate limiting is applied. By default, 100 calls 60 per minute are allowed. 61 62 use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set, 63 this will take precedence over the default behavior on the AllSpice Hub instance. 64 """ 65 66 self.logger = logging.getLogger(__name__) 67 handler = logging.StreamHandler(sys.stderr) 68 handler.setFormatter( 69 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 70 ) 71 self.logger.addHandler(handler) 72 self.logger.setLevel(log_level) 73 self.headers = { 74 "Content-type": "application/json", 75 } 76 self.url = allspice_hub_url 77 78 if ratelimiting is None: 79 self.requests = requests.Session() 80 else: 81 (max_calls, period) = ratelimiting 82 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 83 84 # Manage authentification 85 if not token_text and not auth: 86 raise ValueError("Please provide auth or token_text, but not both") 87 if token_text: 88 self.headers["Authorization"] = "token " + token_text 89 if auth: 90 self.logger.warning( 91 "Using basic auth is not recommended. Prefer using a token instead." 92 ) 93 self.requests.auth = auth 94 95 # Manage SSL certification verification 96 self.requests.verify = verify 97 if not verify: 98 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 99 100 self.use_new_schdoc_renderer = use_new_schdoc_renderer 101 102 def __get_url(self, endpoint): 103 url = self.url + "/api/v1" + endpoint 104 self.logger.debug("Url: %s" % url) 105 return url 106 107 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 108 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 109 if request.status_code not in [200, 201]: 110 message = f"Received status code: {request.status_code} ({request.url})" 111 if request.status_code in [404]: 112 raise NotFoundException(message) 113 if request.status_code in [403]: 114 raise Exception( 115 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 116 ) 117 if request.status_code in [409]: 118 raise ConflictException(message) 119 if request.status_code in [503]: 120 raise NotYetGeneratedException(message) 121 raise Exception(message) 122 return request 123 124 @staticmethod 125 def parse_result(result) -> Dict: 126 """Parses the result-JSON to a dict.""" 127 if result.text and len(result.text) > 3: 128 return json.loads(result.text) 129 return {} 130 131 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 132 combined_params = {} 133 combined_params.update(params) 134 if sudo: 135 combined_params["sudo"] = sudo.username 136 return self.parse_result(self.__get(endpoint, combined_params)) 137 138 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 139 combined_params = {} 140 combined_params.update(params) 141 if sudo: 142 combined_params["sudo"] = sudo.username 143 return self.__get(endpoint, combined_params).content 144 145 def requests_get_paginated( 146 self, 147 endpoint: str, 148 params=frozendict(), 149 sudo=None, 150 page_key: str = "page", 151 first_page: int = 1, 152 ): 153 page = first_page 154 combined_params = {} 155 combined_params.update(params) 156 aggregated_result = [] 157 while True: 158 combined_params[page_key] = page 159 result = self.requests_get(endpoint, combined_params, sudo) 160 161 if not result: 162 return aggregated_result 163 164 if isinstance(result, dict): 165 if "data" in result: 166 data = result["data"] 167 if len(data) == 0: 168 return aggregated_result 169 aggregated_result.extend(data) 170 elif "tree" in result: 171 data = result["tree"] 172 if data is None or len(data) == 0: 173 return aggregated_result 174 aggregated_result.extend(data) 175 else: 176 raise NotImplementedError( 177 "requests_get_paginated does not know how to handle responses of this type." 178 ) 179 else: 180 aggregated_result.extend(result) 181 182 page += 1 183 184 def requests_put(self, endpoint: str, data: Optional[dict] = None): 185 if not data: 186 data = {} 187 request = self.requests.put( 188 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 189 ) 190 if request.status_code not in [200, 204]: 191 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 192 self.logger.error(message) 193 raise Exception(message) 194 195 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 196 request = self.requests.delete( 197 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 198 ) 199 if request.status_code not in [200, 204]: 200 message = f"Received status code: {request.status_code} ({request.url})" 201 self.logger.error(message) 202 raise Exception(message) 203 204 def requests_post( 205 self, 206 endpoint: str, 207 data: Optional[dict] = None, 208 params: Optional[dict] = None, 209 files: Optional[dict] = None, 210 ): 211 """ 212 Make a POST call to the endpoint. 213 214 :param endpoint: The path to the endpoint 215 :param data: A dictionary for JSON data 216 :param params: A dictionary of query params 217 :param files: A dictionary of files, see requests.post. Using both files and data 218 can lead to unexpected results! 219 :return: The JSON response parsed as a dict 220 """ 221 222 # This should ideally be a TypedDict of the type of arguments taken by 223 # `requests.post`. 224 args: dict[str, Any] = { 225 "headers": self.headers.copy(), 226 } 227 if data is not None: 228 args["data"] = json.dumps(data) 229 if params is not None: 230 args["params"] = params 231 if files is not None: 232 args["headers"].pop("Content-type") 233 args["files"] = files 234 235 request = self.requests.post(self.__get_url(endpoint), **args) 236 237 if request.status_code not in [200, 201, 202]: 238 if "already exists" in request.text or "e-mail already in use" in request.text: 239 self.logger.warning(request.text) 240 raise AlreadyExistsException() 241 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 242 self.logger.error(f"With info: {data} ({self.headers})") 243 self.logger.error(f"Answer: {request.text}") 244 raise Exception( 245 f"Received status code: {request.status_code} ({request.url}), {request.text}" 246 ) 247 return self.parse_result(request) 248 249 def requests_patch(self, endpoint: str, data: dict): 250 request = self.requests.patch( 251 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 252 ) 253 if request.status_code not in [200, 201]: 254 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 255 self.logger.error(error_message) 256 raise Exception(error_message) 257 return self.parse_result(request) 258 259 def get_orgs_public_members_all(self, orgname): 260 path = "/orgs/" + orgname + "/public_members" 261 return self.requests_get(path) 262 263 def get_orgs(self): 264 path = "/admin/orgs" 265 results = self.requests_get(path) 266 return [Organization.parse_response(self, result) for result in results] 267 268 def get_user(self): 269 result = self.requests_get(AllSpice.GET_USER) 270 return User.parse_response(self, result) 271 272 def get_version(self) -> str: 273 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 274 return result["version"] 275 276 def get_users(self) -> List[User]: 277 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 278 return [User.parse_response(self, result) for result in results] 279 280 def get_user_by_email(self, email: str) -> Optional[User]: 281 users = self.get_users() 282 for user in users: 283 if user.email == email or email in user.emails: 284 return user 285 return None 286 287 def get_user_by_name(self, username: str) -> Optional[User]: 288 users = self.get_users() 289 for user in users: 290 if user.username == username: 291 return user 292 return None 293 294 def get_repository(self, owner: str, name: str) -> Repository: 295 path = self.GET_REPOSITORY.format(owner=owner, name=name) 296 result = self.requests_get(path) 297 return Repository.parse_response(self, result) 298 299 def create_user( 300 self, 301 user_name: str, 302 email: str, 303 password: str, 304 full_name: Optional[str] = None, 305 login_name: Optional[str] = None, 306 change_pw=True, 307 send_notify=True, 308 source_id=0, 309 ): 310 """Create User. 311 Throws: 312 AlreadyExistsException, if the User exists already 313 Exception, if something else went wrong. 314 """ 315 if not login_name: 316 login_name = user_name 317 if not full_name: 318 full_name = user_name 319 request_data = { 320 "source_id": source_id, 321 "login_name": login_name, 322 "full_name": full_name, 323 "username": user_name, 324 "email": email, 325 "password": password, 326 "send_notify": send_notify, 327 "must_change_password": change_pw, 328 } 329 330 self.logger.debug("Gitea post payload: %s", request_data) 331 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 332 if "id" in result: 333 self.logger.info( 334 "Successfully created User %s <%s> (id %s)", 335 result["login"], 336 result["email"], 337 result["id"], 338 ) 339 self.logger.debug("Gitea response: %s", result) 340 else: 341 self.logger.error(result["message"]) 342 raise Exception("User not created... (gitea: %s)" % result["message"]) 343 user = User.parse_response(self, result) 344 return user 345 346 def create_repo( 347 self, 348 repoOwner: Union[User, Organization], 349 repoName: str, 350 description: str = "", 351 private: bool = False, 352 autoInit=True, 353 gitignores: Optional[str] = None, 354 license: Optional[str] = None, 355 readme: str = "Default", 356 issue_labels: Optional[str] = None, 357 default_branch="master", 358 ): 359 """Create a Repository as the administrator 360 361 Throws: 362 AlreadyExistsException: If the Repository exists already. 363 Exception: If something else went wrong. 364 365 Note: 366 Non-admin users can not use this method. Please use instead 367 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 368 """ 369 # although this only says user in the api, this also works for 370 # organizations 371 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 372 result = self.requests_post( 373 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 374 data={ 375 "name": repoName, 376 "description": description, 377 "private": private, 378 "auto_init": autoInit, 379 "gitignores": gitignores, 380 "license": license, 381 "issue_labels": issue_labels, 382 "readme": readme, 383 "default_branch": default_branch, 384 }, 385 ) 386 if "id" in result: 387 self.logger.info("Successfully created Repository %s " % result["name"]) 388 else: 389 self.logger.error(result["message"]) 390 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 391 return Repository.parse_response(self, result) 392 393 def create_org( 394 self, 395 owner: User, 396 orgName: str, 397 description: str, 398 location="", 399 website="", 400 full_name="", 401 ): 402 assert isinstance(owner, User) 403 result = self.requests_post( 404 AllSpice.CREATE_ORG % owner.username, 405 data={ 406 "username": orgName, 407 "description": description, 408 "location": location, 409 "website": website, 410 "full_name": full_name, 411 }, 412 ) 413 if "id" in result: 414 self.logger.info("Successfully created Organization %s" % result["username"]) 415 else: 416 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 417 self.logger.error(result["message"]) 418 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 419 return Organization.parse_response(self, result) 420 421 def create_team( 422 self, 423 org: Organization, 424 name: str, 425 description: str = "", 426 permission: str = "read", 427 can_create_org_repo: bool = False, 428 includes_all_repositories: bool = False, 429 units=( 430 "repo.code", 431 "repo.issues", 432 "repo.ext_issues", 433 "repo.wiki", 434 "repo.pulls", 435 "repo.releases", 436 "repo.ext_wiki", 437 ), 438 units_map={}, 439 ): 440 """Creates a Team. 441 442 Args: 443 org (Organization): Organization the Team will be part of. 444 name (str): The Name of the Team to be created. 445 description (str): Optional, None, short description of the new Team. 446 permission (str): Optional, 'read', What permissions the members 447 units_map (dict): Optional, {}, a mapping of units to their 448 permissions. If None or empty, the `permission` permission will 449 be applied to all units. Note: When both `units` and `units_map` 450 are given, `units_map` will be preferred. 451 """ 452 453 result = self.requests_post( 454 AllSpice.CREATE_TEAM % org.username, 455 data={ 456 "name": name, 457 "description": description, 458 "permission": permission, 459 "can_create_org_repo": can_create_org_repo, 460 "includes_all_repositories": includes_all_repositories, 461 "units": units, 462 "units_map": units_map, 463 }, 464 ) 465 466 if "id" in result: 467 self.logger.info("Successfully created Team %s" % result["name"]) 468 else: 469 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 470 self.logger.error(result["message"]) 471 raise Exception("Team not created... (gitea: %s)" % result["message"]) 472 api_object = Team.parse_response(self, result) 473 setattr( 474 api_object, "_organization", org 475 ) # fixes strange behaviour of gitea not returning a valid organization here. 476 return api_object
21class AllSpice: 22 """Object to establish a session with AllSpice Hub.""" 23 24 ADMIN_CREATE_USER = """/admin/users""" 25 GET_USERS_ADMIN = """/admin/users""" 26 ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # <ownername> 27 ALLSPICE_HUB_VERSION = """/version""" 28 GET_USER = """/user""" 29 GET_REPOSITORY = """/repos/{owner}/{name}""" 30 CREATE_ORG = """/admin/users/%s/orgs""" # <username> 31 CREATE_TEAM = """/orgs/%s/teams""" # <orgname> 32 33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 use_new_schdoc_renderer: Optional[bool] = None, 42 ): 43 """Initializing an instance of the AllSpice Hub Client 44 45 Args: 46 allspice_hub_url (str): The URL for the AllSpice Hub instance. 47 Defaults to `https://hub.allspice.io`. 48 49 token_text (str, None): The access token, by default None. 50 51 auth (tuple, None): The user credentials 52 `(username, password)`, by default None. 53 54 verify (bool): If True, allow insecure server connections 55 when using SSL. 56 57 log_level (str): The log level, by default `INFO`. 58 59 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 60 If None, no rate limiting is applied. By default, 100 calls 61 per minute are allowed. 62 63 use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set, 64 this will take precedence over the default behavior on the AllSpice Hub instance. 65 """ 66 67 self.logger = logging.getLogger(__name__) 68 handler = logging.StreamHandler(sys.stderr) 69 handler.setFormatter( 70 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 71 ) 72 self.logger.addHandler(handler) 73 self.logger.setLevel(log_level) 74 self.headers = { 75 "Content-type": "application/json", 76 } 77 self.url = allspice_hub_url 78 79 if ratelimiting is None: 80 self.requests = requests.Session() 81 else: 82 (max_calls, period) = ratelimiting 83 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 84 85 # Manage authentification 86 if not token_text and not auth: 87 raise ValueError("Please provide auth or token_text, but not both") 88 if token_text: 89 self.headers["Authorization"] = "token " + token_text 90 if auth: 91 self.logger.warning( 92 "Using basic auth is not recommended. Prefer using a token instead." 93 ) 94 self.requests.auth = auth 95 96 # Manage SSL certification verification 97 self.requests.verify = verify 98 if not verify: 99 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 100 101 self.use_new_schdoc_renderer = use_new_schdoc_renderer 102 103 def __get_url(self, endpoint): 104 url = self.url + "/api/v1" + endpoint 105 self.logger.debug("Url: %s" % url) 106 return url 107 108 def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response: 109 request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params) 110 if request.status_code not in [200, 201]: 111 message = f"Received status code: {request.status_code} ({request.url})" 112 if request.status_code in [404]: 113 raise NotFoundException(message) 114 if request.status_code in [403]: 115 raise Exception( 116 f"Unauthorized: {request.url} - Check your permissions and try again! ({message})" 117 ) 118 if request.status_code in [409]: 119 raise ConflictException(message) 120 if request.status_code in [503]: 121 raise NotYetGeneratedException(message) 122 raise Exception(message) 123 return request 124 125 @staticmethod 126 def parse_result(result) -> Dict: 127 """Parses the result-JSON to a dict.""" 128 if result.text and len(result.text) > 3: 129 return json.loads(result.text) 130 return {} 131 132 def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None): 133 combined_params = {} 134 combined_params.update(params) 135 if sudo: 136 combined_params["sudo"] = sudo.username 137 return self.parse_result(self.__get(endpoint, combined_params)) 138 139 def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes: 140 combined_params = {} 141 combined_params.update(params) 142 if sudo: 143 combined_params["sudo"] = sudo.username 144 return self.__get(endpoint, combined_params).content 145 146 def requests_get_paginated( 147 self, 148 endpoint: str, 149 params=frozendict(), 150 sudo=None, 151 page_key: str = "page", 152 first_page: int = 1, 153 ): 154 page = first_page 155 combined_params = {} 156 combined_params.update(params) 157 aggregated_result = [] 158 while True: 159 combined_params[page_key] = page 160 result = self.requests_get(endpoint, combined_params, sudo) 161 162 if not result: 163 return aggregated_result 164 165 if isinstance(result, dict): 166 if "data" in result: 167 data = result["data"] 168 if len(data) == 0: 169 return aggregated_result 170 aggregated_result.extend(data) 171 elif "tree" in result: 172 data = result["tree"] 173 if data is None or len(data) == 0: 174 return aggregated_result 175 aggregated_result.extend(data) 176 else: 177 raise NotImplementedError( 178 "requests_get_paginated does not know how to handle responses of this type." 179 ) 180 else: 181 aggregated_result.extend(result) 182 183 page += 1 184 185 def requests_put(self, endpoint: str, data: Optional[dict] = None): 186 if not data: 187 data = {} 188 request = self.requests.put( 189 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 190 ) 191 if request.status_code not in [200, 204]: 192 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 193 self.logger.error(message) 194 raise Exception(message) 195 196 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 197 request = self.requests.delete( 198 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 199 ) 200 if request.status_code not in [200, 204]: 201 message = f"Received status code: {request.status_code} ({request.url})" 202 self.logger.error(message) 203 raise Exception(message) 204 205 def requests_post( 206 self, 207 endpoint: str, 208 data: Optional[dict] = None, 209 params: Optional[dict] = None, 210 files: Optional[dict] = None, 211 ): 212 """ 213 Make a POST call to the endpoint. 214 215 :param endpoint: The path to the endpoint 216 :param data: A dictionary for JSON data 217 :param params: A dictionary of query params 218 :param files: A dictionary of files, see requests.post. Using both files and data 219 can lead to unexpected results! 220 :return: The JSON response parsed as a dict 221 """ 222 223 # This should ideally be a TypedDict of the type of arguments taken by 224 # `requests.post`. 225 args: dict[str, Any] = { 226 "headers": self.headers.copy(), 227 } 228 if data is not None: 229 args["data"] = json.dumps(data) 230 if params is not None: 231 args["params"] = params 232 if files is not None: 233 args["headers"].pop("Content-type") 234 args["files"] = files 235 236 request = self.requests.post(self.__get_url(endpoint), **args) 237 238 if request.status_code not in [200, 201, 202]: 239 if "already exists" in request.text or "e-mail already in use" in request.text: 240 self.logger.warning(request.text) 241 raise AlreadyExistsException() 242 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 243 self.logger.error(f"With info: {data} ({self.headers})") 244 self.logger.error(f"Answer: {request.text}") 245 raise Exception( 246 f"Received status code: {request.status_code} ({request.url}), {request.text}" 247 ) 248 return self.parse_result(request) 249 250 def requests_patch(self, endpoint: str, data: dict): 251 request = self.requests.patch( 252 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 253 ) 254 if request.status_code not in [200, 201]: 255 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 256 self.logger.error(error_message) 257 raise Exception(error_message) 258 return self.parse_result(request) 259 260 def get_orgs_public_members_all(self, orgname): 261 path = "/orgs/" + orgname + "/public_members" 262 return self.requests_get(path) 263 264 def get_orgs(self): 265 path = "/admin/orgs" 266 results = self.requests_get(path) 267 return [Organization.parse_response(self, result) for result in results] 268 269 def get_user(self): 270 result = self.requests_get(AllSpice.GET_USER) 271 return User.parse_response(self, result) 272 273 def get_version(self) -> str: 274 result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION) 275 return result["version"] 276 277 def get_users(self) -> List[User]: 278 results = self.requests_get(AllSpice.GET_USERS_ADMIN) 279 return [User.parse_response(self, result) for result in results] 280 281 def get_user_by_email(self, email: str) -> Optional[User]: 282 users = self.get_users() 283 for user in users: 284 if user.email == email or email in user.emails: 285 return user 286 return None 287 288 def get_user_by_name(self, username: str) -> Optional[User]: 289 users = self.get_users() 290 for user in users: 291 if user.username == username: 292 return user 293 return None 294 295 def get_repository(self, owner: str, name: str) -> Repository: 296 path = self.GET_REPOSITORY.format(owner=owner, name=name) 297 result = self.requests_get(path) 298 return Repository.parse_response(self, result) 299 300 def create_user( 301 self, 302 user_name: str, 303 email: str, 304 password: str, 305 full_name: Optional[str] = None, 306 login_name: Optional[str] = None, 307 change_pw=True, 308 send_notify=True, 309 source_id=0, 310 ): 311 """Create User. 312 Throws: 313 AlreadyExistsException, if the User exists already 314 Exception, if something else went wrong. 315 """ 316 if not login_name: 317 login_name = user_name 318 if not full_name: 319 full_name = user_name 320 request_data = { 321 "source_id": source_id, 322 "login_name": login_name, 323 "full_name": full_name, 324 "username": user_name, 325 "email": email, 326 "password": password, 327 "send_notify": send_notify, 328 "must_change_password": change_pw, 329 } 330 331 self.logger.debug("Gitea post payload: %s", request_data) 332 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 333 if "id" in result: 334 self.logger.info( 335 "Successfully created User %s <%s> (id %s)", 336 result["login"], 337 result["email"], 338 result["id"], 339 ) 340 self.logger.debug("Gitea response: %s", result) 341 else: 342 self.logger.error(result["message"]) 343 raise Exception("User not created... (gitea: %s)" % result["message"]) 344 user = User.parse_response(self, result) 345 return user 346 347 def create_repo( 348 self, 349 repoOwner: Union[User, Organization], 350 repoName: str, 351 description: str = "", 352 private: bool = False, 353 autoInit=True, 354 gitignores: Optional[str] = None, 355 license: Optional[str] = None, 356 readme: str = "Default", 357 issue_labels: Optional[str] = None, 358 default_branch="master", 359 ): 360 """Create a Repository as the administrator 361 362 Throws: 363 AlreadyExistsException: If the Repository exists already. 364 Exception: If something else went wrong. 365 366 Note: 367 Non-admin users can not use this method. Please use instead 368 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 369 """ 370 # although this only says user in the api, this also works for 371 # organizations 372 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 373 result = self.requests_post( 374 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 375 data={ 376 "name": repoName, 377 "description": description, 378 "private": private, 379 "auto_init": autoInit, 380 "gitignores": gitignores, 381 "license": license, 382 "issue_labels": issue_labels, 383 "readme": readme, 384 "default_branch": default_branch, 385 }, 386 ) 387 if "id" in result: 388 self.logger.info("Successfully created Repository %s " % result["name"]) 389 else: 390 self.logger.error(result["message"]) 391 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 392 return Repository.parse_response(self, result) 393 394 def create_org( 395 self, 396 owner: User, 397 orgName: str, 398 description: str, 399 location="", 400 website="", 401 full_name="", 402 ): 403 assert isinstance(owner, User) 404 result = self.requests_post( 405 AllSpice.CREATE_ORG % owner.username, 406 data={ 407 "username": orgName, 408 "description": description, 409 "location": location, 410 "website": website, 411 "full_name": full_name, 412 }, 413 ) 414 if "id" in result: 415 self.logger.info("Successfully created Organization %s" % result["username"]) 416 else: 417 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 418 self.logger.error(result["message"]) 419 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 420 return Organization.parse_response(self, result) 421 422 def create_team( 423 self, 424 org: Organization, 425 name: str, 426 description: str = "", 427 permission: str = "read", 428 can_create_org_repo: bool = False, 429 includes_all_repositories: bool = False, 430 units=( 431 "repo.code", 432 "repo.issues", 433 "repo.ext_issues", 434 "repo.wiki", 435 "repo.pulls", 436 "repo.releases", 437 "repo.ext_wiki", 438 ), 439 units_map={}, 440 ): 441 """Creates a Team. 442 443 Args: 444 org (Organization): Organization the Team will be part of. 445 name (str): The Name of the Team to be created. 446 description (str): Optional, None, short description of the new Team. 447 permission (str): Optional, 'read', What permissions the members 448 units_map (dict): Optional, {}, a mapping of units to their 449 permissions. If None or empty, the `permission` permission will 450 be applied to all units. Note: When both `units` and `units_map` 451 are given, `units_map` will be preferred. 452 """ 453 454 result = self.requests_post( 455 AllSpice.CREATE_TEAM % org.username, 456 data={ 457 "name": name, 458 "description": description, 459 "permission": permission, 460 "can_create_org_repo": can_create_org_repo, 461 "includes_all_repositories": includes_all_repositories, 462 "units": units, 463 "units_map": units_map, 464 }, 465 ) 466 467 if "id" in result: 468 self.logger.info("Successfully created Team %s" % result["name"]) 469 else: 470 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 471 self.logger.error(result["message"]) 472 raise Exception("Team not created... (gitea: %s)" % result["message"]) 473 api_object = Team.parse_response(self, result) 474 setattr( 475 api_object, "_organization", org 476 ) # fixes strange behaviour of gitea not returning a valid organization here. 477 return api_object
Object to establish a session with AllSpice Hub.
33 def __init__( 34 self, 35 allspice_hub_url="https://hub.allspice.io", 36 token_text=None, 37 auth=None, 38 verify=True, 39 log_level="INFO", 40 ratelimiting=(100, 60), 41 use_new_schdoc_renderer: Optional[bool] = None, 42 ): 43 """Initializing an instance of the AllSpice Hub Client 44 45 Args: 46 allspice_hub_url (str): The URL for the AllSpice Hub instance. 47 Defaults to `https://hub.allspice.io`. 48 49 token_text (str, None): The access token, by default None. 50 51 auth (tuple, None): The user credentials 52 `(username, password)`, by default None. 53 54 verify (bool): If True, allow insecure server connections 55 when using SSL. 56 57 log_level (str): The log level, by default `INFO`. 58 59 ratelimiting (tuple[int, int], None): `(max_calls, period)`, 60 If None, no rate limiting is applied. By default, 100 calls 61 per minute are allowed. 62 63 use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set, 64 this will take precedence over the default behavior on the AllSpice Hub instance. 65 """ 66 67 self.logger = logging.getLogger(__name__) 68 handler = logging.StreamHandler(sys.stderr) 69 handler.setFormatter( 70 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 71 ) 72 self.logger.addHandler(handler) 73 self.logger.setLevel(log_level) 74 self.headers = { 75 "Content-type": "application/json", 76 } 77 self.url = allspice_hub_url 78 79 if ratelimiting is None: 80 self.requests = requests.Session() 81 else: 82 (max_calls, period) = ratelimiting 83 self.requests = RateLimitedSession(max_calls=max_calls, period=period) 84 85 # Manage authentification 86 if not token_text and not auth: 87 raise ValueError("Please provide auth or token_text, but not both") 88 if token_text: 89 self.headers["Authorization"] = "token " + token_text 90 if auth: 91 self.logger.warning( 92 "Using basic auth is not recommended. Prefer using a token instead." 93 ) 94 self.requests.auth = auth 95 96 # Manage SSL certification verification 97 self.requests.verify = verify 98 if not verify: 99 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 100 101 self.use_new_schdoc_renderer = use_new_schdoc_renderer
Initializing an instance of the AllSpice Hub Client
Args:
allspice_hub_url (str): The URL for the AllSpice Hub instance.
Defaults to https://hub.allspice.io.
token_text (str, None): The access token, by default None.
auth (tuple, None): The user credentials
`(username, password)`, by default None.
verify (bool): If True, allow insecure server connections
when using SSL.
log_level (str): The log level, by default `INFO`.
ratelimiting (tuple[int, int], None): `(max_calls, period)`,
If None, no rate limiting is applied. By default, 100 calls
per minute are allowed.
use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set,
this will take precedence over the default behavior on the AllSpice Hub instance.
125 @staticmethod 126 def parse_result(result) -> Dict: 127 """Parses the result-JSON to a dict.""" 128 if result.text and len(result.text) > 3: 129 return json.loads(result.text) 130 return {}
Parses the result-JSON to a dict.
146 def requests_get_paginated( 147 self, 148 endpoint: str, 149 params=frozendict(), 150 sudo=None, 151 page_key: str = "page", 152 first_page: int = 1, 153 ): 154 page = first_page 155 combined_params = {} 156 combined_params.update(params) 157 aggregated_result = [] 158 while True: 159 combined_params[page_key] = page 160 result = self.requests_get(endpoint, combined_params, sudo) 161 162 if not result: 163 return aggregated_result 164 165 if isinstance(result, dict): 166 if "data" in result: 167 data = result["data"] 168 if len(data) == 0: 169 return aggregated_result 170 aggregated_result.extend(data) 171 elif "tree" in result: 172 data = result["tree"] 173 if data is None or len(data) == 0: 174 return aggregated_result 175 aggregated_result.extend(data) 176 else: 177 raise NotImplementedError( 178 "requests_get_paginated does not know how to handle responses of this type." 179 ) 180 else: 181 aggregated_result.extend(result) 182 183 page += 1
185 def requests_put(self, endpoint: str, data: Optional[dict] = None): 186 if not data: 187 data = {} 188 request = self.requests.put( 189 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 190 ) 191 if request.status_code not in [200, 204]: 192 message = f"Received status code: {request.status_code} ({request.url}) {request.text}" 193 self.logger.error(message) 194 raise Exception(message)
196 def requests_delete(self, endpoint: str, data: Optional[dict] = None): 197 request = self.requests.delete( 198 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 199 ) 200 if request.status_code not in [200, 204]: 201 message = f"Received status code: {request.status_code} ({request.url})" 202 self.logger.error(message) 203 raise Exception(message)
205 def requests_post( 206 self, 207 endpoint: str, 208 data: Optional[dict] = None, 209 params: Optional[dict] = None, 210 files: Optional[dict] = None, 211 ): 212 """ 213 Make a POST call to the endpoint. 214 215 :param endpoint: The path to the endpoint 216 :param data: A dictionary for JSON data 217 :param params: A dictionary of query params 218 :param files: A dictionary of files, see requests.post. Using both files and data 219 can lead to unexpected results! 220 :return: The JSON response parsed as a dict 221 """ 222 223 # This should ideally be a TypedDict of the type of arguments taken by 224 # `requests.post`. 225 args: dict[str, Any] = { 226 "headers": self.headers.copy(), 227 } 228 if data is not None: 229 args["data"] = json.dumps(data) 230 if params is not None: 231 args["params"] = params 232 if files is not None: 233 args["headers"].pop("Content-type") 234 args["files"] = files 235 236 request = self.requests.post(self.__get_url(endpoint), **args) 237 238 if request.status_code not in [200, 201, 202]: 239 if "already exists" in request.text or "e-mail already in use" in request.text: 240 self.logger.warning(request.text) 241 raise AlreadyExistsException() 242 self.logger.error(f"Received status code: {request.status_code} ({request.url})") 243 self.logger.error(f"With info: {data} ({self.headers})") 244 self.logger.error(f"Answer: {request.text}") 245 raise Exception( 246 f"Received status code: {request.status_code} ({request.url}), {request.text}" 247 ) 248 return self.parse_result(request)
Make a POST call to the endpoint.
Parameters
- endpoint: The path to the endpoint
- data: A dictionary for JSON data
- params: A dictionary of query params
- files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns
The JSON response parsed as a dict
250 def requests_patch(self, endpoint: str, data: dict): 251 request = self.requests.patch( 252 self.__get_url(endpoint), headers=self.headers, data=json.dumps(data) 253 ) 254 if request.status_code not in [200, 201]: 255 error_message = f"Received status code: {request.status_code} ({request.url}) {data}" 256 self.logger.error(error_message) 257 raise Exception(error_message) 258 return self.parse_result(request)
300 def create_user( 301 self, 302 user_name: str, 303 email: str, 304 password: str, 305 full_name: Optional[str] = None, 306 login_name: Optional[str] = None, 307 change_pw=True, 308 send_notify=True, 309 source_id=0, 310 ): 311 """Create User. 312 Throws: 313 AlreadyExistsException, if the User exists already 314 Exception, if something else went wrong. 315 """ 316 if not login_name: 317 login_name = user_name 318 if not full_name: 319 full_name = user_name 320 request_data = { 321 "source_id": source_id, 322 "login_name": login_name, 323 "full_name": full_name, 324 "username": user_name, 325 "email": email, 326 "password": password, 327 "send_notify": send_notify, 328 "must_change_password": change_pw, 329 } 330 331 self.logger.debug("Gitea post payload: %s", request_data) 332 result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data) 333 if "id" in result: 334 self.logger.info( 335 "Successfully created User %s <%s> (id %s)", 336 result["login"], 337 result["email"], 338 result["id"], 339 ) 340 self.logger.debug("Gitea response: %s", result) 341 else: 342 self.logger.error(result["message"]) 343 raise Exception("User not created... (gitea: %s)" % result["message"]) 344 user = User.parse_response(self, result) 345 return user
Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.
347 def create_repo( 348 self, 349 repoOwner: Union[User, Organization], 350 repoName: str, 351 description: str = "", 352 private: bool = False, 353 autoInit=True, 354 gitignores: Optional[str] = None, 355 license: Optional[str] = None, 356 readme: str = "Default", 357 issue_labels: Optional[str] = None, 358 default_branch="master", 359 ): 360 """Create a Repository as the administrator 361 362 Throws: 363 AlreadyExistsException: If the Repository exists already. 364 Exception: If something else went wrong. 365 366 Note: 367 Non-admin users can not use this method. Please use instead 368 `allspice.User.create_repo` or `allspice.Organization.create_repo`. 369 """ 370 # although this only says user in the api, this also works for 371 # organizations 372 assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization) 373 result = self.requests_post( 374 AllSpice.ADMIN_REPO_CREATE % repoOwner.username, 375 data={ 376 "name": repoName, 377 "description": description, 378 "private": private, 379 "auto_init": autoInit, 380 "gitignores": gitignores, 381 "license": license, 382 "issue_labels": issue_labels, 383 "readme": readme, 384 "default_branch": default_branch, 385 }, 386 ) 387 if "id" in result: 388 self.logger.info("Successfully created Repository %s " % result["name"]) 389 else: 390 self.logger.error(result["message"]) 391 raise Exception("Repository not created... (gitea: %s)" % result["message"]) 392 return Repository.parse_response(self, result)
Create a Repository as the administrator
Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.
Note:
Non-admin users can not use this method. Please use instead
allspice.User.create_repo or allspice.Organization.create_repo.
394 def create_org( 395 self, 396 owner: User, 397 orgName: str, 398 description: str, 399 location="", 400 website="", 401 full_name="", 402 ): 403 assert isinstance(owner, User) 404 result = self.requests_post( 405 AllSpice.CREATE_ORG % owner.username, 406 data={ 407 "username": orgName, 408 "description": description, 409 "location": location, 410 "website": website, 411 "full_name": full_name, 412 }, 413 ) 414 if "id" in result: 415 self.logger.info("Successfully created Organization %s" % result["username"]) 416 else: 417 self.logger.error("Organization not created... (gitea: %s)" % result["message"]) 418 self.logger.error(result["message"]) 419 raise Exception("Organization not created... (gitea: %s)" % result["message"]) 420 return Organization.parse_response(self, result)
422 def create_team( 423 self, 424 org: Organization, 425 name: str, 426 description: str = "", 427 permission: str = "read", 428 can_create_org_repo: bool = False, 429 includes_all_repositories: bool = False, 430 units=( 431 "repo.code", 432 "repo.issues", 433 "repo.ext_issues", 434 "repo.wiki", 435 "repo.pulls", 436 "repo.releases", 437 "repo.ext_wiki", 438 ), 439 units_map={}, 440 ): 441 """Creates a Team. 442 443 Args: 444 org (Organization): Organization the Team will be part of. 445 name (str): The Name of the Team to be created. 446 description (str): Optional, None, short description of the new Team. 447 permission (str): Optional, 'read', What permissions the members 448 units_map (dict): Optional, {}, a mapping of units to their 449 permissions. If None or empty, the `permission` permission will 450 be applied to all units. Note: When both `units` and `units_map` 451 are given, `units_map` will be preferred. 452 """ 453 454 result = self.requests_post( 455 AllSpice.CREATE_TEAM % org.username, 456 data={ 457 "name": name, 458 "description": description, 459 "permission": permission, 460 "can_create_org_repo": can_create_org_repo, 461 "includes_all_repositories": includes_all_repositories, 462 "units": units, 463 "units_map": units_map, 464 }, 465 ) 466 467 if "id" in result: 468 self.logger.info("Successfully created Team %s" % result["name"]) 469 else: 470 self.logger.error("Team not created... (gitea: %s)" % result["message"]) 471 self.logger.error(result["message"]) 472 raise Exception("Team not created... (gitea: %s)" % result["message"]) 473 api_object = Team.parse_response(self, result) 474 setattr( 475 api_object, "_organization", org 476 ) # fixes strange behaviour of gitea not returning a valid organization here. 477 return api_object
Creates a Team.
Args:
org (Organization): Organization the Team will be part of.
name (str): The Name of the Team to be created.
description (str): Optional, None, short description of the new Team.
permission (str): Optional, 'read', What permissions the members
units_map (dict): Optional, {}, a mapping of units to their
permissions. If None or empty, the permission permission will
be applied to all units. Note: When both units and units_map
are given, units_map will be preferred.