import builtins
import enum
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from aiohttp import WSMsgType
from yarl import URL
from ._config import Config
from ._core import _Core
from ._rewrite import rewrite_module
from ._utils import NoPublicConstructor, asyncgeneratorcontextmanager
@rewrite_module
@dataclass(frozen=True)
class AppTemplate:
name: str
title: str
version: str
short_description: str = ""
tags: list[str] = field(default_factory=list)
input: dict[str, Any] | None = None
description: str = ""
@rewrite_module
@dataclass(frozen=True)
class AppValue:
instance_id: str
type: str
path: str
value: Any
@rewrite_module
class AppState(str, enum.Enum):
QUEUED = "queued"
PROGRESSING = "progressing"
HEALTHY = "healthy"
DEGRADED = "degraded"
ERRORED = "errored"
UNINSTALLING = "uninstalling"
UNINSTALLED = "uninstalled"
[docs]
@classmethod
def get_active_states(cls) -> list["AppState"]:
return [state for state in cls if state != cls.UNINSTALLED]
@rewrite_module
@dataclass(frozen=True)
class App:
id: str
name: str
display_name: str
template_name: str
template_version: str
project_name: str
org_name: str
cluster_name: str
namespace: str
state: str
creator: str
created_at: datetime
updated_at: datetime
endpoints: list[str]
@rewrite_module
@dataclass(frozen=True)
class AppEventResource:
kind: str | None = None
name: str | None = None
uid: str | None = None
health_status: str | None = None
health_message: str | None = None
@rewrite_module
@dataclass(frozen=True)
class AppEvent:
created_at: datetime
state: str
reason: str | None
message: str | None
resources: list[AppEventResource]
@rewrite_module
@dataclass(frozen=True)
class AppConfigurationRevision:
revision_number: int
creator: str
comment: str | None
created_at: datetime
end_at: datetime | None
@rewrite_module
class Apps(metaclass=NoPublicConstructor):
def __init__(self, core: _Core, config: Config) -> None:
self._core = core
self._config = config
def _build_base_url(
self,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> URL:
pk = self._config.get_project_key(cluster_name, org_name, project_name)
# Get TODO: the base URL without the /api/v1 prefix
base_url = self._config.api_url.with_path("")
url = (
base_url
/ "apis/apps/v1/cluster"
/ pk.cluster_name
/ "org"
/ pk.org_name
/ "project"
/ pk.project_name
)
return url
def _build_v2_base_url(
self,
) -> URL:
base_url = self._config.api_url.with_path("")
return base_url / "apis/apps/v2"
def _get_monitoring_url(self, cluster_name: str | None) -> URL:
if cluster_name is None:
cluster_name = self._config.cluster_name
return self._config.get_cluster(cluster_name).monitoring_url.with_path(
"/api/v1"
)
@staticmethod
def _parse_app_read_instance(item: dict[str, Any]) -> App:
return App(
id=item["id"],
name=item["name"],
created_at=datetime.fromisoformat(item["created_at"]),
updated_at=datetime.fromisoformat(item["updated_at"]),
creator=item["creator"],
display_name=item["display_name"],
template_name=item["template_name"],
template_version=item["template_version"],
project_name=item["project_name"],
org_name=item["org_name"],
cluster_name=item["cluster_name"],
namespace=item["namespace"],
state=item["state"],
endpoints=item["endpoints"],
)
[docs]
@asyncgeneratorcontextmanager
async def list(
self,
states: list[AppState] | None = None,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> AsyncIterator[App]:
url = self._build_v2_base_url() / "instances"
cluster_name = cluster_name or self._config.cluster_name
org_name = org_name or self._config.org_name
project_name = project_name or self._config.project_name_or_raise
current_page = 1
url = url.update_query(
cluster=cluster_name,
org=org_name,
project=project_name,
page=current_page,
)
if states:
url = url.update_query(states=[state.value for state in states])
auth = await self._config._api_auth()
while True:
async with self._core.request("GET", url, auth=auth) as resp:
data = await resp.json()
for item in data["items"]:
yield self._parse_app_read_instance(item)
total_pages = data.get("pages", 1)
if current_page >= total_pages:
break
current_page += 1
url = url.update_query(page=current_page)
[docs]
async def get(self, app_id: str) -> App:
url = self._build_v2_base_url() / "instances" / app_id
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
resp.raise_for_status()
item = await resp.json()
return self._parse_app_read_instance(item)
[docs]
async def install(
self,
app_data: dict[str, Any],
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> App:
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "instances"
)
auth = await self._config._api_auth()
async with self._core.request("POST", url, json=app_data, auth=auth) as resp:
resp.raise_for_status()
item = await resp.json()
return self._parse_app_read_instance(item)
def _can_configure_app(self, existing_app: App, app_data: dict[str, Any]) -> bool:
if existing_app.template_name != app_data["template_name"]:
return False
elif existing_app.template_version != app_data["template_version"]:
return False
return True
[docs]
async def uninstall(
self,
app_id: str,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
*,
force: bool = False,
) -> None:
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "instances"
/ app_id
)
params = {}
if force:
params["force"] = "true"
auth = await self._config._api_auth()
async with self._core.request("DELETE", url, params=params, auth=auth):
pass
[docs]
@asyncgeneratorcontextmanager
async def get_values(
self,
app_id: str | None = None,
value_type: str | None = None,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> AsyncIterator[AppValue]:
"""Get values from app instances.
Args:
app_id: Optional app instance ID to filter values
value_type: Optional value type to filter
cluster_name: Optional cluster name override
org_name: Optional organization name override
project_name: Optional project name override
Returns:
An async iterator of AppValue objects
"""
base_url = self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
if app_id is not None:
url = base_url / "instances" / app_id / "values"
else:
url = base_url / "instances" / "values"
params = {}
if value_type is not None:
params["type"] = value_type
auth = await self._config._api_auth()
async with self._core.request("GET", url, params=params, auth=auth) as resp:
data = await resp.json()
for item in data["items"]:
yield AppValue(
instance_id=item.get("instance_id", item.get("app_instance_id")),
type=item["type"],
path=item["path"],
value=item.get("value"),
)
[docs]
@asyncgeneratorcontextmanager
async def list_templates(
self,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> AsyncIterator[AppTemplate]:
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "templates"
)
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
data = await resp.json()
for item in data:
# Create the AppTemplate object with only the required fields
yield AppTemplate(
name=item.get("name", ""),
version=item.get("version", ""),
title=item.get("title", ""),
short_description=item.get("short_description", ""),
tags=item.get("tags", []),
input=None,
description="",
)
[docs]
@asyncgeneratorcontextmanager
async def list_template_versions(
self,
name: str,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> AsyncIterator[AppTemplate]:
"""List all available versions for a specific app template.
Args:
name: The name of the app template
cluster_name: Optional cluster name override
org_name: Optional organization name override
project_name: Optional project name override
Returns:
An async iterator of AppTemplate objects
"""
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "templates"
/ name
)
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
data = await resp.json()
for item in data:
# Return AppTemplate objects with the same name but different versions
yield AppTemplate(
name=name,
version=item.get("version", ""),
title=item.get("title", ""),
short_description=item.get("short_description", ""),
tags=item.get("tags", []),
input=None,
description="",
)
[docs]
async def get_template(
self,
name: str,
version: str | None = None,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> AppTemplate | None:
"""Get detailed information for a specific app template.
Args:
name: The name of the app template
version: Optional version of the template (latest if not specified)
cluster_name: Optional cluster name override
org_name: Optional organization name override
project_name: Optional project name override
Returns:
An AppTemplate object with complete template information
"""
if version is None:
version = "latest"
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "templates"
/ name
/ version
)
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
resp.raise_for_status()
data = await resp.json()
if data is None:
return None
return AppTemplate(
name=data.get("name", name),
title=data.get("title", ""),
version=data.get("version", ""),
short_description=data.get("short_description", ""),
tags=data.get("tags", []),
input=data.get("input"),
description=data.get("description", ""),
)
[docs]
async def get_output(
self,
app_id: str,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> dict[str, Any]:
"""Get output parameters from an app instance.
Args:
app_id: The ID of the app instance
cluster_name: Optional cluster name override
org_name: Optional organization name override
project_name: Optional project name override
Returns:
A dictionary containing output parameters as key-value pairs
Raises:
ResourceNotFound: If app instance not found or no output records exist
"""
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "instances"
/ app_id
/ "output"
)
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
resp.raise_for_status()
return await resp.json()
[docs]
@asyncgeneratorcontextmanager
async def logs(
self,
app_id: str,
*,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
since: datetime | None = None,
timestamps: bool = False,
) -> AsyncIterator[bytes]:
"""Get logs for an app instance.
Args:
app_id: The ID of the app instance
cluster_name: Optional cluster name override
org_name: Optional organization name override
project_name: Optional project name override
since: Optional timestamp to start logs from
timestamps: Include timestamps in the logs output
Returns:
An async iterator of log chunks as bytes
"""
url = self._get_monitoring_url(cluster_name) / "apps" / app_id / "log_ws"
if url.scheme == "https": # pragma: no cover
url = url.with_scheme("wss")
else:
url = url.with_scheme("ws")
if since is not None:
if since.tzinfo is None:
# Interpret naive datetime object as local time.
since = since.astimezone() # pragma: no cover
url = url.update_query(since=since.isoformat())
if timestamps:
url = url.update_query(timestamps="true")
auth = await self._config._api_auth()
async with self._core.ws_connect(
url,
auth=auth,
timeout=None,
heartbeat=30,
) as ws:
async for msg in ws:
if msg.type == WSMsgType.BINARY:
if msg.data:
yield msg.data
elif msg.type == WSMsgType.ERROR: # pragma: no cover
raise ws.exception() # type: ignore
else: # pragma: no cover
raise RuntimeError(f"Incorrect WebSocket message: {msg!r}")
[docs]
@asyncgeneratorcontextmanager
async def get_events(
self,
app_id: str,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
) -> AsyncIterator[AppEvent]:
"""Get events for an app instance.
Args:
app_id: The ID of the app instance
cluster_name: Optional cluster name override
org_name: Optional organization name override
project_name: Optional project name override
Returns:
An async iterator of AppEvent objects
"""
url = (
self._build_base_url(
cluster_name=cluster_name,
org_name=org_name,
project_name=project_name,
)
/ "instances"
/ app_id
/ "events"
)
auth = await self._config._api_auth()
current_page = 1
while True:
async with self._core.request("GET", url, auth=auth) as resp:
resp.raise_for_status()
data = await resp.json()
for item in data.get("items", []):
resources = []
for res in item.get("resources", []):
resources.append(
AppEventResource(
kind=res.get("kind"),
name=res.get("name"),
uid=res.get("uid"),
health_status=res.get("health_status"),
health_message=res.get("health_message"),
)
)
yield AppEvent(
created_at=item["created_at"],
state=item["state"],
reason=item["reason"],
message=item.get("message"),
resources=resources,
)
total_pages = data.get("pages", 1)
if current_page >= total_pages:
break
current_page += 1
url = url.update_query(page=current_page)
[docs]
async def get_revisions(
self,
app_id: str,
) -> builtins.list[AppConfigurationRevision]:
url = self._build_v2_base_url() / "instances" / app_id / "revisions"
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
resp.raise_for_status()
data = await resp.json()
return [
AppConfigurationRevision(
revision_number=item["revision_number"],
creator=item["creator"],
comment=item["comment"],
created_at=item["created_at"],
end_at=item["end_at"],
)
for item in data
]
[docs]
async def rollback(
self,
app_id: str,
revision_number: int,
cluster_name: str | None = None,
org_name: str | None = None,
project_name: str | None = None,
comment: str | None = None,
) -> App:
url = (
self._build_base_url(
cluster_name=cluster_name or self._config.cluster_name,
org_name=org_name or self._config.org_name,
project_name=project_name or self._config.project_name_or_raise,
)
/ "instances"
/ app_id
/ "revisions"
/ str(revision_number)
/ "rollback"
)
auth = await self._config._api_auth()
payload = {}
if comment is not None:
payload["comment"] = comment
async with self._core.request("POST", url, json=payload, auth=auth) as resp:
resp.raise_for_status()
item = await resp.json()
return self._parse_app_read_instance(item)