allspice.allspice

  1import json
  2import logging
  3import sys
  4from typing import Any, Dict, List, Mapping, Optional, Union
  5
  6import requests
  7import urllib3
  8from frozendict import frozendict
  9
 10from .apiobject import Organization, Repository, Team, User
 11from .exceptions import (
 12    AlreadyExistsException,
 13    ConflictException,
 14    NotFoundException,
 15    NotYetGeneratedException,
 16)
 17from .ratelimiter import RateLimitedSession
 18
 19
 20class AllSpice:
 21    """Object to establish a session with AllSpice Hub."""
 22
 23    ADMIN_CREATE_USER = """/admin/users"""
 24    GET_USERS_ADMIN = """/admin/users"""
 25    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 26    ALLSPICE_HUB_VERSION = """/version"""
 27    GET_USER = """/user"""
 28    GET_REPOSITORY = """/repos/{owner}/{name}"""
 29    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 30    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 31
 32    def __init__(
 33        self,
 34        allspice_hub_url="https://hub.allspice.io",
 35        token_text=None,
 36        auth=None,
 37        verify=True,
 38        log_level="INFO",
 39        ratelimiting=(100, 60),
 40    ):
 41        """Initializing an instance of the AllSpice Hub Client
 42
 43        Args:
 44            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 45                Defaults to `https://hub.allspice.io`.
 46
 47            token_text (str, None): The access token, by default None.
 48
 49            auth (tuple, None): The user credentials
 50                `(username, password)`, by default None.
 51
 52            verify (bool): If True, allow insecure server connections
 53                when using SSL.
 54
 55            log_level (str): The log level, by default `INFO`.
 56
 57            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 58                If None, no rate limiting is applied. By default, 100 calls
 59                per minute are allowed.
 60        """
 61
 62        self.logger = logging.getLogger(__name__)
 63        handler = logging.StreamHandler(sys.stderr)
 64        handler.setFormatter(
 65            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 66        )
 67        self.logger.addHandler(handler)
 68        self.logger.setLevel(log_level)
 69        self.headers = {
 70            "Content-type": "application/json",
 71        }
 72        self.url = allspice_hub_url
 73
 74        if ratelimiting is None:
 75            self.requests = requests.Session()
 76        else:
 77            (max_calls, period) = ratelimiting
 78            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 79
 80        # Manage authentification
 81        if not token_text and not auth:
 82            raise ValueError("Please provide auth or token_text, but not both")
 83        if token_text:
 84            self.headers["Authorization"] = "token " + token_text
 85        if auth:
 86            self.logger.warning(
 87                "Using basic auth is not recommended. Prefer using a token instead."
 88            )
 89            self.requests.auth = auth
 90
 91        # Manage SSL certification verification
 92        self.requests.verify = verify
 93        if not verify:
 94            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 95
 96    def __get_url(self, endpoint):
 97        url = self.url + "/api/v1" + endpoint
 98        self.logger.debug("Url: %s" % url)
 99        return url
