allspice.allspice

  1import json
  2import logging
  3from typing import Any, Dict, List, Mapping, Optional, Union
  4
  5import requests
  6import urllib3
  7from frozendict import frozendict
  8
  9from .apiobject import Organization, Repository, Team, User
 10from .exceptions import (
 11    AlreadyExistsException,
 12    ConflictException,
 13    NotFoundException,
 14    NotYetGeneratedException,
 15)
 16from .ratelimiter import RateLimitedSession
 17
 18
 19class AllSpice:
 20    """Object to establish a session with AllSpice Hub."""
 21
 22    ADMIN_CREATE_USER = """/admin/users"""
 23    GET_USERS_ADMIN = """/admin/users"""
 24    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 25    ALLSPICE_HUB_VERSION = """/version"""
 26    GET_USER = """/user"""
 27    GET_REPOSITORY = """/repos/{owner}/{name}"""
 28    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 29    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 30
 31    def __init__(
 32        self,
 33        allspice_hub_url="https://hub.allspice.io",
 34        token_text=None,
 35        auth=None,
 36        verify=True,
 37        log_level="INFO",
 38        ratelimiting=(100, 60),
 39    ):
 40        """Initializing an instance of the AllSpice Hub Client
 41
 42        Args:
 43            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 44                Defaults to `https://hub.allspice.io`.
 45
 46            token_text (str, None): The access token, by default None.
 47
 48            auth (tuple, None): The user credentials
 49                `(username, password)`, by default None.
 50
 51            verify (bool): If True, allow insecure server connections
 52                when using SSL.
 53
 54            log_level (str): The log level, by default `INFO`.
 55
 56            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 57                If None, no rate limiting is applied. By default, 100 calls
 58                per minute are allowed.
 59        """
 60
 61        self.logger = logging.getLogger(__name__)
 62        self.logger.setLevel(log_level)
 63        self.headers = {
 64            "Content-type": "application/json",
 65        }
 66        self.url = allspice_hub_url
 67
 68        if ratelimiting is None:
 69            self.requests = requests.Session()
 70        else:
 71            (max_calls, period) = ratelimiting
 72            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 73
 74        # Manage authentification
 75        if not token_text and not auth:
 76            raise ValueError("Please provide auth or token_text, but not both")
 77        if token_text:
 78            self.headers["Authorization"] = "token " + token_text
 79        if auth:
 80            self.logger.warning(
 81                "Using basic auth is not recommended. Prefer using a token instead."
 82            )
 83            self.requests.auth = auth
 84
 85        # Manage SSL certification verification
 86        self.requests.verify = verify
 87        if not verify:
 88            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 89
 90    def __get_url(self, endpoint):
 91        url = self.url + "/api/v1" + endpoint
 92        self.logger.debug("Url: %s" % url)
 93        return url
 94
 95    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
 96        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
 97        if request.status_code not in [200, 201]:
 98            message = f"Received status code: {request.status_code} ({request.url})"
 99            if request.status_code in [404]:
