
  1import json
  2import logging
  3import sys
  4from typing import Any, Dict, List, Mapping, Optional, Union
  6import requests
  7import urllib3
  8from frozendict import frozendict
 10from .apiobject import Organization, Repository, Team, User
 11from .exceptions import (
 12    AlreadyExistsException,
 13    ConflictException,
 14    NotFoundException,
 15    NotYetGeneratedException,
 17from .ratelimiter import RateLimitedSession
 20class AllSpice:
 21    """Object to establish a session with AllSpice Hub."""
 23    ADMIN_CREATE_USER = """/admin/users"""
 24    GET_USERS_ADMIN = """/admin/users"""
 25    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 26    ALLSPICE_HUB_VERSION = """/version"""
 27    GET_USER = """/user"""
 28    GET_REPOSITORY = """/repos/{owner}/{name}"""
 29    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 30    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 32    def __init__(
 33        self,
 34        allspice_hub_url="",
 35        token_text=None,
 36        auth=None,
 37        verify=True,
 38        log_level="INFO",
 39        ratelimiting=(100, 60),
 40    ):
 41        """Initializing an instance of the AllSpice Hub Client
 43        Args:
 44            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 45                Defaults to ``.
 47            token_text (str, None): The access token, by default None.
 49            auth (tuple, None): The user credentials
 50                `(username, password)`, by default None.
 52            verify (bool): If True, allow insecure server connections
 53                when using SSL.
 55            log_level (str): The log level, by default `INFO`.
 57            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 58                If None, no rate limiting is applied. By default, 100 calls
 59                per minute are allowed.
 60        """
 62        self.logger = logging.getLogger(__name__)
 63        handler = logging.StreamHandler(sys.stderr)
 64        handler.setFormatter(
 65            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 66        )
 67        self.logger.addHandler(handler)
 68        self.logger.setLevel(log_level)
 69        self.headers = {
 70            "Content-type": "application/json",
 71        }
 72        self.url = allspice_hub_url
 74        if ratelimiting is None:
 75            self.requests = requests.Session()
 76        else:
 77            (max_calls, period) = ratelimiting
 78            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 80        # Manage authentification
 81        if not token_text and not auth:
 82            raise ValueError("Please provide auth or token_text, but not both")
 83        if token_text:
 84            self.headers["Authorization"] = "token " + token_text
 85        if auth:
 86            self.logger.warning(
 87                "Using basic auth is not recommended. Prefer using a token instead."
 88            )
 89            self.requests.auth = auth
 91        # Manage SSL certification verification
 92        self.requests.verify = verify
 93        if not verify:
 94            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 96    def __get_url(self, endpoint):
 97        url = self.url + "/api/v1" + endpoint
 98        self.logger.debug("Url: %s" % url)
 99        return url
101    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
102        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
103        if request.status_code not in [200, 201]:
104            message = f"Received status code: {request.status_code} ({request.url})"
105            if request.status_code in [404]:
106                raise NotFoundException(message)
107            if request.status_code in [403]:
108                raise Exception(
109                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
110                )
111            if request.status_code in [409]:
112                raise ConflictException(message)
113            if request.status_code in [503]:
114                raise NotYetGeneratedException(message)
115            raise Exception(message)
116        return request
118    @staticmethod
119    def parse_result(result) -> Dict:
120        """Parses the result-JSON to a dict."""
121        if result.text and len(result.text) > 3:
122            return json.loads(result.text)
123        return {}
125    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
126        combined_params = {}
127        combined_params.update(params)
128        if sudo:
129            combined_params["sudo"] = sudo.username
130        return self.parse_result(self.__get(endpoint, combined_params))
132    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
133        combined_params = {}
134        combined_params.update(params)
135        if sudo:
136            combined_params["sudo"] = sudo.username
137        return self.__get(endpoint, combined_params).content
139    def requests_get_paginated(
140        self,
141        endpoint: str,
142        params=frozendict(),
143        sudo=None,
144        page_key: str = "page",
145        first_page: int = 1,
146    ):
147        page = first_page
148        combined_params = {}
149        combined_params.update(params)
150        aggregated_result = []
151        while True:
152            combined_params[page_key] = page
153            result = self.requests_get(endpoint, combined_params, sudo)
155            if not result:
156                return aggregated_result
158            if isinstance(result, dict):
159                if "data" in result:
160                    data = result["data"]
161                    if len(data) == 0:
162                        return aggregated_result
163                    aggregated_result.extend(data)
164                elif "tree" in result:
165                    data = result["tree"]
166                    if data is None or len(data) == 0:
167                        return aggregated_result
168                    aggregated_result.extend(data)
169                else:
170                    raise NotImplementedError(
171                        "requests_get_paginated does not know how to handle responses of this type."
172                    )
173            else:
174                aggregated_result.extend(result)
176            page += 1
178    def requests_put(self, endpoint: str, data: Optional[dict] = None):
179        if not data:
180            data = {}
181        request = self.requests.put(
182            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
183        )
184        if request.status_code not in [200, 204]:
185            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
186            self.logger.error(message)
187            raise Exception(message)
189    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
190        request = self.requests.delete(
191            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
192        )
193        if request.status_code not in [200, 204]:
194            message = f"Received status code: {request.status_code} ({request.url})"
195            self.logger.error(message)
196            raise Exception(message)
198    def requests_post(
199        self,
200        endpoint: str,
201        data: Optional[dict] = None,
202        params: Optional[dict] = None,
203        files: Optional[dict] = None,
204    ):
205        """
206        Make a POST call to the endpoint.
208        :param endpoint: The path to the endpoint
209        :param data: A dictionary for JSON data
210        :param params: A dictionary of query params
211        :param files: A dictionary of files, see Using both files and data
212                      can lead to unexpected results!
213        :return: The JSON response parsed as a dict
214        """
216        # This should ideally be a TypedDict of the type of arguments taken by
217        # ``.
218        args: dict[str, Any] = {
219            "headers": self.headers.copy(),
220        }
221        if data is not None:
222            args["data"] = json.dumps(data)
223        if params is not None:
224            args["params"] = params
225        if files is not None:
226            args["headers"].pop("Content-type")
227            args["files"] = files
229        request =, **args)
231        if request.status_code not in [200, 201, 202]:
232            if "already exists" in request.text or "e-mail already in use" in request.text:
233                self.logger.warning(request.text)
234                raise AlreadyExistsException()
235            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
236            self.logger.error(f"With info: {data} ({self.headers})")
237            self.logger.error(f"Answer: {request.text}")
238            raise Exception(
239                f"Received status code: {request.status_code} ({request.url}), {request.text}"
240            )
241        return self.parse_result(request)
243    def requests_patch(self, endpoint: str, data: dict):
244        request = self.requests.patch(
245            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
246        )
247        if request.status_code not in [200, 201]:
248            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
249            self.logger.error(error_message)
250            raise Exception(error_message)
251        return self.parse_result(request)
253    def get_orgs_public_members_all(self, orgname):
254        path = "/orgs/" + orgname + "/public_members"
255        return self.requests_get(path)
257    def get_orgs(self):
258        path = "/admin/orgs"
259        results = self.requests_get(path)
260        return [Organization.parse_response(self, result) for result in results]
262    def get_user(self):
263        result = self.requests_get(AllSpice.GET_USER)
264        return User.parse_response(self, result)
266    def get_version(self) -> str:
267        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
268        return result["version"]
270    def get_users(self) -> List[User]:
271        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
272        return [User.parse_response(self, result) for result in results]
274    def get_user_by_email(self, email: str) -> Optional[User]:
275        users = self.get_users()
276        for user in users:
277            if == email or email in user.emails:
278                return user
279        return None
281    def get_user_by_name(self, username: str) -> Optional[User]:
282        users = self.get_users()
283        for user in users:
284            if user.username == username:
285                return user
286        return None
288    def get_repository(self, owner: str, name: str) -> Repository:
289        path = self.GET_REPOSITORY.format(owner=owner, name=name)
290        result = self.requests_get(path)
291        return Repository.parse_response(self, result)
293    def create_user(
294        self,
295        user_name: str,
296        email: str,
297        password: str,
298        full_name: Optional[str] = None,
299        login_name: Optional[str] = None,
300        change_pw=True,
301        send_notify=True,
302        source_id=0,
303    ):
304        """Create User.
305        Throws:
306            AlreadyExistsException, if the User exists already
307            Exception, if something else went wrong.
308        """
309        if not login_name:
310            login_name = user_name
311        if not full_name:
312            full_name = user_name
313        request_data = {
314            "source_id": source_id,
315            "login_name": login_name,
316            "full_name": full_name,
317            "username": user_name,
318            "email": email,
319            "password": password,
320            "send_notify": send_notify,
321            "must_change_password": change_pw,
322        }
324        self.logger.debug("Gitea post payload: %s", request_data)
325        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
326        if "id" in result:
328                "Successfully created User %s <%s> (id %s)",
329                result["login"],
330                result["email"],
331                result["id"],
332            )
333            self.logger.debug("Gitea response: %s", result)
334        else:
335            self.logger.error(result["message"])
336            raise Exception("User not created... (gitea: %s)" % result["message"])
337        user = User.parse_response(self, result)
338        return user
340    def create_repo(
341        self,
342        repoOwner: Union[User, Organization],
343        repoName: str,
344        description: str = "",
345        private: bool = False,
346        autoInit=True,
347        gitignores: Optional[str] = None,
348        license: Optional[str] = None,
349        readme: str = "Default",
350        issue_labels: Optional[str] = None,
351        default_branch="master",
352    ):
353        """Create a Repository as the administrator
355        Throws:
356            AlreadyExistsException: If the Repository exists already.
357            Exception: If something else went wrong.
359        Note:
360            Non-admin users can not use this method. Please use instead
361            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
362        """
363        # although this only says user in the api, this also works for
364        # organizations
365        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
366        result = self.requests_post(
367            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
368            data={
369                "name": repoName,
370                "description": description,
371                "private": private,
372                "auto_init": autoInit,
373                "gitignores": gitignores,
374                "license": license,
375                "issue_labels": issue_labels,
376                "readme": readme,
377                "default_branch": default_branch,
378            },
379        )
380        if "id" in result:
381  "Successfully created Repository %s " % result["name"])
382        else:
383            self.logger.error(result["message"])
384            raise Exception("Repository not created... (gitea: %s)" % result["message"])
385        return Repository.parse_response(self, result)
387    def create_org(
388        self,
389        owner: User,
390        orgName: str,
391        description: str,
392        location="",
393        website="",
394        full_name="",
395    ):
396        assert isinstance(owner, User)
397        result = self.requests_post(
398            AllSpice.CREATE_ORG % owner.username,
399            data={
400                "username": orgName,
401                "description": description,
402                "location": location,
403                "website": website,
404                "full_name": full_name,
405            },
406        )
407        if "id" in result:
408  "Successfully created Organization %s" % result["username"])
409        else:
410            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
411            self.logger.error(result["message"])
412            raise Exception("Organization not created... (gitea: %s)" % result["message"])
413        return Organization.parse_response(self, result)
415    def create_team(
416        self,
417        org: Organization,
418        name: str,
419        description: str = "",
420        permission: str = "read",
421        can_create_org_repo: bool = False,
422        includes_all_repositories: bool = False,
423        units=(
424            "repo.code",
425            "repo.issues",
426            "repo.ext_issues",
427            "",
428            "repo.pulls",
429            "repo.releases",
430            "repo.ext_wiki",
431        ),
432        units_map={},
433    ):
434        """Creates a Team.
436        Args:
437            org (Organization): Organization the Team will be part of.
438            name (str): The Name of the Team to be created.
439            description (str): Optional, None, short description of the new Team.
440            permission (str): Optional, 'read', What permissions the members
441            units_map (dict): Optional, {}, a mapping of units to their
442                permissions. If None or empty, the `permission` permission will
443                be applied to all units. Note: When both `units` and `units_map`
444                are given, `units_map` will be preferred.
445        """
447        result = self.requests_post(
448            AllSpice.CREATE_TEAM % org.username,
449            data={
450                "name": name,
451                "description": description,
452                "permission": permission,
453                "can_create_org_repo": can_create_org_repo,
454                "includes_all_repositories": includes_all_repositories,
455                "units": units,
456                "units_map": units_map,
457            },
458        )
460        if "id" in result:
461  "Successfully created Team %s" % result["name"])
462        else:
463            self.logger.error("Team not created... (gitea: %s)" % result["message"])
464            self.logger.error(result["message"])
465            raise Exception("Team not created... (gitea: %s)" % result["message"])
466        api_object = Team.parse_response(self, result)
467        setattr(
468            api_object, "_organization", org
469        )  # fixes strange behaviour of gitea not returning a valid organization here.
470        return api_object
class AllSpice:
 21class AllSpice:
 22    """Object to establish a session with AllSpice Hub."""
 24    ADMIN_CREATE_USER = """/admin/users"""
 25    GET_USERS_ADMIN = """/admin/users"""
 26    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 27    ALLSPICE_HUB_VERSION = """/version"""
 28    GET_USER = """/user"""
 29    GET_REPOSITORY = """/repos/{owner}/{name}"""
 30    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 31    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 33    def __init__(
 34        self,
 35        allspice_hub_url="",
 36        token_text=None,
 37        auth=None,
 38        verify=True,
 39        log_level="INFO",
 40        ratelimiting=(100, 60),
 41    ):
 42        """Initializing an instance of the AllSpice Hub Client
 44        Args:
 45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 46                Defaults to ``.
 48            token_text (str, None): The access token, by default None.
 50            auth (tuple, None): The user credentials
 51                `(username, password)`, by default None.
 53            verify (bool): If True, allow insecure server connections
 54                when using SSL.
 56            log_level (str): The log level, by default `INFO`.
 58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 59                If None, no rate limiting is applied. By default, 100 calls
 60                per minute are allowed.
 61        """
 63        self.logger = logging.getLogger(__name__)
 64        handler = logging.StreamHandler(sys.stderr)
 65        handler.setFormatter(
 66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 67        )
 68        self.logger.addHandler(handler)
 69        self.logger.setLevel(log_level)
 70        self.headers = {
 71            "Content-type": "application/json",
 72        }
 73        self.url = allspice_hub_url
 75        if ratelimiting is None:
 76            self.requests = requests.Session()
 77        else:
 78            (max_calls, period) = ratelimiting
 79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 81        # Manage authentification
 82        if not token_text and not auth:
 83            raise ValueError("Please provide auth or token_text, but not both")
 84        if token_text:
 85            self.headers["Authorization"] = "token " + token_text
 86        if auth:
 87            self.logger.warning(
 88                "Using basic auth is not recommended. Prefer using a token instead."
 89            )
 90            self.requests.auth = auth
 92        # Manage SSL certification verification
 93        self.requests.verify = verify
 94        if not verify:
 95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 97    def __get_url(self, endpoint):
 98        url = self.url + "/api/v1" + endpoint
 99        self.logger.debug("Url: %s" % url)