100
101    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
102        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
103        if request.status_code not in [200, 201]:
104            message = f"Received status code: {request.status_code} ({request.url})"
105            if request.status_code in [404]:
106                raise NotFoundException(message)
107            if request.status_code in [403]:
108                raise Exception(
109                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
110                )
111            if request.status_code in [409]:
112                raise ConflictException(message)
113            if request.status_code in [503]:
114                raise NotYetGeneratedException(message)
115            raise Exception(message)
116        return request
117
118    @staticmethod
119    def parse_result(result) -> Dict:
120        """Parses the result-JSON to a dict."""
121        if result.text and len(result.text) > 3:
122            return json.loads(result.text)
123        return {}
124
125    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
126        combined_params = {}
127        combined_params.update(params)
128        if sudo:
129            combined_params["sudo"] = sudo.username
130        return self.parse_result(self.__get(endpoint, combined_params))
131
132    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
133        combined_params = {}
134        combined_params.update(params)
135        if sudo:
136            combined_params["sudo"] = sudo.username
137        return self.__get(endpoint, combined_params).content
138
139    def requests_get_paginated(
140        self,
141        endpoint: str,
142        params=frozendict(),
143        sudo=None,
144        page_key: str = "page",
145        first_page: int = 1,
146    ):
147        page = first_page
148        combined_params = {}
149        combined_params.update(params)
150        aggregated_result = []
151        while True:
152            combined_params[page_key] = page
153            result = self.requests_get(endpoint, combined_params, sudo)
154
155            if not result:
156                return aggregated_result
157
158            if isinstance(result, dict):
159                if "data" in result:
160                    data = result["data"]
161                    if len(data) == 0:
162                        return aggregated_result
163                    aggregated_result.extend(data)
164                elif "tree" in result:
165                    data = result["tree"]
166                    if data is None or len(data) == 0:
167                        return aggregated_result
168                    aggregated_result.extend(data)
169                else:
170                    raise NotImplementedError(
171                        "requests_get_paginated does not know how to handle responses of this type."
172                    )
173            else:
174                aggregated_result.extend(result)
175
176            page += 1
177
178    def requests_put(self, endpoint: str, data: Optional[dict] = None):
179        if not data:
180            data = {}
181        request = self.requests.put(
182            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
183        )
184        if request.status_code not in [200, 204]:
185            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
186            self.logger.error(message)
187            raise Exception(message)
188
189    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
190        request = self.requests.delete(
191            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
192        )
193        if request.status_code not in [200, 204]:
194            message = f"Received status code: {request.status_code} ({request.url})"
195            self.logger.error(message)
196            raise Exception(message)
197
198    def requests_post(
199        self,
200        endpoint: str,
201        data: Optional[dict] = None,
202        params: Optional[dict] = None,
203        files: Optional[dict] = None,
204    ):
205        """
206        Make a POST call to the endpoint.
207
208        :param endpoint: The path to the endpoint
209        :param data: A dictionary for JSON data
210        :param params: A dictionary of query params
211        :param files: A dictionary of files, see requests.post. Using both files and data
212                      can lead to unexpected results!
213        :return: The JSON response parsed as a dict
214        """
215
216        # This should ideally be a TypedDict of the type of arguments taken by
217        # `requests.post`.
218        args: dict[str, Any] = {
219            "headers": self.headers.copy(),
220        }
221        if data is not None:
222            args["data"] = json.dumps(data)
223        if params is not None:
224            args["params"] = params
225        if files is not None:
226            args["headers"].pop("Content-type")
227            args["files"] = files
228
229        request = self.requests.post(self.__get_url(endpoint), **args)
230
231        if request.status_code not in [200, 201, 202]:
232            if "already exists" in request.text or "e-mail already in use" in request.text:
233                self.logger.warning(request.text)
234                raise AlreadyExistsException()
235            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
236            self.logger.error(f"With info: {data} ({self.headers})")
237            self.logger.error(f"Answer: {request.text}")
238            raise Exception(
239                f"Received status code: {request.status_code} ({request.url}), {request.text}"
240            )
241        return self.parse_result(request)
242
243    def requests_patch(self, endpoint: str, data: dict):
244        request = self.requests.patch(
245            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
246        )
247        if request.status_code not in [200, 201]:
248            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
249            self.logger.error(error_message)
250            raise Exception(error_message)
251        return self.parse_result(request)
252
253    def get_orgs_public_members_all(self, orgname):
254        path = "/orgs/" + orgname + "/public_members"
255        return self.requests_get(path)
256
257    def get_orgs(self):
258        path = "/admin/orgs"
259        results = self.requests_get(path)
260        return [Organization.parse_response(self, result) for result in results]
261
262    def get_user(self):
263        result = self.requests_get(AllSpice.GET_USER)
264        return User.parse_response(self, result)
265
266    def get_version(self) -> str:
267        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
268        return result["version"]
269
270    def get_users(self) -> List[User]:
271        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
272        return [User.parse_response(self, result) for result in results]
273
274    def get_user_by_email(self, email: str) -> Optional[User]:
275        users = self.get_users()
276        for user in users:
277            if user.email == email or email in user.emails:
278                return user
279        return None
280
281    def get_user_by_name(self, username: str) -> Optional[User]:
282        users = self.get_users()
283        for user in users:
284            if user.username == username:
285                return user
286        return None
287
288    def get_repository(self, owner: str, name: str) -> Repository:
289        path = self.GET_REPOSITORY.format(owner=owner, name=name)
290        result = self.requests_get(path)
291        return Repository.parse_response(self, result)
292
293    def create_user(
294        self,
295        user_name: str,
296        email: str,
297        password: str,
298        full_name: Optional[str] = None,
299        login_name: Optional[str] = None,
300        change_pw=True,
301        send_notify=True,
302        source_id=0,
303    ):
304        """Create User.
305        Throws:
306            AlreadyExistsException, if the User exists already
307            Exception, if something else went wrong.
308        """
309        if not login_name:
310            login_name = user_name
311        if not full_name:
312            full_name = user_name
313        request_data = {
314            "source_id": source_id,
315            "login_name": login_name,
316            "full_name": full_name,
317            "username": user_name,
318            "email": email,
319            "password": password,
320            "send_notify": send_notify,
321            "must_change_password": change_pw,
322        }
323
324        self.logger.debug("Gitea post payload: %s", request_data)
325        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
326        if "id" in result:
327            self.logger.info(
328                "Successfully created User %s <%s> (id %s)",
329                result["login"],
330                result["email"],
331                result["id"],
332            )
333            self.logger.debug("Gitea response: %s", result)
334        else:
335            self.logger.error(result["message"])
336            raise Exception("User not created... (gitea: %s)" % result["message"])
337        user = User.parse_response(self, result)
338        return user
339
340    def create_repo(
341        self,
342        repoOwner: Union[User, Organization],
343        repoName: str,
344        description: str = "",
345        private: bool = False,
346        autoInit=True,
347        gitignores: Optional[str] = None,
348        license: Optional[str] = None,
349        readme: str = "Default",
350        issue_labels: Optional[str] = None,
351        default_branch="master",
352    ):
353        """Create a Repository as the administrator
354
355        Throws:
356            AlreadyExistsException: If the Repository exists already.
357            Exception: If something else went wrong.
358
359        Note:
360            Non-admin users can not use this method. Please use instead
361            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
362        """
363        # although this only says user in the api, this also works for
364        # organizations
365        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
366        result = self.requests_post(
367            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
368            data={
369                "name": repoName,
370                "description": description,
371                "private": private,
372                "auto_init": autoInit,
373                "gitignores": gitignores,
374                "license": license,
375                "issue_labels": issue_labels,
376                "readme": readme,
377                "default_branch": default_branch,
378            },
379        )
380        if "id" in result:
381            self.logger.info("Successfully created Repository %s " % result["name"])
382        else:
383            self.logger.error(result["message"])
384            raise Exception("Repository not created... (gitea: %s)" % result["message"])
385        return Repository.parse_response(self, result)
386
387    def create_org(
388        self,
389        owner: User,
390        orgName: str,
391        description: str,
392        location="",
393        website="",
394        full_name="",
395    ):
396        assert isinstance(owner, User)
397        result = self.requests_post(
398            AllSpice.CREATE_ORG % owner.username,
399            data={
400                "username": orgName,
401                "description": description,
402                "location": location,
403                "website": website,
404                "full_name": full_name,
405            },
406        )
407        if "id" in result:
408            self.logger.info("Successfully created Organization %s" % result["username"])
409        else:
410            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
411            self.logger.error(result["message"])
412            raise Exception("Organization not created... (gitea: %s)" % result["message"])
413        return Organization.parse_response(self, result)
414
415    def create_team(
416        self,
417        org: Organization,
418        name: str,
419        description: str = "",
420        permission: str = "read",
421        can_create_org_repo: bool = False,
422        includes_all_repositories: bool = False,
423        units=(
424            "repo.code",
425            "repo.issues",
426            "repo.ext_issues",
427            "repo.wiki",
428            "repo.pulls",
429            "repo.releases",
430            "repo.ext_wiki",
431        ),
432        units_map={},
433    ):
434        """Creates a Team.
435
436        Args:
437            org (Organization): Organization the Team will be part of.
438            name (str): The Name of the Team to be created.
439            description (str): Optional, None, short description of the new Team.
440            permission (str): Optional, 'read', What permissions the members
441            units_map (dict): Optional, {}, a mapping of units to their
442                permissions. If None or empty, the `permission` permission will
443                be applied to all units. Note: When both `units` and `units_map`
444                are given, `units_map` will be preferred.
445        """
446
447        result = self.requests_post(
448            AllSpice.CREATE_TEAM % org.username,
449            data={
450                "name": name,
451                "description": description,
452                "permission": permission,
453                "can_create_org_repo": can_create_org_repo,
454                "includes_all_repositories": includes_all_repositories,
455                "units": units,
456                "units_map": units_map,
457            },
458        )
459
460        if "id" in result:
461            self.logger.info("Successfully created Team %s" % result["name"])
462        else:
463            self.logger.error("Team not created... (gitea: %s)" % result["message"])
464            self.logger.error(result["message"])
465            raise Exception("Team not created... (gitea: %s)" % result["message"])
466        api_object = Team.parse_response(self, result)
467        setattr(
468            api_object, "_organization", org
469        )  # fixes strange behaviour of gitea not returning a valid organization here.
470        return api_object
class AllSpice:
 21class AllSpice:
 22    """Object to establish a session with AllSpice Hub."""
 23
 24    ADMIN_CREATE_USER = """/admin/users"""
 25    GET_USERS_ADMIN = """/admin/users"""
 26    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 27    ALLSPICE_HUB_VERSION = """/version"""
 28    GET_USER = """/user"""
 29    GET_REPOSITORY = """/repos/{owner}/{name}"""
 30    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 31    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 32
 33    def __init__(
 34        self,
 35        allspice_hub_url="https://hub.allspice.io",
 36        token_text=None,
 37        auth=None,
 38        verify=True,
 39        log_level="INFO",
 40        ratelimiting=(100, 60),
 41    ):
 42        """Initializing an instance of the AllSpice Hub Client
 43
 44        Args:
 45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 46                Defaults to `https://hub.allspice.io`.
 47
 48            token_text (str, None): The access token, by default None.
 49
 50            auth (tuple, None): The user credentials
 51                `(username, password)`, by default None.
 52
 53            verify (bool): If True, allow insecure server connections
 54                when using SSL.
 55
 56            log_level (str): The log level, by default `INFO`.
 57
 58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 59                If None, no rate limiting is applied. By default, 100 calls
 60                per minute are allowed.
 61        """
 62
 63        self.logger = logging.getLogger(__name__)
 64        handler = logging.StreamHandler(sys.stderr)
 65        handler.setFormatter(
 66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 67        )
 68        self.logger.addHandler(handler)
 69        self.logger.setLevel(log_level)
 70        self.headers = {
 71            "Content-type": "application/json",
 72        }
 73        self.url = allspice_hub_url
 74
 75        if ratelimiting is None:
 76            self.requests = requests.Session()
 77        else:
 78            (max_calls, period) = ratelimiting
 79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 80
 81        # Manage authentification
 82        if not token_text and not auth:
 83            raise ValueError("Please provide auth or token_text, but not both")
 84        if token_text:
 85            self.headers["Authorization"] = "token " + token_text
 86        if auth:
 87            self.logger.warning(
 88                "Using basic auth is not recommended. Prefer using a token instead."
 89            )
 90            self.requests.auth = auth
 91
 92        # Manage SSL certification verification
 93        self.requests.verify = verify
 94        if not verify:
 95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 96
 97    def __get_url(self, endpoint):
 98        url = self.url + "/api/v1" + endpoint
 99        self.logger.debug("Url: %s" % url)
