allspice.allspice

  1import json
  2import logging
  3import sys
  4from typing import Any, Dict, List, Mapping, Optional, Union
  5
  6import requests
  7import urllib3
  8from frozendict import frozendict
  9
 10from .apiobject import Organization, Repository, Team, User
 11from .exceptions import (
 12    AlreadyExistsException,
 13    ConflictException,
 14    NotFoundException,
 15    NotYetGeneratedException,
 16)
 17from .ratelimiter import RateLimitedSession
 18
 19
 20class AllSpice:
 21    """Object to establish a session with AllSpice Hub."""
 22
    # Endpoint paths, relative to the "/api/v1" prefix added by `__get_url`.
    # Paths containing `%s` are filled in with the `%` operator by their
    # callers; GET_REPOSITORY is filled in with `str.format`.
    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 31
 32    def __init__(
 33        self,
 34        allspice_hub_url="https://hub.allspice.io",
 35        token_text=None,
 36        auth=None,
 37        verify=True,
 38        log_level="INFO",
 39        ratelimiting=(100, 60),
 40        use_new_schdoc_renderer: Optional[bool] = None,
 41    ):
 42        """Initializing an instance of the AllSpice Hub Client
 43
 44        Args:
 45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 46                Defaults to `https://hub.allspice.io`.
 47
 48            token_text (str, None): The access token, by default None.
 49
 50            auth (tuple, None): The user credentials
 51                `(username, password)`, by default None.
 52
 53            verify (bool): If True, allow insecure server connections
 54                when using SSL.
 55
 56            log_level (str): The log level, by default `INFO`.
 57
 58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 59                If None, no rate limiting is applied. By default, 100 calls
 60                per minute are allowed.
 61
 62            use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set,
 63            this will take precedence over the default behavior on the AllSpice Hub instance.
 64        """
 65
 66        self.logger = logging.getLogger(__name__)
 67        handler = logging.StreamHandler(sys.stderr)
 68        handler.setFormatter(
 69            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 70        )
 71        self.logger.addHandler(handler)
 72        self.logger.setLevel(log_level)
 73        self.headers = {
 74            "Content-type": "application/json",
 75        }
 76        self.url = allspice_hub_url
 77
 78        if ratelimiting is None:
 79            self.requests = requests.Session()
 80        else:
 81            (max_calls, period) = ratelimiting
 82            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 83
 84        # Manage authentification
 85        if not token_text and not auth:
 86            raise ValueError("Please provide auth or token_text, but not both")
 87        if token_text:
 88            self.headers["Authorization"] = "token " + token_text
 89        if auth:
 90            self.logger.warning(
 91                "Using basic auth is not recommended. Prefer using a token instead."
 92            )
 93            self.requests.auth = auth
 94
 95        # Manage SSL certification verification
 96        self.requests.verify = verify
 97        if not verify:
 98            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 99