100                raise NotFoundException(message)
101            if request.status_code in [403]:
102                raise Exception(
103                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
104                )
105            if request.status_code in [409]:
106                raise ConflictException(message)
107            if request.status_code in [503]:
108                raise NotYetGeneratedException(message)
109            raise Exception(message)
110        return request
111
112    @staticmethod
113    def parse_result(result) -> Dict:
114        """Parses the result-JSON to a dict."""
115        if result.text and len(result.text) > 3:
116            return json.loads(result.text)
117        return {}
118
119    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
120        combined_params = {}
121        combined_params.update(params)
122        if sudo:
123            combined_params["sudo"] = sudo.username
124        return self.parse_result(self.__get(endpoint, combined_params))
125
126    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
127        combined_params = {}
128        combined_params.update(params)
129        if sudo:
130            combined_params["sudo"] = sudo.username
131        return self.__get(endpoint, combined_params).content
132
133    def requests_get_paginated(
134        self,
135        endpoint: str,
136        params=frozendict(),
137        sudo=None,
138        page_key: str = "page",
139        first_page: int = 1,
140    ):
141        page = first_page
142        combined_params = {}
143        combined_params.update(params)
144        aggregated_result = []
145        while True:
146            combined_params[page_key] = page
147            result = self.requests_get(endpoint, combined_params, sudo)
148
149            if not result:
150                return aggregated_result
151
152            if isinstance(result, dict):
153                if "data" in result:
154                    data = result["data"]
155                    if len(data) == 0:
156                        return aggregated_result
157                    aggregated_result.extend(data)
158                elif "tree" in result:
159                    data = result["tree"]
160                    if data is None or len(data) == 0:
161                        return aggregated_result
162                    aggregated_result.extend(data)
163                else:
164                    raise NotImplementedError(
165                        "requests_get_paginated does not know how to handle responses of this type."
166                    )
167            else:
168                aggregated_result.extend(result)
169
170            page += 1
171
172    def requests_put(self, endpoint: str, data: Optional[dict] = None):
173        if not data:
174            data = {}
175        request = self.requests.put(
176            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
177        )
178        if request.status_code not in [200, 204]:
179            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
180            self.logger.error(message)
181            raise Exception(message)
182
183    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
184        request = self.requests.delete(
185            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
186        )
187        if request.status_code not in [200, 204]:
188            message = f"Received status code: {request.status_code} ({request.url})"
189            self.logger.error(message)
190            raise Exception(message)
191
192    def requests_post(
193        self,
194        endpoint: str,
195        data: Optional[dict] = None,
196        params: Optional[dict] = None,
197        files: Optional[dict] = None,
198    ):
199        """
200        Make a POST call to the endpoint.
201
202        :param endpoint: The path to the endpoint
203        :param data: A dictionary for JSON data
204        :param params: A dictionary of query params
205        :param files: A dictionary of files, see requests.post. Using both files and data
206                      can lead to unexpected results!
207        :return: The JSON response parsed as a dict
208        """
209
210        # This should ideally be a TypedDict of the type of arguments taken by
211        # `requests.post`.
212        args: dict[str, Any] = {
213            "headers": self.headers.copy(),
214        }
215        if data is not None:
216            args["data"] = json.dumps(data)
217        if params is not None:
218            args["params"] = params
219        if files is not None:
220            args["headers"].pop("Content-type")
221            args["files"] = files
222
223        request = self.requests.post(self.__get_url(endpoint), **args)
224
225        if request.status_code not in [200, 201, 202]:
226            if "already exists" in request.text or "e-mail already in use" in request.text:
227                self.logger.warning(request.text)
228                raise AlreadyExistsException()
229            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
230            self.logger.error(f"With info: {data} ({self.headers})")
231            self.logger.error(f"Answer: {request.text}")
232            raise Exception(
233                f"Received status code: {request.status_code} ({request.url}), {request.text}"
234            )
235        return self.parse_result(request)
236
237    def requests_patch(self, endpoint: str, data: dict):
238        request = self.requests.patch(
239            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
240        )
241        if request.status_code not in [200, 201]:
242            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
243            self.logger.error(error_message)
244            raise Exception(error_message)
245        return self.parse_result(request)
246
247    def get_orgs_public_members_all(self, orgname):
248        path = "/orgs/" + orgname + "/public_members"
249        return self.requests_get(path)
250
251    def get_orgs(self):
252        path = "/admin/orgs"
253        results = self.requests_get(path)
254        return [Organization.parse_response(self, result) for result in results]
255
256    def get_user(self):
257        result = self.requests_get(AllSpice.GET_USER)
258        return User.parse_response(self, result)
259
260    def get_version(self) -> str:
261        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
262        return result["version"]
263
264    def get_users(self) -> List[User]:
265        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
266        return [User.parse_response(self, result) for result in results]
267
268    def get_user_by_email(self, email: str) -> Optional[User]:
269        users = self.get_users()
270        for user in users:
271            if user.email == email or email in user.emails:
272                return user
273        return None
274
275    def get_user_by_name(self, username: str) -> Optional[User]:
276        users = self.get_users()
277        for user in users:
278            if user.username == username:
279                return user
280        return None
281
282    def get_repository(self, owner: str, name: str) -> Repository:
283        path = self.GET_REPOSITORY.format(owner=owner, name=name)
284        result = self.requests_get(path)
285        return Repository.parse_response(self, result)
286
287    def create_user(
288        self,
289        user_name: str,
290        email: str,
291        password: str,
292        full_name: Optional[str] = None,
293        login_name: Optional[str] = None,
294        change_pw=True,
295        send_notify=True,
296        source_id=0,
297    ):
298        """Create User.
299        Throws:
300            AlreadyExistsException, if the User exists already
301            Exception, if something else went wrong.
302        """
303        if not login_name:
304            login_name = user_name
305        if not full_name:
306            full_name = user_name
307        request_data = {
308            "source_id": source_id,
309            "login_name": login_name,
310            "full_name": full_name,
311            "username": user_name,
312            "email": email,
313            "password": password,
314            "send_notify": send_notify,
315            "must_change_password": change_pw,
316        }
317
318        self.logger.debug("Gitea post payload: %s", request_data)
319        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
320        if "id" in result:
321            self.logger.info(
322                "Successfully created User %s <%s> (id %s)",
323                result["login"],
324                result["email"],
325                result["id"],
326            )
327            self.logger.debug("Gitea response: %s", result)
328        else:
329            self.logger.error(result["message"])
330            raise Exception("User not created... (gitea: %s)" % result["message"])
331        user = User.parse_response(self, result)
332        return user
333
334    def create_repo(
335        self,
336        repoOwner: Union[User, Organization],
337        repoName: str,
338        description: str = "",
339        private: bool = False,
340        autoInit=True,
341        gitignores: Optional[str] = None,
342        license: Optional[str] = None,
343        readme: str = "Default",
344        issue_labels: Optional[str] = None,
345        default_branch="master",
346    ):
347        """Create a Repository as the administrator
348
349        Throws:
350            AlreadyExistsException: If the Repository exists already.
351            Exception: If something else went wrong.
352
353        Note:
354            Non-admin users can not use this method. Please use instead
355            `allspice.User.create_repo` or `allspice.Organization.create_repo`.
356        """
357        # although this only says user in the api, this also works for
358        # organizations
359        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
360        result = self.requests_post(
361            AllSpice.ADMIN_REPO_CREATE % repoOwner.username,
362            data={
363                "name": repoName,
364                "description": description,
365                "private": private,
366                "auto_init": autoInit,
367                "gitignores": gitignores,
368                "license": license,
369                "issue_labels": issue_labels,
370                "readme": readme,
371                "default_branch": default_branch,
372            },
373        )
374        if "id" in result:
375            self.logger.info("Successfully created Repository %s " % result["name"])
376        else:
377            self.logger.error(result["message"])
378            raise Exception("Repository not created... (gitea: %s)" % result["message"])
379        return Repository.parse_response(self, result)
380
381    def create_org(
382        self,
383        owner: User,
384        orgName: str,
385        description: str,
386        location="",
387        website="",
388        full_name="",
389    ):
390        assert isinstance(owner, User)
391        result = self.requests_post(
392            AllSpice.CREATE_ORG % owner.username,
393            data={
394                "username": orgName,
395                "description": description,
396                "location": location,
397                "website": website,
398                "full_name": full_name,
399            },
400        )
401        if "id" in result:
402            self.logger.info("Successfully created Organization %s" % result["username"])
403        else:
404            self.logger.error("Organization not created... (gitea: %s)" % result["message"])
405            self.logger.error(result["message"])
406            raise Exception("Organization not created... (gitea: %s)" % result["message"])
407        return Organization.parse_response(self, result)
408
409    def create_team(
410        self,
411        org: Organization,
412        name: str,
413        description: str = "",
414        permission: str = "read",
415        can_create_org_repo: bool = False,
416        includes_all_repositories: bool = False,
417        units=(
418            "repo.code",
419            "repo.issues",
420            "repo.ext_issues",
421            "repo.wiki",
422            "repo.pulls",
423            "repo.releases",
424            "repo.ext_wiki",
425        ),
426        units_map={},
427    ):
428        """Creates a Team.
429
430        Args:
431            org (Organization): Organization the Team will be part of.
432            name (str): The Name of the Team to be created.
433            description (str): Optional, None, short description of the new Team.
434            permission (str): Optional, 'read', What permissions the members
435            units_map (dict): Optional, {}, a mapping of units to their
436                permissions. If None or empty, the `permission` permission will
437                be applied to all units. Note: When both `units` and `units_map`
438                are given, `units_map` will be preferred.
439        """
440
441        result = self.requests_post(
442            AllSpice.CREATE_TEAM % org.username,
443            data={
444                "name": name,
445                "description": description,
446                "permission": permission,
447                "can_create_org_repo": can_create_org_repo,
448                "includes_all_repositories": includes_all_repositories,
449                "units": units,
450                "units_map": units_map,
451            },
452        )
453
454        if "id" in result:
455            self.logger.info("Successfully created Team %s" % result["name"])
456        else:
457            self.logger.error("Team not created... (gitea: %s)" % result["message"])
458            self.logger.error(result["message"])
459            raise Exception("Team not created... (gitea: %s)" % result["message"])
460        api_object = Team.parse_response(self, result)
461        setattr(
462            api_object, "_organization", org
463        )  # fixes strange behaviour of gitea not returning a valid organization here.
464        return api_object
class AllSpice:
 20class AllSpice:
 21    """Object to establish a session with AllSpice Hub."""
 22
 23    ADMIN_CREATE_USER = """/admin/users"""
 24    GET_USERS_ADMIN = """/admin/users"""
 25    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
 26    ALLSPICE_HUB_VERSION = """/version"""
 27    GET_USER = """/user"""
 28    GET_REPOSITORY = """/repos/{owner}/{name}"""
 29    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
 30    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>
 31
 32    def __init__(
 33        self,
 34        allspice_hub_url="https://hub.allspice.io",
 35        token_text=None,
 36        auth=None,
 37        verify=True,
 38        log_level="INFO",
 39        ratelimiting=(100, 60),
 40    ):
 41        """Initializing an instance of the AllSpice Hub Client
 42
 43        Args:
 44            allspice_hub_url (str): The URL for the AllSpice Hub instance.
 45                Defaults to `https://hub.allspice.io`.
 46
 47            token_text (str, None): The access token, by default None.
 48
 49            auth (tuple, None): The user credentials
 50                `(username, password)`, by default None.
 51
 52            verify (bool): If True, allow insecure server connections
 53                when using SSL.
 54
 55            log_level (str): The log level, by default `INFO`.
 56
 57            ratelimiting (tuple[int, int], None): `(max_calls, period)`,
 58                If None, no rate limiting is applied. By default, 100 calls
 59                per minute are allowed.
 60        """
 61
 62        self.logger = logging.getLogger(__name__)
 63        self.logger.setLevel(log_level)
 64        self.headers = {
 65            "Content-type": "application/json",
 66        }
 67        self.url = allspice_hub_url
 68
 69        if ratelimiting is None:
 70            self.requests = requests.Session()
 71        else:
 72            (max_calls, period) = ratelimiting
 73            self.requests = RateLimitedSession(max_calls=max_calls, period=period)
 74
 75        # Manage authentification
 76        if not token_text and not auth:
 77            raise ValueError("Please provide auth or token_text, but not both")
 78        if token_text:
 79            self.headers["Authorization"] = "token " + token_text
 80        if auth:
 81            self.logger.warning(
 82                "Using basic auth is not recommended. Prefer using a token instead."
 83            )
 84            self.requests.auth = auth
 85
 86        # Manage SSL certification verification
 87        self.requests.verify = verify
 88        if not verify:
 89            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 90
 91    def __get_url(self, endpoint):
 92        url = self.url + "/api/v1" + endpoint
 93        self.logger.debug("Url: %s" % url)
 94        return url
 95
 96    def __get(self, endpoint: str, params: Mapping = frozendict()) -> requests.Response:
 97        request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=params)
 98        if request.status_code not in [200, 201]:
 99            message = f"Received status code: {request.status_code} ({request.url})"