100        return url
101
102    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
103        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
104        if request.status_code not in [200, 201]:
105            message = f"Received status code: {request.status_code} ({request.url})"
106            if request.status_code in [404]:
107                raise NotFoundException(message)
108            if request.status_code in [403]:
109                raise Exception(
110                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
111                )
112            if request.status_code in [409]:
113                raise ConflictException(message)
114            if request.status_code in [503]:
115                raise NotYetGeneratedException(message)
116            raise Exception(message)
117        return request
118
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}
125
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
132
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
139
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
155
156            if not result:
157                return aggregated_result
158
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
176
177            page += 1
178
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
189
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
198
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
208
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see requests.post. Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
216
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # `requests.post`.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
229
230        request = self.requests.post(self.__get_url(endpoint), **args)
231
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)
243
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
253
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
257
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
262
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
266
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
270
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
274
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.email == email or email in user.emails:
279                return user
280        return None
281
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
288
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
293
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
324
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
328            self.logger.info(
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user
340
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
355
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
359
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382            self.logger.info("Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)
387
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409            self.logger.info("Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
415
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "repo.wiki",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
436
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
447
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
460
461        if "id" in result:
462            self.logger.info("Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
33    def __init__(
34        self,
35        allspice_hub_url="https://hub.allspice.io",
36        token_text=None,
37        auth=None,
38        verify=True,
39        log_level="INFO",
40        ratelimiting=(100, 60),
41    ):
42        """Initializing an instance of the AllSpice Hub Client
43
44        Args:
45            allspice_hub_url (str): The URL for the AllSpice Hub instance.
46                Defaults to `https://hub.allspice.io`.
47
48            token_text (str, None): The access token, by default None.
49
50            auth (tuple, None): The user credentials
51                `(username, password)`, by default None.
52
53            verify (bool): If True, allow insecure server connections
54                when using SSL.
55
56            log_level (str): The log level, by default `INFO`.
57
58            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
59                If None, no rate limiting is applied. By default, 100 calls
60                per minute are allowed.
61        """
62
63        self.logger = logging.getLogger(__name__)
64        handler = logging.StreamHandler(sys.stderr)
65        handler.setFormatter(
66            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
67        )
68        self.logger.addHandler(handler)
69        self.logger.setLevel(log_level)
70        self.headers = {
71            "Content-type": "application/json",
72        }
73        self.url = allspice_hub_url
74
75        if ratelimiting is None:
76            self.requests = requests.Session()
77        else:
78            (max_calls, period) = ratelimiting
79            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
80
81        # Manage authentification
82        if not token_text and not auth:
83            raise ValueError("Please provide auth or token_text, but not both")
84        if token_text:
85            self.headers["Authorization"] = "token " + token_text
86        if auth:
87            self.logger.warning(
88                "Using basic auth is not recommended. Prefer using a token instead."
89            )
90            self.requests.auth = auth
91
92        # Manage SSL certification verification
93        self.requests.verify = verify
94        if not verify:
95            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True, allow insecure server connections
    when using SSL.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
@staticmethod
def parse_result(result) -> Dict:
119    @staticmethod
120    def parse_result(result) -> Dict:
121        """Parses the result-JSON to a dict."""
122        if result.text and len(result.text) > 3:
123            return json.loads(result.text)
124        return {}

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
126    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.parse_result(self.__get(endpoint, combined_params))
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
133    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
134        combined_params = {}
135        combined_params.update(params)
136        if sudo:
137            combined_params["sudo"] = sudo.username
138        return self.__get(endpoint, combined_params).content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
140    def requests_get_paginated(
141        self,
142        endpoint: str,
143        params=frozendict(),
144        sudo=None,
145        page_key: str = "page",
146        first_page: int = 1,
147    ):
148        page = first_page
149        combined_params = {}
150        combined_params.update(params)
151        aggregated_result = []
152        while True:
153            combined_params[page_key] = page
154            result = self.requests_get(endpoint, combined_params, sudo)
155
156            if not result:
157                return aggregated_result
158
159            if isinstance(result, dict):
160                if "data" in result:
161                    data = result["data"]
162                    if len(data) == 0:
163                        return aggregated_result
164                    aggregated_result.extend(data)
165                elif "tree" in result:
166                    data = result["tree"]
167                    if data is None or len(data) == 0:
168                        return aggregated_result
169                    aggregated_result.extend(data)
170                else:
171                    raise NotImplementedError(
172                        "requests_get_paginated does not know how to handle responses of this type."
173                    )
174            else:
175                aggregated_result.extend(result)
176
177            page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
179    def requests_put(self, endpoint: str, data: Optional[dict] = None):
180        if not data:
181            data = {}
182        request = self.requests.put(
183            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
184        )
185        if request.status_code not in [200, 204]:
186            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
187            self.logger.error(message)
188            raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
190    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
191        request = self.requests.delete(
192            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
193        )
194        if request.status_code not in [200, 204]:
195            message = f"Received status code: {request.status_code} ({request.url})"
196            self.logger.error(message)
197            raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
199    def requests_post(
200        self,
201        endpoint: str,
202        data: Optional[dict] = None,
203        params: Optional[dict] = None,
204        files: Optional[dict] = None,
205    ):
206        """
207        Make a POST call to the endpoint.
208
209        :param endpoint: The path to the endpoint
210        :param data: A dictionary for JSON data
211        :param params: A dictionary of query params
212        :param files: A dictionary of files, see requests.post. Using both files and data
213                      can lead to unexpected results!
214        :return: The JSON response parsed as a dict
215        """
216
217        # This should ideally be a TypedDict of the type of arguments taken by
218        # `requests.post`.
219        args: dict[str, Any] = {
220            "headers": self.headers.copy(),
221        }
222        if data is not None:
223            args["data"] = json.dumps(data)
224        if params is not None:
225            args["params"] = params
226        if files is not None:
227            args["headers"].pop("Content-type")
228            args["files"] = files
229
230        request = self.requests.post(self.__get_url(endpoint), **args)
231
232        if request.status_code not in [200, 201, 202]:
233            if "already exists" in request.text or "e-mail already in use" in request.text:
234                self.logger.warning(request.text)
235                raise AlreadyExistsException()
236            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
237            self.logger.error(f"With info: {data} ({self.headers})")
238            self.logger.error(f"Answer: {request.text}")
239            raise Exception(
240                f"Received status code: {request.status_code} ({request.url}), {request.text}"
241            )
242        return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
244    def requests_patch(self, endpoint: str, data: dict):
245        request = self.requests.patch(
246            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
247        )
248        if request.status_code not in [200, 201]:
249            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
250            self.logger.error(error_message)
251            raise Exception(error_message)
252        return self.parse_result(request)
def get_orgs_public_members_all(self, orgname):
254    def get_orgs_public_members_all(self, orgname):
255        path = "/orgs/" + orgname + "/public_members"
256        return self.requests_get(path)
def get_orgs(self):
258    def get_orgs(self):
259        path = "/admin/orgs"
260        results = self.requests_get(path)
261        return [Organization.parse_response(self, result) for result in results]
def get_user(self):
263    def get_user(self):
264        result = self.requests_get(AllSpice.GET_USER)
265        return User.parse_response(self, result)
def get_version(self) -> str:
267    def get_version(self) -> str:
268        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
269        return result["version"]
def get_users(self) -> List[allspice.User]:
271    def get_users(self) -> List[User]:
272        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
273        return [User.parse_response(self, result) for result in results]
def get_user_by_email(self, email: str) -> Optional[allspice.User]:
275    def get_user_by_email(self, email: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.email == email or email in user.emails:
279                return user
280        return None
def get_user_by_name(self, username: str) -> Optional[allspice.User]:
282    def get_user_by_name(self, username: str) -> Optional[User]:
283        users = self.get_users()
284        for user in users:
285            if user.username == username:
286                return user
287        return None
def get_repository(self, owner: str, name: str) -> allspice.Repository:
289    def get_repository(self, owner: str, name: str) -> Repository:
290        path = self.GET_REPOSITORY.format(owner=owner, name=name)
291        result = self.requests_get(path)
292        return Repository.parse_response(self, result)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
294    def create_user(
295        self,
296        user_name: str,
297        email: str,
298        password: str,
299        full_name: Optional[str] = None,
300        login_name: Optional[str] = None,
301        change_pw=True,
302        send_notify=True,
303        source_id=0,
304    ):
305        """Create User.
306        Throws:
307            AlreadyExistsException, if the User exists already
308            Exception, if something else went wrong.
309        """
310        if not login_name:
311            login_name = user_name
312        if not full_name:
313            full_name = user_name
314        request_data = {
315            "source_id": source_id,
316            "login_name": login_name,
317            "full_name": full_name,
318            "username": user_name,
319            "email": email,
320            "password": password,
321            "send_notify": send_notify,
322            "must_change_password": change_pw,
323        }
324
325        self.logger.debug("Gitea post payload: %s", request_data)
326        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
327        if "id" in result:
328            self.logger.info(
329                "Successfully created User %s <%s> (id %s)",
330                result["login"],
331                result["email"],
332                result["id"],
333            )
334            self.logger.debug("Gitea response: %s", result)
335        else:
336            self.logger.error(result["message"])
337            raise Exception("User not created... (gitea: %s)" % result["message"])
338        user = User.parse_response(self, result)
339        return user

Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[allspice.User, allspice.Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
341    def create_repo(
342        self,
343        repoOwner: Union[User, Organization],
344        repoName: str,
345        description: str = "",
346        private: bool = False,
347        autoInit=True,
348        gitignores: Optional[str] = None,
349        license: Optional[str] = None,
350        readme: str = "Default",
351        issue_labels: Optional[str] = None,
352        default_branch="master",
353    ):
354        """Create a Repository as the administrator
355
356        Throws:
357            AlreadyExistsException: If the Repository exists already.
358            Exception: If something else went wrong.
359
360        Note:
361            Non-admin users can not use this method. Please use instead
362            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
363        """
364        # although this only says user in the api, this also works for
365        # organizations
366        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
367        result = self.requests_post(
368            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
369            data={
370                "name": repoName,
371                "description": description,
372                "private": private,
373                "auto_init": autoInit,
374                "gitignores": gitignores,
375                "license": license,
376                "issue_labels": issue_labels,
377                "readme": readme,
378                "default_branch": default_branch,
379            },
380        )
381        if "id" in result:
382            self.logger.info("Successfully created Repository %s " % result["name"])
383        else:
384            self.logger.error(result["message"])
385            raise Exception("Repository not created... (gitea: %s)" % result["message"])
386        return Repository.parse_response(self, result)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users can not use this method. Please use instead allspice.User.create_repo or allspice.Organization.create_repo.

def create_org( self, owner: allspice.User, orgName: str, description: str, location='', website='', full_name=''):
388    def create_org(
389        self,
390        owner: User,
391        orgName: str,
392        description: str,
393        location="",
394        website="",
395        full_name="",
396    ):
397        assert isinstance(owner, User)
398        result = self.requests_post(
399            AllSpice.CREATE_ORG % owner.username,
400            data={
401                "username": orgName,
402                "description": description,
403                "location": location,
404                "website": website,
405                "full_name": full_name,
406            },
407        )
408        if "id" in result:
409            self.logger.info("Successfully created Organization %s" % result["username"])
410        else:
411            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
412            self.logger.error(result["message"])
413            raise Exception("Organization not created... (gitea: %s)" % result["message"])
414        return Organization.parse_response(self, result)
def create_team( self, org: allspice.Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
416    def create_team(
417        self,
418        org: Organization,
419        name: str,
420        description: str = "",
421        permission: str = "read",
422        can_create_org_repo: bool = False,
423        includes_all_repositories: bool = False,
424        units=(
425            "repo.code",
426            "repo.issues",
427            "repo.ext_issues",
428            "repo.wiki",
429            "repo.pulls",
430            "repo.releases",
431            "repo.ext_wiki",
432        ),
433        units_map={},
434    ):
435        """Creates a Team.
436
437        Args:
438            org (Organization): Organization the Team will be part of.
439            name (str): The Name of the Team to be created.
440            description (str): Optional, None, short description of the new Team.
441            permission (str): Optional, 'read', What permissions the members
442            units_map (dict): Optional, {}, a mapping of units to their
443                permissions. If None or empty, the `permission` permission will
444                be applied to all units. Note: When both `units` and `units_map`
445                are given, `units_map` will be preferred.
446        """
447
448        result = self.requests_post(
449            AllSpice.CREATE_TEAM % org.username,
450            data={
451                "name": name,
452                "description": description,
453                "permission": permission,
454                "can_create_org_repo": can_create_org_repo,
455                "includes_all_repositories": includes_all_repositories,
456                "units": units,
457                "units_map": units_map,
458            },
459        )
460
461        if "id" in result:
462            self.logger.info("Successfully created Team %s" % result["name"])
463        else:
464            self.logger.error("Team not created... (gitea: %s)" % result["message"])
465            self.logger.error(result["message"])
466            raise Exception("Team not created... (gitea: %s)" % result["message"])
467        api_object = Team.parse_response(self, result)
468        setattr(
469            api_object, "_organization", org
470        )  # fixes strange behaviour of gitea not returning a valid organization here.
471        return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The Name of the Team to be created. description (str): Optional, None, short description of the new Team. permission (str): Optional, 'read', What permissions the members units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the permission permission will be applied to all units. Note: When both units and units_map are given, units_map will be preferred.