Source code for neuro_sdk._users

from dataclasses import dataclass
from decimal import Decimal
from enum import Enum
from typing import Any, Dict, Optional, Sequence

from aiohttp.web import HTTPCreated, HTTPNoContent
from yarl import URL

from ._admin import _Admin
from ._config import Config
from ._core import _Core
from ._errors import ClientError, NotSupportedError
from ._rewrite import rewrite_module
from ._utils import NoPublicConstructor


@rewrite_module
class Action(str, Enum):
    READ = "read"
    WRITE = "write"
    MANAGE = "manage"


@rewrite_module
@dataclass(frozen=True)
class Permission:
    uri: URL
    action: Action


@rewrite_module
@dataclass(frozen=True)
class Share:
    user: str
    permission: Permission


@rewrite_module
@dataclass(frozen=True)
class Quota:
    credits: Optional[Decimal] = None
    total_running_jobs: Optional[int] = None


@rewrite_module
class Users(metaclass=NoPublicConstructor):
    def __init__(self, core: _Core, config: Config, admin: _Admin) -> None:
        self._core = core
        self._config = config
        self._admin = admin

    async def get_quota(self) -> Quota:
        try:
            ret = await self._admin.get_cluster_user(
                cluster_name=self._config.cluster_name,
                org_name=self._config.org_name,
                user_name=self._config.username,
            )
        except NotSupportedError:
            # FOSS configuration without admin service and limits
            return Quota(credits=None, total_running_jobs=None)
        return Quota(
            credits=ret.balance.credits,
            total_running_jobs=ret.quota.total_running_jobs,
        )

    async def get_org_quota(self) -> Optional[Quota]:
        if self._config.org_name is None:
            return None
        try:
            ret = await self._admin.get_org_cluster(
                cluster_name=self._config.cluster_name,
                org_name=self._config.org_name,
            )
        except NotSupportedError:
            # FOSS configuration without admin service and limits
            return Quota(credits=None, total_running_jobs=None)
        return Quota(
            credits=ret.balance.credits,
            total_running_jobs=ret.quota.total_running_jobs,
        )

[docs] async def get_acl( self, user: str, scheme: Optional[str] = None, *, uri: Optional[URL] = None ) -> Sequence[Permission]: url = self._get_user_url(user) / "permissions" if scheme: if uri is not None: raise ValueError("Conflicting arguments 'uri' and 'scheme'") uri = URL.build(scheme=scheme) params = {"uri": str(uri)} if uri is not None else {} auth = await self._config._api_auth() async with self._core.request("GET", url, params=params, auth=auth) as resp: payload = await resp.json() ret = [] for item in payload: uri = URL(item["uri"]) action = Action(item["action"]) ret.append(Permission(uri, action)) return ret
[docs] async def get_shares( self, user: str, scheme: Optional[str] = None, *, uri: Optional[URL] = None ) -> Sequence[Share]: url = self._get_user_url(user) / "permissions" / "shared" if scheme: if uri is not None: raise ValueError("Conflicting arguments 'uri' and 'scheme'") uri = URL.build(scheme=scheme) params = {"uri": str(uri)} if uri is not None else {} auth = await self._config._api_auth() async with self._core.request("GET", url, params=params, auth=auth) as resp: payload = await resp.json() ret = [] for item in payload: uri = URL(item["uri"]) action = Action(item["action"]) ret.append(Share(item["user"], Permission(uri, action))) return ret
[docs] async def get_subroles( self, user: str, ) -> Sequence[str]: url = self._get_user_url(user) / "subroles" auth = await self._config._api_auth() async with self._core.request("GET", url, auth=auth) as resp: payload = await resp.json() return payload["subroles"]
[docs] async def share(self, user: str, permission: Permission) -> Permission: url = self._get_user_url(user) / "permissions" payload = [_permission_to_api(permission)] auth = await self._config._api_auth() async with self._core.request("POST", url, json=payload, auth=auth) as resp: if resp.status != HTTPCreated.status_code: raise ClientError("Server return unexpected result.") payload = await resp.json() perm = payload[0] return Permission( uri=URL(perm["uri"]), action=Action(perm["action"]), )
[docs] async def revoke(self, user: str, uri: URL) -> None: url = self._get_user_url(user) / "permissions" auth = await self._config._api_auth() async with self._core.request( "DELETE", url, params={"uri": str(uri)}, auth=auth ) as resp: # TODO: server part contain TODO record for returning more then # HTTPNoContent, this part must me refactored then if resp.status != HTTPNoContent.status_code: raise ClientError(f"Server return unexpected result: {resp.status}.") return None
[docs] async def add(self, role_name: str) -> None: url = self._config.api_url / "users" auth = await self._config._api_auth() async with self._core.request( "POST", url, json={"name": role_name}, auth=auth ) as resp: if resp.status != HTTPCreated.status_code: raise ClientError(f"Server return unexpected result: {resp.status}.") return None
[docs] async def remove(self, role_name: str) -> None: url = self._get_user_url(role_name) auth = await self._config._api_auth() async with self._core.request("DELETE", url, auth=auth) as resp: if resp.status != HTTPNoContent.status_code: raise ClientError(f"Server return unexpected result: {resp.status}.") return None
def _get_user_url(self, user: str) -> URL: if ":" in user: raise ValueError(f"Invalid name: {user!r}") return self._config.api_url / "users" / user.replace("/", ":") def _permission_to_api(perm: Permission) -> Dict[str, Any]: primitive: Dict[str, Any] = {"uri": str(perm.uri), "action": perm.action.value} return primitive