diff --git a/permit/pdp_api/__init__.py b/permit/pdp_api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/permit/pdp_api/interfaces.py b/permit/pdp_api/interfaces.py new file mode 100644 index 0000000..e69de29 diff --git a/permit/pdp_api/pdp_api.py b/permit/pdp_api/pdp_api.py new file mode 100644 index 0000000..6fb378e --- /dev/null +++ b/permit/pdp_api/pdp_api.py @@ -0,0 +1,69 @@ +import json +from pprint import pformat +from typing import List, Optional, Union + +import aiohttp +from loguru import logger +from permit_backend.schemas import RoleAssignmentRead + +from ..config import PermitConfig +from ..exceptions import PermitConnectionError +from ..utils.context import Context, ContextStore +from ..utils.sync import SyncClass +from .base import pagination_params + + +class PdpApi: + def __init__(self, config: PermitConfig): + self._config = config + self._headers = { + "Content-Type": "application/json", + "Authorization": f"bearer {self._config.token}", + } + self._base_url = self._config.pdp + + async def list_role_assignments( + self, + user: Optional[str] = None, + role: Optional[str] = None, + tenant: Optional[str] = None, + resource_instance: Optional[str] = None, + page: int = 1, + per_page: int = 100, + ) -> dict: + """ + List all role assignments stored in the PDP. + You can filter the results by specifying the query parameters. + user: the user key to filter, will only return role assignments granted to this user. + role: the role key to filter by, will only return role assignments of this role. + tenant: the tenant key to filter by, will only return role assignments granted within this tenant. + resource: the resource key to filter by, will only return role assignments granted on this resource. + resource_instance: the resource instance key to filter by, will only return role assignments granted with this instance as the object. + + """ + + url = f"{self._base_url}/local/role_assignments" + params = pagination_params(page, per_page) + + if user is not None: + params.update(dict(user=user)) + if role is not None: + params.update(dict(role=role)) + if tenant is not None: + params.update(dict(tenant=tenant)) + if resource_instance is not None: + params.update(dict(resource_instance=resource_instance)) + + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=self._headers, params=params + ) as response: + if response.status != 200: + raise PermitConnectionError( + f"Failed to list role assignments: {response.status}" + ) + return await response.json() + + +class SyncEnforcer(PdpApi, metaclass=SyncClass): + pass diff --git a/permit/permit.py b/permit/permit.py index b811c9e..fc34241 100644 --- a/permit/permit.py +++ b/permit/permit.py @@ -8,6 +8,7 @@ from .config import PermitConfig from .enforcement.enforcer import Action, CheckQuery, Enforcer, Resource, User from .logger import configure_logger +from .pdp_api.pdp_api import PdpApi from .utils.context import Context @@ -21,6 +22,7 @@ def __init__(self, config: Optional[PermitConfig] = None, **options): self._enforcer = Enforcer(self._config) self._api = PermitApiClient(self._config) self._elements = ElementsApi(self._config) + self._pdp_api = PdpApi(self._config) logger.debug( "Permit SDK initialized with config:\n${}", @@ -64,6 +66,18 @@ def elements(self) -> ElementsApi: """ return self._elements + @property + def pdp(self) -> PdpApi: + """ + Access the Permit PDP API using this property. + + Usage example: + + permit = Permit(token + await permit.pdp.list_role_assignments() + """ + return self._pdp_api + async def bulk_check( self, checks: list[CheckQuery], diff --git a/tests/test_rbac_e2e.py b/tests/test_rbac_e2e.py index 8d0ec6d..ecc55b7 100644 --- a/tests/test_rbac_e2e.py +++ b/tests/test_rbac_e2e.py @@ -4,9 +4,9 @@ import pytest from loguru import logger -from permit import Permit, RoleAssignmentRead from permit.exceptions import PermitApiError, PermitConnectionError +from .permit import Permit from .utils import handle_api_error @@ -15,6 +15,7 @@ def print_break(): async def test_permission_check_e2e(permit: Permit): + logger.info("initial setup of objects") try: document = await permit.api.resources.create( @@ -191,7 +192,7 @@ async def test_permission_check_e2e(permit: Permit): logger.info("testing bulk permission check") assert ( - await permit.bulk_check( + await permit.pdp_api.list_role_assignment( [ { "user": "auth0|elon", @@ -219,8 +220,12 @@ async def test_permission_check_e2e(permit: Permit): print_break() - logger.info("changing the user roles") + # list role assignments in PDP + logger.info("testing list role assignments") + assert permit.pdp_api.list_role_assignments({}) + print_break() + logger.info("changing the user roles") # change the user role - assign admin role await permit.api.users.assign_role( {