100            if request.status_code in [404]:
101                raise NotFoundException(message)
102            if request.status_code in [403]:
103                raise Exception(
104                    f"Unauthorized: {request.url} - Check your permissions and try again! ({message})"
105                )
106            if request.status_code in [409]:
107                raise ConflictException(message)
108            if request.status_code in [503]:
109                raise NotYetGeneratedException(message)
110            raise Exception(message)
111        return request
112
113    @staticmethod
114    def parse_result(result) -> Dict:
115        """Parses the result-JSON to a dict."""
116        if result.text and len(result.text) > 3:
117            return json.loads(result.text)
118        return {}
119
120    def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
121        combined_params = {}
122        combined_params.update(params)
123        if sudo:
124            combined_params["sudo"] = sudo.username
125        return self.parse_result(self.__get(endpoint, combined_params))
126
127    def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
128        combined_params = {}
129        combined_params.update(params)
130        if sudo:
131            combined_params["sudo"] = sudo.username
132        return self.__get(endpoint, combined_params).content
133
134    def requests_get_paginated(
135        self,
136        endpoint: str,
137        params=frozendict(),
138        sudo=None,
139        page_key: str = "page",
140        first_page: int = 1,
141    ):
142        page = first_page
143        combined_params = {}
144        combined_params.update(params)
145        aggregated_result = []
146        while True:
147            combined_params[page_key] = page
148            result = self.requests_get(endpoint, combined_params, sudo)
149
150            if not result:
151                return aggregated_result
152
153            if isinstance(result, dict):
154                if "data" in result:
155                    data = result["data"]
156                    if len(data) == 0:
157                        return aggregated_result
158                    aggregated_result.extend(data)
159                elif "tree" in result:
160                    data = result["tree"]
161                    if data is None or len(data) == 0:
162                        return aggregated_result
163                    aggregated_result.extend(data)
164                else:
165                    raise NotImplementedError(
166                        "requests_get_paginated does not know how to handle responses of this type."
167                    )
168            else:
169                aggregated_result.extend(result)
170
171            page += 1
172
173    def requests_put(self, endpoint: str, data: Optional[dict] = None):
174        if not data:
175            data = {}
176        request = self.requests.put(
177            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
178        )
179        if request.status_code not in [200, 204]:
180            message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
181            self.logger.error(message)
182            raise Exception(message)
183
184    def requests_delete(self, endpoint: str, data: Optional[dict] = None):
185        request = self.requests.delete(
186            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
187        )
188        if request.status_code not in [200, 204]:
189            message = f"Received status code: {request.status_code} ({request.url})"
190            self.logger.error(message)
191            raise Exception(message)
192
193    def requests_post(
194        self,
195        endpoint: str,
196        data: Optional[dict] = None,
197        params: Optional[dict] = None,
198        files: Optional[dict] = None,
199    ):
200        """
201        Make a POST call to the endpoint.
202
203        :param endpoint: The path to the endpoint
204        :param data: A dictionary for JSON data
205        :param params: A dictionary of query params
206        :param files: A dictionary of files, see requests.post. Using both files and data
207                      can lead to unexpected results!
208        :return: The JSON response parsed as a dict
209        """
210
211        # This should ideally be a TypedDict of the type of arguments taken by
212        # `requests.post`.
213        args: dict[str, Any] = {
214            "headers": self.headers.copy(),
215        }
216        if data is not None:
217            args["data"] = json.dumps(data)
218        if params is not None:
219            args["params"] = params
220        if files is not None:
221            args["headers"].pop("Content-type")
222            args["files"] = files
223
224        request = self.requests.post(self.__get_url(endpoint), **args)
225
226        if request.status_code not in [200, 201, 202]:
227            if "already exists" in request.text or "e-mail already in use" in request.text:
228                self.logger.warning(request.text)
229                raise AlreadyExistsException()
230            self.logger.error(f"Received status code: {request.status_code} ({request.url})")
231            self.logger.error(f"With info: {data} ({self.headers})")
232            self.logger.error(f"Answer: {request.text}")
233            raise Exception(
234                f"Received status code: {request.status_code} ({request.url}), {request.text}"
235            )
236        return self.parse_result(request)
237
238    def requests_patch(self, endpoint: str, data: dict):
239        request = self.requests.patch(
240            self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
241        )
242        if request.status_code not in [200, 201]:
243            error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
244            self.logger.error(error_message)
245            raise Exception(error_message)
246        return self.parse_result(request)
247
248    def get_orgs_public_members_all(self, orgname):
249        path = "/orgs/" + orgname + "/public_members"
250        return self.requests_get(path)
251
252    def get_orgs(self):
253        path = "/admin/orgs"
254        results = self.requests_get(path)
255        return [Organization.parse_response(self, result) for result in results]
256
257    def get_user(self):
258        result = self.requests_get(AllSpice.GET_USER)
259        return User.parse_response(self, result)
260
261    def get_version(self) -> str:
262        result = self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)
263        return result["version"]
264
265    def get_users(self) -> List[User]:
266        results = self.requests_get(AllSpice.GET_USERS_ADMIN)
267        return [User.parse_response(self, result) for result in results]
268
269    def get_user_by_email(self, email: str) -> Optional[User]:
270        users = self.get_users()
271        for user in users:
272            if user.email == email or email in user.emails:
273                return user
274        return None
275
276    def get_user_by_name(self, username: str) -> Optional[User]:
277        users = self.get_users()
278        for user in users:
279            if user.username == username:
280                return user
281        return None
282
283    def get_repository(self, owner: str, name: str) -> Repository:
284        path = self.GET_REPOSITORY.format(owner=owner, name=name)
285        result = self.requests_get(path)
286        return Repository.parse_response(self, result)
287
288    def create_user(
289        self,
290        user_name: str,
291        email: str,
292        password: str,
293        full_name: Optional[str] = None,
294        login_name: Optional[str] = None,
295        change_pw=True,
296        send_notify=True,
297        source_id=0,
298    ):
299        """Create User.
300        Throws:
301            AlreadyExistsException, if the User exists already
302            Exception, if something else went wrong.
303        """
304        if not login_name:
305            login_name = user_name
306        if not full_name:
307            full_name = user_name
308        request_data = {
309            "source_id": source_id,
310            "login_name": login_name,
311            "full_name": full_name,
312            "username": user_name,
313            "email": email,
314            "password": password,
315            "send_notify": send_notify,
316            "must_change_password": change_pw,
317        }
318
319        self.logger.debug("Gitea post payload: %s", request_data)
320        result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
321        if "id" in result:
322            self.logger.info(
323                "Successfully created User %s <%s> (id %s)",
324                result["login"],
325                result["email"],
326                result["id"],
327            )
328            self.logger.debug("Gitea response: %s", result)
329        else:
330            self.logger.error(result["message"])
331            raise Exception("User not created... (gitea: %s)" % result["message"])
332        user = User.parse_response(self, result)
333        return user
334
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository for a user or organization as the administrator.

    The repository settings are posted to the admin repository endpoint of
    `repoOwner` and the response is parsed into a `Repository`.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # The API path only mentions users, but organizations are accepted too.
    assert isinstance(repoOwner, (User, Organization))
    endpoint = AllSpice.ADMIN_REPO_CREATE % repoOwner.username
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    response = self.requests_post(endpoint, data=payload)
    if "id" not in response:
        # A response without an id is treated as a failed creation.
        self.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])
    self.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self, response)