100        return url
102    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
103        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
104        if request.status_code not in [200, 201]:
105            message = f"Received status code: {request.status_code} ({request.url})"
106            if request.status_code in [404]:
107                raise NotFoundException(message)
108            if request.status_code in [403]:
109                raise Exception(
110                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
111                )
112            if request.status_code in [409]:
113                raise ConflictException(message)
114            if request.status_code in [503]:
115                raise NotYetGeneratedException(message)
116            raise Exception(message)
117        return request
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
156            if not result:
157                return aggregated_result
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
177            page += 1
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # ``.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
230        request =, **args)
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if == email or email in user.emails:
279                return user
280        return None
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382  "Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409  "Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
461        if "id" in result:
462  "Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
33    def __init__(
34        self,
35        allspice_hub_url="",
36        token_text=None,
37        auth=None,
38        verify=True,
39        log_level="INFO",
40        ratelimiting=(100, 60),
41    ):
42        """Initializing an instance of the AllSpice Hub Client
44        Args:
45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
46                Defaults to ``.
48            token_text (str, None): The access token, by default None.
50            auth (tuple, None): The user credentials
51                `(username, password)`, by default None.
53            verify (bool): If True, allow insecure server connections
54                when using SSL.
56            log_level (str): The log level, by default `INFO`.
58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
59                If None, no rate limiting is applied. By default, 100 calls
60                per minute are allowed.
61        """
63        self.logger = logging.getLogger(__name__)
64        handler = logging.StreamHandler(sys.stderr)
65        handler.setFormatter(
66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
67        )
68        self.logger.addHandler(handler)
69        self.logger.setLevel(log_level)
70        self.headers = {
71            "Content-type": "application/json",
72        }
73        self.url = allspice_hub_url
75        if ratelimiting is None:
76            self.requests = requests.Session()
77        else:
78            (max_calls, period) = ratelimiting
79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
81        # Manage authentification
82        if not token_text and not auth:
83            raise ValueError("Please provide auth or token_text, but not both")
84        if token_text:
85            self.headers["Authorization"] = "token " + token_text
86        if auth:
87            self.logger.warning(
88                "Using basic auth is not recommended. Prefer using a token instead."
89            )
90            self.requests.auth = auth
92        # Manage SSL certification verification
93        self.requests.verify = verify
94        if not verify:
95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True, allow insecure server connections
    when using SSL.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
def parse_result(result) -> Dict:
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
156            if not result:
157                return aggregated_result
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
177            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # ``.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
230        request =, **args)
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)

Make a POST call to the endpoint.

  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see Using both files and data can lead to unexpected results!

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
def get_orgs(self):
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
def get_version(self) -> str:
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
def get_users(self) -> List[allspice.User]:
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[allspice.User]:
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if == email or email in user.emails:
279                return user
280        return None
def get_user_by_name(self, username: str) -> Optional[allspice.User]:
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
def get_repository(self, owner: str, name: str) -> allspice.Repository:
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user

Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[allspice.User, allspice.Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382  "Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users can not use this method. Please use instead allspice.User.create_repo or allspice.Organization.create_repo.

def create_org( self, owner: allspice.User, orgName: str, description: str, location='', website='', full_name=''):
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409  "Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
def create_team( self, org: allspice.Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', '', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
461        if "id" in result:
462  "Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The Name of the Team to be created. description (str): Optional, None, short description of the new Team. permission (str): Optional, 'read', What permissions the members units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the permission permission will be applied to all units. Note: When both units and units_map are given, units_map will be preferred.