100        self.use_new_schdoc_renderer = use_new_schdoc_renderer
101
102    def __get_url(self, endpoint):
103        url = self.url + "/api/v1" + endpoint
104        self.logger.debug("Url: %s" % url)
105        return url
106
107    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
108        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
109        if request.status_code not in [200, 201]:
110            message = f"Received status code: {request.status_code} ({request.url})"
111            if request.status_code in [404]:
112                raise NotFoundException(message)
113            if request.status_code in [403]:
114                raise Exception(
115                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
116                )
117            if request.status_code in [409]:
118                raise ConflictException(message)
119            if request.status_code in [503]:
120                raise NotYetGeneratedException(message)
121            raise Exception(message)
122        return request
123
124    @staticmethod
125    def parse_result(result) -> Dict:
126        """Parses the result-JSON to a dict."""
127        if result.text and len(result.text) > 3:
128            return json.loads(result.text)
129        return {}
130
131    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
132        combined_params = {}
133        combined_params.update(params)
134        if sudo:
135            combined_params["sudo"] = sudo.username
136        return self.parse_result(self.__get(endpoint, combined_params))
137
138    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
139        combined_params = {}
140        combined_params.update(params)
141        if sudo:
142            combined_params["sudo"] = sudo.username
143        return self.__get(endpoint, combined_params).content
144
145    def requests_get_paginated(
146        self,
147        endpoint: str,
148        params=frozendict(),
149        sudo=None,
150        page_key: str = "page",
151        first_page: int = 1,
152    ):
153        page = first_page
154        combined_params = {}
155        combined_params.update(params)
156        aggregated_result = []
157        while True:
158            combined_params[page_key] = page
159            result = self.requests_get(endpoint, combined_params, sudo)
160
161            if not result:
162                return aggregated_result
163
164            if isinstance(result, dict):
165                if "data" in result:
166                    data = result["data"]
167                    if len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                elif "tree" in result:
171                    data = result["tree"]
172                    if data is None or len(data) == 0:
173                        return aggregated_result
174                    aggregated_result.extend(data)
175                else:
176                    raise NotImplementedError(
177                        "requests_get_paginated does not know how to handle responses of this type."
178                    )
179            else:
180                aggregated_result.extend(result)
181
182            page += 1
183
184    def requests_put(self, endpoint: str, data: Optional[dict] = None):
185        if not data:
186            data = {}
187        request = self.requests.put(
188            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
189        )
190        if request.status_code not in [200, 204]:
191            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
192            self.logger.error(message)
193            raise Exception(message)
194
195    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
196        request = self.requests.delete(
197            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
198        )
199        if request.status_code not in [200, 204]:
200            message = f"Received status code: {request.status_code} ({request.url})"
201            self.logger.error(message)
202            raise Exception(message)
203
204    def requests_post(
205        self,
206        endpoint: str,
207        data: Optional[dict] = None,
208        params: Optional[dict] = None,
209        files: Optional[dict] = None,
210    ):
211        """
212        Make a POST call to the endpoint.
213
214        :param endpoint: The path to the endpoint
215        :param data: A dictionary for JSON data
216        :param params: A dictionary of query params
217        :param files: A dictionary of files, see requests.post. Using both files and data
218                      can lead to unexpected results!
219        :return: The JSON response parsed as a dict
220        """
221
222        # This should ideally be a TypedDict of the type of arguments taken by
223        # `requests.post`.
224        args: dict[str, Any] = {
225            "headers": self.headers.copy(),
226        }
227        if data is not None:
228            args["data"] = json.dumps(data)
229        if params is not None:
230            args["params"] = params
231        if files is not None:
232            args["headers"].pop("Content-type")
233            args["files"] = files
234
235        request = self.requests.post(self.__get_url(endpoint), **args)
236
237        if request.status_code not in [200, 201, 202]:
238            if "already exists" in request.text or "e-mail already in use" in request.text:
239                self.logger.warning(request.text)
240                raise AlreadyExistsException()
241            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
242            self.logger.error(f"With info: {data} ({self.headers})")
243            self.logger.error(f"Answer: {request.text}")
244            raise Exception(
245                f"Received status code: {request.status_code} ({request.url}), {request.text}"
246            )
247        return self.parse_result(request)
248
249    def requests_patch(self, endpoint: str, data: dict):
250        request = self.requests.patch(
251            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
252        )
253        if request.status_code not in [200, 201]:
254            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
255            self.logger.error(error_message)
256            raise Exception(error_message)
257        return self.parse_result(request)
258
259    def get_orgs_public_members_all(self, orgname):
260        path = "/orgs/" + orgname + "/public_members"
261        return self.requests_get(path)
262
263    def get_orgs(self):
264        path = "/admin/orgs"
265        results = self.requests_get(path)
266        return [Organization.parse_response(self, result) for result in results]
267
268    def get_user(self):
269        result = self.requests_get(AllSpice.GET_USER)
270        return User.parse_response(self, result)
271
272    def get_version(self) -> str:
273        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
274        return result["version"]
275
276    def get_users(self) -> List[User]:
277        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
278        return [User.parse_response(self, result) for result in results]
279
280    def get_user_by_email(self, email: str) -> Optional[User]:
281        users = self.get_users()
282        for user in users:
283            if user.email == email or email in user.emails:
284                return user
285        return None
286
287    def get_user_by_name(self, username: str) -> Optional[User]:
288        users = self.get_users()
289        for user in users:
290            if user.username == username:
291                return user
292        return None
293
294    def get_repository(self, owner: str, name: str) -> Repository:
295        path = self.GET_REPOSITORY.format(owner=owner, name=name)
296        result = self.requests_get(path)
297        return Repository.parse_response(self, result)
298
299    def create_user(
300        self,
301        user_name: str,
302        email: str,
303        password: str,
304        full_name: Optional[str] = None,
305        login_name: Optional[str] = None,
306        change_pw=True,
307        send_notify=True,
308        source_id=0,
309    ):
310        """Create User.
311        Throws:
312            AlreadyExistsException, if the User exists already
313            Exception, if something else went wrong.
314        """
315        if not login_name:
316            login_name = user_name
317        if not full_name:
318            full_name = user_name
319        request_data = {
320            "source_id": source_id,
321            "login_name": login_name,
322            "full_name": full_name,
323            "username": user_name,
324            "email": email,
325            "password": password,
326            "send_notify": send_notify,
327            "must_change_password": change_pw,
328        }
329
330        self.logger.debug("Gitea post payload: %s", request_data)
331        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
332        if "id" in result:
333            self.logger.info(
334                "Successfully created User %s <%s> (id %s)",
335                result["login"],
336                result["email"],
337                result["id"],
338            )
339            self.logger.debug("Gitea response: %s", result)
340        else:
341            self.logger.error(result["message"])
342            raise Exception("User not created... (gitea: %s)" % result["message"])
343        user = User.parse_response(self, result)
344        return user
345
346    def create_repo(
347        self,
348        repoOwner: Union[User, Organization],
349        repoName: str,
350        description: str = "",
351        private: bool = False,
352        autoInit=True,
353        gitignores: Optional[str] = None,
354        license: Optional[str] = None,
355        readme: str = "Default",
356        issue_labels: Optional[str] = None,
357        default_branch="master",
358    ):
359        """Create a Repository as the administrator
360
361        Throws:
362            AlreadyExistsException: If the Repository exists already.
363            Exception: If something else went wrong.
364
365        Note:
366            Non-admin users can not use this method. Please use instead
367            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
368        """
369        # although this only says user in the api, this also works for
370        # organizations
371        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
372        result = self.requests_post(
373            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
374            data={
375                "name": repoName,
376                "description": description,
377                "private": private,
378                "auto_init": autoInit,
379                "gitignores": gitignores,
380                "license": license,
381                "issue_labels": issue_labels,
382                "readme": readme,
383                "default_branch": default_branch,
384            },
385        )
386        if "id" in result:
387            self.logger.info("Successfully created Repository %s " % result["name"])
388        else:
389            self.logger.error(result["message"])
390            raise Exception("Repository not created... (gitea: %s)" % result["message"])
391        return Repository.parse_response(self, result)
392
393    def create_org(
394        self,
395        owner: User,
396        orgName: str,
397        description: str,
398        location="",
399        website="",
400        full_name="",
401    ):
402        assert isinstance(owner, User)
403        result = self.requests_post(
404            AllSpice.CREATE_ORG % owner.username,
405            data={
406                "username": orgName,
407                "description": description,
408                "location": location,
409                "website": website,
410                "full_name": full_name,
411            },
412        )
413        if "id" in result:
414            self.logger.info("Successfully created Organization %s" % result["username"])
415        else:
416            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
417            self.logger.error(result["message"])
418            raise Exception("Organization not created... (gitea: %s)" % result["message"])
419        return Organization.parse_response(self, result)
420
421    def create_team(
422        self,
423        org: Organization,
424        name: str,
425        description: str = "",
426        permission: str = "read",
427        can_create_org_repo: bool = False,
428        includes_all_repositories: bool = False,
429        units=(
430            "repo.code",
431            "repo.issues",
432            "repo.ext_issues",
433            "repo.wiki",
434            "repo.pulls",
435            "repo.releases",
436            "repo.ext_wiki",
437        ),
438        units_map={},
439    ):
440        """Creates a Team.
441
442        Args:
443            org (Organization): Organization the Team will be part of.
444            name (str): The Name of the Team to be created.
445            description (str): Optional, None, short description of the new Team.
446            permission (str): Optional, 'read', What permissions the members
447            units_map (dict): Optional, {}, a mapping of units to their
448                permissions. If None or empty, the `permission` permission will
449                be applied to all units. Note: When both `units` and `units_map`
450                are given, `units_map` will be preferred.
451        """
452
453        result = self.requests_post(
454            AllSpice.CREATE_TEAM % org.username,
455            data={
456                "name": name,
457                "description": description,
458                "permission": permission,
459                "can_create_org_repo": can_create_org_repo,
460                "includes_all_repositories": includes_all_repositories,
461                "units": units,
462                "units_map": units_map,
463            },
464        )
465
466        if "id" in result:
467            self.logger.info("Successfully created Team %s" % result["name"])
468        else:
469            self.logger.error("Team not created... (gitea: %s)" % result["message"])
470            self.logger.error(result["message"])
471            raise Exception("Team not created... (gitea: %s)" % result["message"])
472        api_object = Team.parse_response(self, result)
473        setattr(
474            api_object, "_organization", org
475        )  # fixes strange behaviour of gitea not returning a valid organization here.
476        return api_object
class AllSpice:
 21class AllSpice:
 22    """Object to establish a session with AllSpice Hub."""
 23
    # Endpoint paths, relative to the "/api/v1" prefix added by `__get_url`.
    # Paths containing `%s` are filled in with the `%` operator by their
    # callers; GET_REPOSITORY is filled in with `str.format`.
    ADMIN_CREATE_USER = """/admin/users"""
    GET_USERS_ADMIN = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    ALLSPICE_HUB_VERSION = """/version"""
    GET_USER = """/user"""
    GET_REPOSITORY = """/repos/{owner}/{name}"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 32
 33    def __init__(
 34        self,
 35        allspice_hub_url="https://hub.allspice.io",
 36        token_text=None,
 37        auth=None,
 38        verify=True,
 39        log_level="INFO",
 40        ratelimiting=(100, 60),
 41        use_new_schdoc_renderer: Optional[bool] = None,
 42    ):
 43        """Initializing an instance of the AllSpice Hub Client
 44
 45        Args:
 46            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 47                Defaults to `https://hub.allspice.io`.
 48
 49            token_text (str, None): The access token, by default None.
 50
 51            auth (tuple, None): The user credentials
 52                `(username, password)`, by default None.
 53
 54            verify (bool): If True, allow insecure server connections
 55                when using SSL.
 56
 57            log_level (str): The log level, by default `INFO`.
 58
 59            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 60                If None, no rate limiting is applied. By default, 100 calls
 61                per minute are allowed.
 62
 63            use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set,
 64            this will take precedence over the default behavior on the AllSpice Hub instance.
 65        """
 66
 67        self.logger = logging.getLogger(__name__)
 68        handler = logging.StreamHandler(sys.stderr)
 69        handler.setFormatter(
 70            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 71        )
 72        self.logger.addHandler(handler)
 73        self.logger.setLevel(log_level)
 74        self.headers = {
 75            "Content-type": "application/json",
 76        }
 77        self.url = allspice_hub_url
 78
 79        if ratelimiting is None:
 80            self.requests = requests.Session()
 81        else:
 82            (max_calls, period) = ratelimiting
 83            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 84
 85        # Manage authentification
 86        if not token_text and not auth:
 87            raise ValueError("Please provide auth or token_text, but not both")
 88        if token_text:
 89            self.headers["Authorization"] = "token " + token_text
 90        if auth:
 91            self.logger.warning(
 92                "Using basic auth is not recommended. Prefer using a token instead."
 93            )
 94            self.requests.auth = auth
 95
 96        # Manage SSL certification verification
 97        self.requests.verify = verify
 98        if not verify:
 99            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
100
101        self.use_new_schdoc_renderer = use_new_schdoc_renderer
102
103    def __get_url(self, endpoint):
104        url = self.url + "/api/v1" + endpoint
105        self.logger.debug("Url: %s" % url)
106        return url
107
108    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
109        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
110        if request.status_code not in [200, 201]:
111            message = f"Received status code: {request.status_code} ({request.url})"
112            if request.status_code in [404]:
113                raise NotFoundException(message)
114            if request.status_code in [403]:
115                raise Exception(
116                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
117                )
118            if request.status_code in [409]:
119                raise ConflictException(message)
120            if request.status_code in [503]:
121                raise NotYetGeneratedException(message)
122            raise Exception(message)
123        return request
124
125    @staticmethod
126    def parse_result(result) -> Dict:
127        """Parses the result-JSON to a dict."""
128        if result.text and len(result.text) > 3:
129            return json.loads(result.text)
130        return {}
131
132    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
133        combined_params = {}
134        combined_params.update(params)
135        if sudo:
136            combined_params["sudo"] = sudo.username
137        return self.parse_result(self.__get(endpoint, combined_params))
138
139    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
140        combined_params = {}
141        combined_params.update(params)
142        if sudo:
143            combined_params["sudo"] = sudo.username
144        return self.__get(endpoint, combined_params).content
145
146    def requests_get_paginated(
147        self,
148        endpoint: str,
149        params=frozendict(),
150        sudo=None,
151        page_key: str = "page",
152        first_page: int = 1,
153    ):
154        page = first_page
155        combined_params = {}
156        combined_params.update(params)
157        aggregated_result = []
158        while True:
159            combined_params[page_key] = page
160            result = self.requests_get(endpoint, combined_params, sudo)
161
162            if not result:
163                return aggregated_result
164
165            if isinstance(result, dict):
166                if "data" in result:
167                    data = result["data"]
168                    if len(data) == 0:
169                        return aggregated_result
170                    aggregated_result.extend(data)
171                elif "tree" in result:
172                    data = result["tree"]
173                    if data is None or len(data) == 0:
174                        return aggregated_result
175                    aggregated_result.extend(data)
176                else:
177                    raise NotImplementedError(
178                        "requests_get_paginated does not know how to handle responses of this type."
179                    )
180            else:
181                aggregated_result.extend(result)
182
183            page += 1
184
185    def requests_put(self, endpoint: str, data: Optional[dict] = None):
186        if not data:
187            data = {}
188        request = self.requests.put(
189            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
190        )
191        if request.status_code not in [200, 204]:
192            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
193            self.logger.error(message)
194            raise Exception(message)
195
196    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
197        request = self.requests.delete(
198            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
199        )
200        if request.status_code not in [200, 204]:
201            message = f"Received status code: {request.status_code} ({request.url})"
202            self.logger.error(message)
203            raise Exception(message)
204
205    def requests_post(
206        self,
207        endpoint: str,
208        data: Optional[dict] = None,
209        params: Optional[dict] = None,
210        files: Optional[dict] = None,
211    ):
212        """
213        Make a POST call to the endpoint.
214
215        :param endpoint: The path to the endpoint
216        :param data: A dictionary for JSON data
217        :param params: A dictionary of query params
218        :param files: A dictionary of files, see requests.post. Using both files and data
219                      can lead to unexpected results!
220        :return: The JSON response parsed as a dict
221        """
222
223        # This should ideally be a TypedDict of the type of arguments taken by
224        # `requests.post`.
225        args: dict[str, Any] = {
226            "headers": self.headers.copy(),
227        }
228        if data is not None:
229            args["data"] = json.dumps(data)
230        if params is not None:
231            args["params"] = params
232        if files is not None:
233            args["headers"].pop("Content-type")
234            args["files"] = files
235
236        request = self.requests.post(self.__get_url(endpoint), **args)
237
238        if request.status_code not in [200, 201, 202]:
239            if "already exists" in request.text or "e-mail already in use" in request.text:
240                self.logger.warning(request.text)
241                raise AlreadyExistsException()
242            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
243            self.logger.error(f"With info: {data} ({self.headers})")
244            self.logger.error(f"Answer: {request.text}")
245            raise Exception(
246                f"Received status code: {request.status_code} ({request.url}), {request.text}"
247            )
248        return self.parse_result(request)
249
250    def requests_patch(self, endpoint: str, data: dict):
251        request = self.requests.patch(
252            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
253        )
254        if request.status_code not in [200, 201]:
255            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
256            self.logger.error(error_message)
257            raise Exception(error_message)
258        return self.parse_result(request)
259
260    def get_orgs_public_members_all(self, orgname):
261        path = "/orgs/" + orgname + "/public_members"
262        return self.requests_get(path)
263
264    def get_orgs(self):
265        path = "/admin/orgs"
266        results = self.requests_get(path)
267        return [Organization.parse_response(self, result) for result in results]
268
269    def get_user(self):
270        result = self.requests_get(AllSpice.GET_USER)
271        return User.parse_response(self, result)
272
273    def get_version(self) -> str:
274        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
275        return result["version"]
276
277    def get_users(self) -> List[User]:
278        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
279        return [User.parse_response(self, result) for result in results]
280
281    def get_user_by_email(self, email: str) -> Optional[User]:
282        users = self.get_users()
283        for user in users:
284            if user.email == email or email in user.emails:
285                return user
286        return None
287
288    def get_user_by_name(self, username: str) -> Optional[User]:
289        users = self.get_users()
290        for user in users:
291            if user.username == username:
292                return user
293        return None
294
295    def get_repository(self, owner: str, name: str) -> Repository:
296        path = self.GET_REPOSITORY.format(owner=owner, name=name)
297        result = self.requests_get(path)
298        return Repository.parse_response(self, result)
299
300    def create_user(
301        self,
302        user_name: str,
303        email: str,
304        password: str,
305        full_name: Optional[str] = None,
306        login_name: Optional[str] = None,
307        change_pw=True,
308        send_notify=True,
309        source_id=0,
310    ):
311        """Create User.
312        Throws:
313            AlreadyExistsException, if the User exists already
314            Exception, if something else went wrong.
315        """
316        if not login_name:
317            login_name = user_name
318        if not full_name:
319            full_name = user_name
320        request_data = {
321            "source_id": source_id,
322            "login_name": login_name,
323            "full_name": full_name,
324            "username": user_name,
325            "email": email,
326            "password": password,
327            "send_notify": send_notify,
328            "must_change_password": change_pw,
329        }
330
331        self.logger.debug("Gitea post payload: %s", request_data)
332        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
333        if "id" in result:
334            self.logger.info(
335                "Successfully created User %s <%s> (id %s)",
336                result["login"],
337                result["email"],
338                result["id"],
339            )
340            self.logger.debug("Gitea response: %s", result)
341        else:
342            self.logger.error(result["message"])
343            raise Exception("User not created... (gitea: %s)" % result["message"])
344        user = User.parse_response(self, result)
345        return user
346
347    def create_repo(
348        self,
349        repoOwner: Union[User, Organization],
350        repoName: str,
351        description: str = "",
352        private: bool = False,
353        autoInit=True,
354        gitignores: Optional[str] = None,
355        license: Optional[str] = None,
356        readme: str = "Default",
357        issue_labels: Optional[str] = None,
358        default_branch="master",
359    ):
360        """Create a Repository as the administrator
361
362        Throws:
363            AlreadyExistsException: If the Repository exists already.
364            Exception: If something else went wrong.
365
366        Note:
367            Non-admin users can not use this method. Please use instead
368            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
369        """
370        # although this only says user in the api, this also works for
371        # organizations
372        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
373        result = self.requests_post(
374            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
375            data={
376                "name": repoName,
377                "description": description,
378                "private": private,
379                "auto_init": autoInit,
380                "gitignores": gitignores,
381                "license": license,
382                "issue_labels": issue_labels,
383                "readme": readme,
384                "default_branch": default_branch,
385            },
386        )
387        if "id" in result:
388            self.logger.info("Successfully created Repository %s " % result["name"])
389        else:
390            self.logger.error(result["message"])
391            raise Exception("Repository not created... (gitea: %s)" % result["message"])
392        return Repository.parse_response(self, result)
393
394    def create_org(
395        self,
396        owner: User,
397        orgName: str,
398        description: str,
399        location="",
400        website="",
401        full_name="",
402    ):
403        assert isinstance(owner, User)
404        result = self.requests_post(
405            AllSpice.CREATE_ORG % owner.username,
406            data={
407                "username": orgName,
408                "description": description,
409                "location": location,
410                "website": website,
411                "full_name": full_name,
412            },
413        )
414        if "id" in result:
415            self.logger.info("Successfully created Organization %s" % result["username"])
416        else:
417            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
418            self.logger.error(result["message"])
419            raise Exception("Organization not created... (gitea: %s)" % result["message"])
420        return Organization.parse_response(self, result)
421
422    def create_team(
423        self,
424        org: Organization,
425        name: str,
426        description: str = "",
427        permission: str = "read",
428        can_create_org_repo: bool = False,
429        includes_all_repositories: bool = False,
430        units=(
431            "repo.code",
432            "repo.issues",
433            "repo.ext_issues",
434            "repo.wiki",
435            "repo.pulls",
436            "repo.releases",
437            "repo.ext_wiki",
438        ),
439        units_map={},
440    ):
441        """Creates a Team.
442
443        Args:
444            org (Organization): Organization the Team will be part of.
445            name (str): The Name of the Team to be created.
446            description (str): Optional, None, short description of the new Team.
447            permission (str): Optional, 'read', What permissions the members
448            units_map (dict): Optional, {}, a mapping of units to their
449                permissions. If None or empty, the `permission` permission will
450                be applied to all units. Note: When both `units` and `units_map`
451                are given, `units_map` will be preferred.
452        """
453
454        result = self.requests_post(
455            AllSpice.CREATE_TEAM % org.username,
456            data={
457                "name": name,
458                "description": description,
459                "permission": permission,
460                "can_create_org_repo": can_create_org_repo,
461                "includes_all_repositories": includes_all_repositories,
462                "units": units,
463                "units_map": units_map,
464            },
465        )
466
467        if "id" in result:
468            self.logger.info("Successfully created Team %s" % result["name"])
469        else:
470            self.logger.error("Team not created... (gitea: %s)" % result["message"])
471            self.logger.error(result["message"])
472            raise Exception("Team not created... (gitea: %s)" % result["message"])
473        api_object = Team.parse_response(self, result)
474        setattr(
475            api_object, "_organization", org
476        )  # fixes strange behaviour of gitea not returning a valid organization here.
477        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60), use_new_schdoc_renderer: Optional[bool] = None)
 33    def __init__(
 34        self,
 35        allspice_hub_url="https://hub.allspice.io",
 36        token_text=None,
 37        auth=None,
 38        verify=True,
 39        log_level="INFO",
 40        ratelimiting=(100, 60),
 41        use_new_schdoc_renderer: Optional[bool] = None,
 42    ):
 43        """Initializing an instance of the AllSpice Hub Client
 44
 45        Args:
 46            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 47                Defaults to `https://hub.allspice.io`.
 48
 49            token_text (str, None): The access token, by default None.
 50
 51            auth (tuple, None): The user credentials
 52                `(username, password)`, by default None.
 53
 54            verify (bool): If True, allow insecure server connections
 55                when using SSL.
 56
 57            log_level (str): The log level, by default `INFO`.
 58
 59            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 60                If None, no rate limiting is applied. By default, 100 calls
 61                per minute are allowed.
 62
 63            use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set,
 64            this will take precedence over the default behavior on the AllSpice Hub instance.
 65        """
 66
 67        self.logger = logging.getLogger(__name__)
 68        handler = logging.StreamHandler(sys.stderr)
 69        handler.setFormatter(
 70            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 71        )
 72        self.logger.addHandler(handler)
 73        self.logger.setLevel(log_level)
 74        self.headers = {
 75            "Content-type": "application/json",
 76        }
 77        self.url = allspice_hub_url
 78
 79        if ratelimiting is None:
 80            self.requests = requests.Session()
 81        else:
 82            (max_calls, period) = ratelimiting
 83            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 84
 85        # Manage authentification
 86        if not token_text and not auth:
 87            raise ValueError("Please provide auth or token_text, but not both")
 88        if token_text:
 89            self.headers["Authorization"] = "token " + token_text
 90        if auth:
 91            self.logger.warning(
 92                "Using basic auth is not recommended. Prefer using a token instead."
 93            )
 94            self.requests.auth = auth
 95
 96        # Manage SSL certification verification
 97        self.requests.verify = verify
 98        if not verify:
 99            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
100
101        self.use_new_schdoc_renderer = use_new_schdoc_renderer

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True (the default), verify the server's SSL certificate.
    If False, insecure server connections are allowed when using SSL.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.

use_new_schdoc_renderer (bool): Allows explicit override for using the new Altium schematic renderer. If set,
this will take precedence over the default behavior on the AllSpice Hub instance.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
use_new_schdoc_renderer
@staticmethod
def parse_result(result) -> Dict:
125    @staticmethod
126    def parse_result(result) -> Dict:
127        """Parses the result-JSON to a dict."""
128        if result.text and len(result.text) > 3:
129            return json.loads(result.text)
130        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
132    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
133        combined_params = {}
134        combined_params.update(params)
135        if sudo:
136            combined_params["sudo"] = sudo.username
137        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
139    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
140        combined_params = {}
141        combined_params.update(params)
142        if sudo:
143            combined_params["sudo"] = sudo.username
144        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
146    def requests_get_paginated(
147        self,
148        endpoint: str,
149        params=frozendict(),
150        sudo=None,
151        page_key: str = "page",
152        first_page: int = 1,
153    ):
154        page = first_page
155        combined_params = {}
156        combined_params.update(params)
157        aggregated_result = []
158        while True:
159            combined_params[page_key] = page
160            result = self.requests_get(endpoint, combined_params, sudo)
161
162            if not result:
163                return aggregated_result
164
165            if isinstance(result, dict):
166                if "data" in result:
167                    data = result["data"]
168                    if len(data) == 0:
169                        return aggregated_result
170                    aggregated_result.extend(data)
171                elif "tree" in result:
172                    data = result["tree"]
173                    if data is None or len(data) == 0:
174                        return aggregated_result
175                    aggregated_result.extend(data)
176                else:
177                    raise NotImplementedError(
178                        "requests_get_paginated does not know how to handle responses of this type."
179                    )
180            else:
181                aggregated_result.extend(result)
182
183            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
185    def requests_put(self, endpoint: str, data: Optional[dict] = None):
186        if not data:
187            data = {}
188        request = self.requests.put(
189            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
190        )
191        if request.status_code not in [200, 204]:
192            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
193            self.logger.error(message)
194            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
196    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
197        request = self.requests.delete(
198            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
199        )
200        if request.status_code not in [200, 204]:
201            message = f"Received status code: {request.status_code} ({request.url})"
202            self.logger.error(message)
203            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
205    def requests_post(
206        self,
207        endpoint: str,
208        data: Optional[dict] = None,
209        params: Optional[dict] = None,
210        files: Optional[dict] = None,
211    ):
212        """
213        Make a POST call to the endpoint.
214
215        :param endpoint: The path to the endpoint
216        :param data: A dictionary for JSON data
217        :param params: A dictionary of query params
218        :param files: A dictionary of files, see requests.post. Using both files and data
219                      can lead to unexpected results!
220        :return: The JSON response parsed as a dict
221        """
222
223        # This should ideally be a TypedDict of the type of arguments taken by
224        # `requests.post`.
225        args: dict[str, Any] = {
226            "headers": self.headers.copy(),
227        }
228        if data is not None:
229            args["data"] = json.dumps(data)
230        if params is not None:
231            args["params"] = params
232        if files is not None:
233            args["headers"].pop("Content-type")
234            args["files"] = files
235
236        request = self.requests.post(self.__get_url(endpoint), **args)
237
238        if request.status_code not in [200, 201, 202]:
239            if "already exists" in request.text or "e-mail already in use" in request.text:
240                self.logger.warning(request.text)
241                raise AlreadyExistsException()
242            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
243            self.logger.error(f"With info: {data} ({self.headers})")
244            self.logger.error(f"Answer: {request.text}")
245            raise Exception(
246                f"Received status code: {request.status_code} ({request.url}), {request.text}"
247            )
248        return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
250    def requests_patch(self, endpoint: str, data: dict):
251        request = self.requests.patch(
252            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
253        )
254        if request.status_code not in [200, 201]:
255            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
256            self.logger.error(error_message)
257            raise Exception(error_message)
258        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
260    def get_orgs_public_members_all(self, orgname):
261        path = "/orgs/" + orgname + "/public_members"
262        return self.requests_get(path)
def get_orgs(self):
264    def get_orgs(self):
265        path = "/admin/orgs"
266        results = self.requests_get(path)
267        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
269    def get_user(self):
270        result = self.requests_get(AllSpice.GET_USER)
271        return User.parse_response(self, result)
def get_version(self) -> str:
273    def get_version(self) -> str:
274        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
275        return result["version"]
def get_users(self) -> List[allspice.User]:
277    def get_users(self) -> List[User]:
278        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
279        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[allspice.User]:
281    def get_user_by_email(self, email: str) -> Optional[User]:
282        users = self.get_users()
283        for user in users:
284            if user.email == email or email in user.emails:
285                return user
286        return None
def get_user_by_name(self, username: str) -> Optional[allspice.User]:
288    def get_user_by_name(self, username: str) -> Optional[User]:
289        users = self.get_users()
290        for user in users:
291            if user.username == username:
292                return user
293        return None
def get_repository(self, owner: str, name: str) -> allspice.Repository:
295    def get_repository(self, owner: str, name: str) -> Repository:
296        path = self.GET_REPOSITORY.format(owner=owner, name=name)
297        result = self.requests_get(path)
298        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
300    def create_user(
301        self,
302        user_name: str,
303        email: str,
304        password: str,
305        full_name: Optional[str] = None,
306        login_name: Optional[str] = None,
307        change_pw=True,
308        send_notify=True,
309        source_id=0,
310    ):
311        """Create User.
312        Throws:
313            AlreadyExistsException, if the User exists already
314            Exception, if something else went wrong.
315        """
316        if not login_name:
317            login_name = user_name
318        if not full_name:
319            full_name = user_name
320        request_data = {
321            "source_id": source_id,
322            "login_name": login_name,
323            "full_name": full_name,
324            "username": user_name,
325            "email": email,
326            "password": password,
327            "send_notify": send_notify,
328            "must_change_password": change_pw,
329        }
330
331        self.logger.debug("Gitea post payload: %s", request_data)
332        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
333        if "id" in result:
334            self.logger.info(
335                "Successfully created User %s <%s> (id %s)",
336                result["login"],
337                result["email"],
338                result["id"],
339            )
340            self.logger.debug("Gitea response: %s", result)
341        else:
342            self.logger.error(result["message"])
343            raise Exception("User not created... (gitea: %s)" % result["message"])
344        user = User.parse_response(self, result)
345        return user

Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[allspice.User, allspice.Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
347    def create_repo(
348        self,
349        repoOwner: Union[User, Organization],
350        repoName: str,
351        description: str = "",
352        private: bool = False,
353        autoInit=True,
354        gitignores: Optional[str] = None,
355        license: Optional[str] = None,
356        readme: str = "Default",
357        issue_labels: Optional[str] = None,
358        default_branch="master",
359    ):
360        """Create a Repository as the administrator
361
362        Throws:
363            AlreadyExistsException: If the Repository exists already.
364            Exception: If something else went wrong.
365
366        Note:
367            Non-admin users can not use this method. Please use instead
368            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
369        """
370        # although this only says user in the api, this also works for
371        # organizations
372        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
373        result = self.requests_post(
374            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
375            data={
376                "name": repoName,
377                "description": description,
378                "private": private,
379                "auto_init": autoInit,
380                "gitignores": gitignores,
381                "license": license,
382                "issue_labels": issue_labels,
383                "readme": readme,
384                "default_branch": default_branch,
385            },
386        )
387        if "id" in result:
388            self.logger.info("Successfully created Repository %s " % result["name"])
389        else:
390            self.logger.error(result["message"])
391            raise Exception("Repository not created... (gitea: %s)" % result["message"])
392        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users can not use this method. Please use instead allspice.User.create_repo or allspice.Organization.create_repo.

def create_org( self, owner: allspice.User, orgName: str, description: str, location='', website='', full_name=''):
394    def create_org(
395        self,
396        owner: User,
397        orgName: str,
398        description: str,
399        location="",
400        website="",
401        full_name="",
402    ):
403        assert isinstance(owner, User)
404        result = self.requests_post(
405            AllSpice.CREATE_ORG % owner.username,
406            data={
407                "username": orgName,
408                "description": description,
409                "location": location,
410                "website": website,
411                "full_name": full_name,
412            },
413        )
414        if "id" in result:
415            self.logger.info("Successfully created Organization %s" % result["username"])
416        else:
417            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
418            self.logger.error(result["message"])
419            raise Exception("Organization not created... (gitea: %s)" % result["message"])
420        return Organization.parse_response(self, result)
def create_team( self, org: allspice.Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
422    def create_team(
423        self,
424        org: Organization,
425        name: str,
426        description: str = "",
427        permission: str = "read",
428        can_create_org_repo: bool = False,
429        includes_all_repositories: bool = False,
430        units=(
431            "repo.code",
432            "repo.issues",
433            "repo.ext_issues",
434            "repo.wiki",
435            "repo.pulls",
436            "repo.releases",
437            "repo.ext_wiki",
438        ),
439        units_map={},
440    ):
441        """Creates a Team.
442
443        Args:
444            org (Organization): Organization the Team will be part of.
445            name (str): The Name of the Team to be created.
446            description (str): Optional, None, short description of the new Team.
447            permission (str): Optional, 'read', What permissions the members
448            units_map (dict): Optional, {}, a mapping of units to their
449                permissions. If None or empty, the `permission` permission will
450                be applied to all units. Note: When both `units` and `units_map`
451                are given, `units_map` will be preferred.
452        """
453
454        result = self.requests_post(
455            AllSpice.CREATE_TEAM % org.username,
456            data={
457                "name": name,
458                "description": description,
459                "permission": permission,
460                "can_create_org_repo": can_create_org_repo,
461                "includes_all_repositories": includes_all_repositories,
462                "units": units,
463                "units_map": units_map,
464            },
465        )
466
467        if "id" in result:
468            self.logger.info("Successfully created Team %s" % result["name"])
469        else:
470            self.logger.error("Team not created... (gitea: %s)" % result["message"])
471            self.logger.error(result["message"])
472            raise Exception("Team not created... (gitea: %s)" % result["message"])
473        api_object = Team.parse_response(self, result)
474        setattr(
475            api_object, "_organization", org
476        )  # fixes strange behaviour of gitea not returning a valid organization here.
477        return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The Name of the Team to be created. description (str): Optional, short description of the new Team. permission (str): Optional, 'read', the permission sent for the members of the Team. units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the permission given in `permission` will be applied to all units. Note: When both units and units_map are given, units_map will be preferred.