381
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner`.

    Args:
        owner (User): The user the Organization is created for.
        orgName (str): Sent as the `username` of the new Organization.
        description (str): Description of the Organization.
        location (str): Optional, by default empty.
        website (str): Optional, by default empty.
        full_name (str): Optional, by default empty.

    Returns:
        Organization: The Organization parsed from the response.

    Throws:
        Exception: If the response does not contain an `id`.
    """
    assert isinstance(owner, User)
    result = self.requests_post(
        AllSpice.CREATE_ORG % owner.username,
        data={
            "username": orgName,
            "description": description,
            "location": location,
            "website": website,
            "full_name": full_name,
        },
    )
    if "id" not in result:
        # Log the failure once; the same text is carried by the exception.
        message = "Organization not created... (gitea: %s)" % result["message"]
        self.logger.error(message)
        raise Exception(message)
    self.logger.info("Successfully created Organization %s" % result["username"])
    return Organization.parse_response(self, result)
409
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, by default empty, short description of
            the new Team.
        permission (str): Optional, 'read', the permission sent for the Team.
        can_create_org_repo (bool): Optional, False, sent as-is in the request.
        includes_all_repositories (bool): Optional, False, sent as-is in the
            request.
        units (tuple): Optional, the repository units sent for the Team.
        units_map (dict): Optional, a mapping of units to their
            permissions. If None or empty, the `permission` permission will
            be applied to all units. Note: When both `units` and `units_map`
            are given, `units_map` will be preferred.

    Returns:
        Team: The Team parsed from the response, with its organization set
        to `org`.

    Throws:
        Exception: If the response does not contain an `id`.
    """
    # Use a fresh dict per call instead of a shared mutable default.
    if units_map is None:
        units_map = {}

    result = self.requests_post(
        AllSpice.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            "units_map": units_map,
        },
    )

    if "id" not in result:
        # Log the failure once; the same text is carried by the exception.
        message = "Team not created... (gitea: %s)" % result["message"]
        self.logger.error(message)
        raise Exception(message)
    self.logger.info("Successfully created Team %s" % result["name"])

    api_object = Team.parse_response(self, result)
    # fixes strange behaviour of gitea not returning a valid organization here.
    setattr(api_object, "_organization", org)
    return api_object

Object to establish a session with AllSpice Hub.

AllSpice( allspice_hub_url='https://hub.allspice.io', token_text=None, auth=None, verify=True, log_level='INFO', ratelimiting=(100, 60))
def __init__(
    self,
    allspice_hub_url="https://hub.allspice.io",
    token_text=None,
    auth=None,
    verify=True,
    log_level="INFO",
    ratelimiting=(100, 60),
):
    """Initializing an instance of the AllSpice Hub Client

    Args:
        allspice_hub_url (str): The URL for the AllSpice Hub instance.
            Defaults to `https://hub.allspice.io`.

        token_text (str, None): The access token, by default None.

        auth (tuple, None): The user credentials
            `(username, password)`, by default None.

        verify (bool): If True (the default), verify the server's SSL
            certificate. Set to False to allow insecure server
            connections; the insecure-request warning is then silenced.

        log_level (str): The log level, by default `INFO`.

        ratelimiting (tuple[int, int], None): `(max_calls, period)`,
            If None, no rate limiting is applied. By default, 100 calls
            per minute are allowed.

    Raises:
        ValueError: If neither `token_text` nor `auth` is given.
    """
    # Validate credentials first so no session is created for a bad call.
    if not token_text and not auth:
        raise ValueError("Please provide either auth or token_text")

    self.logger = logging.getLogger(__name__)
    self.logger.setLevel(log_level)
    self.headers = {
        "Content-type": "application/json",
    }
    self.url = allspice_hub_url

    if ratelimiting is None:
        self.requests = requests.Session()
    else:
        (max_calls, period) = ratelimiting
        self.requests = RateLimitedSession(max_calls=max_calls, period=period)

    # Manage authentication. If both are given, both are applied.
    if token_text:
        self.headers["Authorization"] = "token " + token_text
    if auth:
        self.logger.warning(
            "Using basic auth is not recommended. Prefer using a token instead."
        )
        self.requests.auth = auth

    # Manage SSL certification verification
    self.requests.verify = verify
    if not verify:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Initializing an instance of the AllSpice Hub Client

Args: allspice_hub_url (str): The URL for the AllSpice Hub instance. Defaults to https://hub.allspice.io.

token_text (str, None): The access token, by default None.

auth (tuple, None): The user credentials
    `(username, password)`, by default None.

verify (bool): If True (the default), verify the server's SSL certificate.
    Set to False to allow insecure server connections.

log_level (str): The log level, by default `INFO`.

ratelimiting (tuple[int, int], None): `(max_calls, period)`,
    If None, no rate limiting is applied. By default, 100 calls
    per minute are allowed.
ADMIN_CREATE_USER = '/admin/users'
GET_USERS_ADMIN = '/admin/users'
ADMIN_REPO_CREATE = '/admin/users/%s/repos'
ALLSPICE_HUB_VERSION = '/version'
GET_USER = '/user'
GET_REPOSITORY = '/repos/{owner}/{name}'
CREATE_ORG = '/admin/users/%s/orgs'
CREATE_TEAM = '/orgs/%s/teams'
logger
headers
url
@staticmethod
def parse_result(result) -> Dict:
@staticmethod
def parse_result(result) -> Dict:
    """Decode the JSON text of a response.

    Responses whose text is missing, empty or at most three characters long
    yield an empty dict instead of being parsed.
    """
    body = result.text
    if not body or len(body) <= 3:
        return {}
    return json.loads(body)

Parses the result-JSON to a dict.

def requests_get( self, endpoint: str, params: Mapping = frozendict.frozendict({}), sudo=None):
def requests_get(self, endpoint: str, params: Mapping = frozendict(), sudo=None):
    """GET `endpoint` and return the parsed JSON result.

    `params` is copied before use; when `sudo` is given, its `username` is
    added as the `sudo` query parameter.
    """
    query = dict(params)
    if sudo:
        query["sudo"] = sudo.username
    response = self.__get(endpoint, query)
    return self.parse_result(response)
def requests_get_raw( self, endpoint: str, params=frozendict.frozendict({}), sudo=None) -> bytes:
def requests_get_raw(self, endpoint: str, params=frozendict(), sudo=None) -> bytes:
    """GET `endpoint` and return the raw response content.

    `params` is copied before use; when `sudo` is given, its `username` is
    added as the `sudo` query parameter.
    """
    query = dict(params)
    if sudo:
        query["sudo"] = sudo.username
    response = self.__get(endpoint, query)
    return response.content
def requests_get_paginated( self, endpoint: str, params=frozendict.frozendict({}), sudo=None, page_key: str = 'page', first_page: int = 1):
def requests_get_paginated(
    self,
    endpoint: str,
    params=frozendict(),
    sudo=None,
    page_key: str = "page",
    first_page: int = 1,
):
    """GET every page of `endpoint` and return all items in one list.

    Pages are requested starting at `first_page`, with the page number sent
    under `page_key`, until a page comes back empty. A dict response must
    carry its items under `data` or `tree`; any other response is treated
    as the list of items itself.

    Raises:
        NotImplementedError: If a dict response has neither `data` nor
            `tree`.
    """
    query = dict(params)
    collected = []
    page = first_page
    while True:
        query[page_key] = page
        response = self.requests_get(endpoint, query, sudo)

        if not response:
            return collected

        if not isinstance(response, dict):
            collected.extend(response)
        elif "data" in response:
            items = response["data"]
            if len(items) == 0:
                return collected
            collected.extend(items)
        elif "tree" in response:
            items = response["tree"]
            if items is None or len(items) == 0:
                return collected
            collected.extend(items)
        else:
            raise NotImplementedError(
                "requests_get_paginated does not know how to handle responses of this type."
            )

        page += 1
def requests_put(self, endpoint: str, data: Optional[dict] = None):
def requests_put(self, endpoint: str, data: Optional[dict] = None):
    """PUT `data` as JSON to `endpoint`.

    A missing or empty `data` is sent as an empty JSON object. Nothing is
    returned on success.

    Raises:
        Exception: If the response status code is neither 200 nor 204.
    """
    payload = data or {}
    response = self.requests.put(
        self.__get_url(endpoint), headers=self.headers, data=json.dumps(payload)
    )
    if response.status_code in (200, 204):
        return
    message = f"Received status code: {response.status_code} ({response.url}) {response.text}"
    self.logger.error(message)
    raise Exception(message)
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
def requests_delete(self, endpoint: str, data: Optional[dict] = None):
    """DELETE `endpoint`, sending `data` as the JSON body.

    A missing or empty `data` is sent as an empty JSON object, matching
    `requests_put`, instead of the JSON literal `null`. Nothing is returned
    on success.

    Raises:
        Exception: If the response status code is neither 200 nor 204. The
            message includes the response text.
    """
    if not data:
        data = {}
    request = self.requests.delete(
        self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
    )
    if request.status_code not in [200, 204]:
        message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
        self.logger.error(message)
        raise Exception(message)
def requests_post( self, endpoint: str, data: Optional[dict] = None, params: Optional[dict] = None, files: Optional[dict] = None):
def requests_post(
    self,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
    files: Optional[dict] = None,
):
    """
    Make a POST call to the endpoint.

    On failure the request is logged; the `Authorization` header is
    redacted so the access token never reaches the log.

    :param endpoint: The path to the endpoint
    :param data: A dictionary for JSON data
    :param params: A dictionary of query params
    :param files: A dictionary of files, see requests.post. Using both files and data
                  can lead to unexpected results!
    :return: The JSON response parsed as a dict
    :raises AlreadyExistsException: If the call fails and the response text
        reports that the object or e-mail already exists
    :raises Exception: If the status code is not 200, 201 or 202
    """

    # This should ideally be a TypedDict of the type of arguments taken by
    # `requests.post`.
    args: dict[str, Any] = {
        "headers": self.headers.copy(),
    }
    if data is not None:
        args["data"] = json.dumps(data)
    if params is not None:
        args["params"] = params
    if files is not None:
        # Drop the JSON content type so it is not sent with a file upload.
        args["headers"].pop("Content-type")
        args["files"] = files

    request = self.requests.post(self.__get_url(endpoint), **args)

    if request.status_code not in [200, 201, 202]:
        if "already exists" in request.text or "e-mail already in use" in request.text:
            self.logger.warning(request.text)
            raise AlreadyExistsException()
        # Never log credentials: mask the Authorization header value.
        safe_headers = {
            key: ("<redacted>" if key.lower() == "authorization" else value)
            for key, value in self.headers.items()
        }
        self.logger.error(f"Received status code: {request.status_code} ({request.url})")
        self.logger.error(f"With info: {data} ({safe_headers})")
        self.logger.error(f"Answer: {request.text}")
        raise Exception(
            f"Received status code: {request.status_code} ({request.url}), {request.text}"
        )
    return self.parse_result(request)

Make a POST call to the endpoint.

Parameters
  • endpoint: The path to the endpoint
  • data: A dictionary for JSON data
  • params: A dictionary of query params
  • files: A dictionary of files, see requests.post. Using both files and data can lead to unexpected results!
Returns

The JSON response parsed as a dict

def requests_patch(self, endpoint: str, data: dict):
def requests_patch(self, endpoint: str, data: dict):
    """PATCH `data` as JSON to `endpoint` and return the parsed result.

    Raises:
        Exception: If the response status code is neither 200 nor 201.
    """
    response = self.requests.patch(
        self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
    )
    if response.status_code in (200, 201):
        return self.parse_result(response)
    error_message = f"Received status code: {response.status_code} ({response.url}) {data}"
    self.logger.error(error_message)
    raise Exception(error_message)
def get_orgs_public_members_all(self, orgname):
def get_orgs_public_members_all(self, orgname):
    """Return the parsed result of GET `/orgs/<orgname>/public_members`."""
    return self.requests_get("".join(("/orgs/", orgname, "/public_members")))
def get_orgs(self):
def get_orgs(self):
    """Return the entries of GET `/admin/orgs`, each parsed into an `Organization`."""
    organizations = []
    for entry in self.requests_get("/admin/orgs"):
        organizations.append(Organization.parse_response(self, entry))
    return organizations
def get_user(self):
def get_user(self):
    """Fetch `AllSpice.GET_USER` and return the response parsed into a `User`."""
    return User.parse_response(self, self.requests_get(AllSpice.GET_USER))
def get_version(self) -> str:
def get_version(self) -> str:
    """Return the `version` field of the `AllSpice.ALLSPICE_HUB_VERSION` response."""
    return self.requests_get(AllSpice.ALLSPICE_HUB_VERSION)["version"]
def get_users(self) -> List[allspice.User]:
def get_users(self) -> List[User]:
    """Return the entries of the `AllSpice.GET_USERS_ADMIN` response, each parsed into a `User`."""
    users = []
    for entry in self.requests_get(AllSpice.GET_USERS_ADMIN):
        users.append(User.parse_response(self, entry))
    return users
def get_user_by_email(self, email: str) -> Optional[allspice.User]:
def get_user_by_email(self, email: str) -> Optional[User]:
    """Return the first user from `get_users` that has `email`, or None.

    A user matches when `email` equals its `email` attribute or is contained
    in its `emails`.
    """
    matches = (
        candidate
        for candidate in self.get_users()
        if candidate.email == email or email in candidate.emails
    )
    return next(matches, None)
def get_user_by_name(self, username: str) -> Optional[allspice.User]:
def get_user_by_name(self, username: str) -> Optional[User]:
    """Return the first user from `get_users` whose `username` equals `username`, or None."""
    matches = (
        candidate for candidate in self.get_users() if candidate.username == username
    )
    return next(matches, None)
def get_repository(self, owner: str, name: str) -> allspice.Repository:
def get_repository(self, owner: str, name: str) -> Repository:
    """Fetch the repository `owner`/`name` and return it parsed into a `Repository`."""
    response = self.requests_get(self.GET_REPOSITORY.format(owner=owner, name=name))
    return Repository.parse_response(self, response)
def create_user( self, user_name: str, email: str, password: str, full_name: Optional[str] = None, login_name: Optional[str] = None, change_pw=True, send_notify=True, source_id=0):
def create_user(
    self,
    user_name: str,
    email: str,
    password: str,
    full_name: Optional[str] = None,
    login_name: Optional[str] = None,
    change_pw=True,
    send_notify=True,
    source_id=0,
):
    """Create User.

    `login_name` and `full_name` fall back to `user_name` when not given.
    The payload is written to the debug log with the password redacted.

    Returns:
        User: The User parsed from the response.

    Throws:
        AlreadyExistsException, if the User exists already
        Exception, if something else went wrong.
    """
    if not login_name:
        login_name = user_name
    if not full_name:
        full_name = user_name
    request_data = {
        "source_id": source_id,
        "login_name": login_name,
        "full_name": full_name,
        "username": user_name,
        "email": email,
        "password": password,
        "send_notify": send_notify,
        "must_change_password": change_pw,
    }

    # Never write the plaintext password to the log.
    self.logger.debug(
        "Gitea post payload: %s", {**request_data, "password": "<redacted>"}
    )
    result = self.requests_post(AllSpice.ADMIN_CREATE_USER, data=request_data)
    if "id" in result:
        self.logger.info(
            "Successfully created User %s <%s> (id %s)",
            result["login"],
            result["email"],
            result["id"],
        )
        self.logger.debug("Gitea response: %s", result)
    else:
        self.logger.error(result["message"])
        raise Exception("User not created... (gitea: %s)" % result["message"])
    user = User.parse_response(self, result)
    return user

Create User. Throws: AlreadyExistsException, if the User exists already Exception, if something else went wrong.

def create_repo( self, repoOwner: Union[allspice.User, allspice.Organization], repoName: str, description: str = '', private: bool = False, autoInit=True, gitignores: Optional[str] = None, license: Optional[str] = None, readme: str = 'Default', issue_labels: Optional[str] = None, default_branch='master'):
def create_repo(
    self,
    repoOwner: Union[User, Organization],
    repoName: str,
    description: str = "",
    private: bool = False,
    autoInit=True,
    gitignores: Optional[str] = None,
    license: Optional[str] = None,
    readme: str = "Default",
    issue_labels: Optional[str] = None,
    default_branch="master",
):
    """Create a Repository for a user or organization as the administrator.

    The repository settings are posted to the admin repository endpoint of
    `repoOwner` and the response is parsed into a `Repository`.

    Throws:
        AlreadyExistsException: If the Repository exists already.
        Exception: If something else went wrong.

    Note:
        Non-admin users can not use this method. Please use instead
        `allspice.User.create_repo` or `allspice.Organization.create_repo`.
    """
    # The API path only mentions users, but organizations are accepted too.
    assert isinstance(repoOwner, (User, Organization))
    endpoint = AllSpice.ADMIN_REPO_CREATE % repoOwner.username
    payload = {
        "name": repoName,
        "description": description,
        "private": private,
        "auto_init": autoInit,
        "gitignores": gitignores,
        "license": license,
        "issue_labels": issue_labels,
        "readme": readme,
        "default_branch": default_branch,
    }
    response = self.requests_post(endpoint, data=payload)
    if "id" not in response:
        # A response without an id is treated as a failed creation.
        self.logger.error(response["message"])
        raise Exception("Repository not created... (gitea: %s)" % response["message"])
    self.logger.info("Successfully created Repository %s " % response["name"])
    return Repository.parse_response(self, response)

Create a Repository as the administrator

Throws: AlreadyExistsException: If the Repository exists already. Exception: If something else went wrong.

Note: Non-admin users can not use this method. Please use instead allspice.User.create_repo or allspice.Organization.create_repo.

def create_org( self, owner: allspice.User, orgName: str, description: str, location='', website='', full_name=''):
def create_org(
    self,
    owner: User,
    orgName: str,
    description: str,
    location="",
    website="",
    full_name="",
):
    """Create an Organization owned by `owner`.

    Args:
        owner (User): The user the Organization is created for.
        orgName (str): Sent as the `username` of the new Organization.
        description (str): Description of the Organization.
        location (str): Optional, by default empty.
        website (str): Optional, by default empty.
        full_name (str): Optional, by default empty.

    Returns:
        Organization: The Organization parsed from the response.

    Throws:
        Exception: If the response does not contain an `id`.
    """
    assert isinstance(owner, User)
    result = self.requests_post(
        AllSpice.CREATE_ORG % owner.username,
        data={
            "username": orgName,
            "description": description,
            "location": location,
            "website": website,
            "full_name": full_name,
        },
    )
    if "id" not in result:
        # Log the failure once; the same text is carried by the exception.
        message = "Organization not created... (gitea: %s)" % result["message"]
        self.logger.error(message)
        raise Exception(message)
    self.logger.info("Successfully created Organization %s" % result["username"])
    return Organization.parse_response(self, result)
def create_team( self, org: allspice.Organization, name: str, description: str = '', permission: str = 'read', can_create_org_repo: bool = False, includes_all_repositories: bool = False, units=('repo.code', 'repo.issues', 'repo.ext_issues', 'repo.wiki', 'repo.pulls', 'repo.releases', 'repo.ext_wiki'), units_map={}):
def create_team(
    self,
    org: Organization,
    name: str,
    description: str = "",
    permission: str = "read",
    can_create_org_repo: bool = False,
    includes_all_repositories: bool = False,
    units=(
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.ext_wiki",
    ),
    units_map=None,
):
    """Creates a Team.

    Args:
        org (Organization): Organization the Team will be part of.
        name (str): The Name of the Team to be created.
        description (str): Optional, by default empty, short description of
            the new Team.
        permission (str): Optional, 'read', the permission sent for the Team.
        can_create_org_repo (bool): Optional, False, sent as-is in the request.
        includes_all_repositories (bool): Optional, False, sent as-is in the
            request.
        units (tuple): Optional, the repository units sent for the Team.
        units_map (dict): Optional, a mapping of units to their
            permissions. If None or empty, the `permission` permission will
            be applied to all units. Note: When both `units` and `units_map`
            are given, `units_map` will be preferred.

    Returns:
        Team: The Team parsed from the response, with its organization set
        to `org`.

    Throws:
        Exception: If the response does not contain an `id`.
    """
    # Use a fresh dict per call instead of a shared mutable default.
    if units_map is None:
        units_map = {}

    result = self.requests_post(
        AllSpice.CREATE_TEAM % org.username,
        data={
            "name": name,
            "description": description,
            "permission": permission,
            "can_create_org_repo": can_create_org_repo,
            "includes_all_repositories": includes_all_repositories,
            "units": units,
            "units_map": units_map,
        },
    )

    if "id" not in result:
        # Log the failure once; the same text is carried by the exception.
        message = "Team not created... (gitea: %s)" % result["message"]
        self.logger.error(message)
        raise Exception(message)
    self.logger.info("Successfully created Team %s" % result["name"])

    api_object = Team.parse_response(self, result)
    # fixes strange behaviour of gitea not returning a valid organization here.
    setattr(api_object, "_organization", org)
    return api_object

Creates a Team.

Args: org (Organization): Organization the Team will be part of. name (str): The Name of the Team to be created. description (str): Optional, empty by default, short description of the new Team. permission (str): Optional, 'read', the permission the members of the Team are given. units_map (dict): Optional, {}, a mapping of units to their permissions. If None or empty, the permission permission will be applied to all units. Note: When both units and units_map are given, units_map will be preferred.