# Source code for neuro_sdk._disks

import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, AsyncIterator, Mapping, Optional, Union

from dateutil.parser import isoparse
from yarl import URL

from ._config import Config
from ._core import _Core
from ._rewrite import rewrite_module
from ._utils import (
    ORG_NAME_SENTINEL,
    NoPublicConstructor,
    OrgNameSentinel,
    asyncgeneratorcontextmanager,
)

logger = logging.getLogger(__package__)


@rewrite_module
@dataclass(frozen=True)
class Disk:
    """Immutable description of a persistent disk as returned by the platform API."""

    id: str
    storage: int  # Requested size in bytes
    owner: str
    status: "Disk.Status"
    cluster_name: str
    org_name: Optional[str]
    created_at: datetime
    last_usage: Optional[datetime] = None  # None if the disk was never used
    name: Optional[str] = None
    timeout_unused: Optional[timedelta] = None  # Auto-removal delay for idle disks
    used_bytes: Optional[int] = None

    @property
    def uri(self) -> URL:
        """Return the disk URI: ``disk://<cluster>[/<org>]/<owner>/<id>``.

        The org segment is omitted when ``org_name`` is falsy (``None`` or
        an empty string).
        """
        base = f"disk://{self.cluster_name}"
        if self.org_name:
            base += f"/{self.org_name}"
        return URL(f"{base}/{self.owner}/{self.id}")

    class Status(Enum):
        """Disk lifecycle state reported by the API."""

        PENDING = "Pending"
        READY = "Ready"
        BROKEN = "Broken"
@rewrite_module
class Disks(metaclass=NoPublicConstructor):
    """Disks subsystem of the SDK client.

    Provides list/create/get/remove operations for persistent disks.
    Instances are created by the client internals; the constructor is not
    public (enforced by ``NoPublicConstructor``).
    """

    def __init__(self, core: _Core, config: Config) -> None:
        self._core = core
        self._config = config

    def _parse_disk_payload(self, payload: Mapping[str, Any]) -> Disk:
        """Build a :class:`Disk` from a raw JSON payload returned by the API."""
        last_usage_raw = payload.get("last_usage")
        if last_usage_raw is not None:
            last_usage: Optional[datetime] = isoparse(last_usage_raw)
        else:
            last_usage = None
        life_span_raw = payload.get("life_span")
        if life_span_raw is not None:
            # API reports the idle timeout as a number of seconds.
            timeout_unused: Optional[timedelta] = timedelta(seconds=life_span_raw)
        else:
            timeout_unused = None
        return Disk(
            id=payload["id"],
            storage=payload["storage"],
            used_bytes=payload.get("used_bytes"),
            owner=payload["owner"],
            name=payload.get("name"),
            status=Disk.Status(payload["status"]),
            # The payload has no cluster field; use the currently selected one.
            cluster_name=self._config.cluster_name,
            org_name=payload.get("org_name"),
            created_at=isoparse(payload["created_at"]),
            last_usage=last_usage,
            timeout_unused=timeout_unused,
        )

    def _get_disks_url(self, cluster_name: Optional[str]) -> URL:
        """Return the disks API endpoint for *cluster_name*.

        Falls back to the currently configured cluster when *cluster_name*
        is ``None``.
        """
        if cluster_name is None:
            cluster_name = self._config.cluster_name
        return self._config.get_cluster(cluster_name).disks_url

    @asyncgeneratorcontextmanager
    async def list(self, cluster_name: Optional[str] = None) -> AsyncIterator[Disk]:
        """Iterate over all disks in *cluster_name* (default: current cluster)."""
        url = self._get_disks_url(cluster_name)
        auth = await self._config._api_auth()
        async with self._core.request("GET", url, auth=auth) as resp:
            ret = await resp.json()
            for disk_payload in ret:
                yield self._parse_disk_payload(disk_payload)

    async def create(
        self,
        storage: int,
        timeout_unused: Optional[timedelta] = None,
        name: Optional[str] = None,
        cluster_name: Optional[str] = None,
        org_name: Union[Optional[str], OrgNameSentinel] = ORG_NAME_SENTINEL,
    ) -> Disk:
        """Create a new disk and return its description.

        :param storage: requested size in bytes.
        :param timeout_unused: optional idle timeout after which the disk
            may be removed; sent to the API as seconds (``life_span``).
        :param name: optional human-readable disk name.
        :param cluster_name: target cluster (default: current cluster).
        :param org_name: organization to own the disk; when left as the
            sentinel, the currently configured org is used.
        """
        url = self._get_disks_url(cluster_name)
        auth = await self._config._api_auth()
        data = {
            "storage": storage,
            "life_span": timeout_unused.total_seconds() if timeout_unused else None,
            "name": name,
            "org_name": org_name
            if not isinstance(org_name, OrgNameSentinel)
            else self._config.org_name,
        }
        async with self._core.request("POST", url, auth=auth, json=data) as resp:
            payload = await resp.json()
            return self._parse_disk_payload(payload)

    async def get(
        self,
        disk_id_or_name: str,
        cluster_name: Optional[str] = None,
        owner: Optional[str] = None,
    ) -> Disk:
        """Fetch a single disk by id or name.

        :param owner: optional owner filter, passed as a query parameter
            to disambiguate lookups by name.
        """
        url = self._get_disks_url(cluster_name) / disk_id_or_name
        auth = await self._config._api_auth()
        params = {}
        if owner:
            params["owner"] = owner
        async with self._core.request("GET", url, auth=auth, params=params) as resp:
            payload = await resp.json()
            return self._parse_disk_payload(payload)

    async def rm(
        self,
        disk_id_or_name: str,
        cluster_name: Optional[str] = None,
        owner: Optional[str] = None,
    ) -> None:
        """Delete a disk by id or name; returns nothing on success."""
        url = self._get_disks_url(cluster_name) / disk_id_or_name
        auth = await self._config._api_auth()
        params = {}
        if owner:
            params["owner"] = owner
        async with self._core.request("DELETE", url, auth=auth, params=params):
            pass