"""Catalog management resources for VergeOS recipe repositories.
Catalogs organize recipes (VM and tenant templates) into logical groups.
Catalog repositories define where catalogs are sourced from - local, remote,
or from the Verge.io marketplace.

Key concepts:
    - **Repository**: A source of catalogs (local, remote, git, Verge.io marketplace)
    - **Catalog**: A collection of related recipes within a repository
    - **Status**: Current state of a repository (online, refreshing, error, etc.)

Repository types:
    - local: Locally created catalogs and recipes
    - provider: Service provider catalogs (inherited from parent)
    - remote: Remote HTTP/HTTPS URL
    - remote-git: Remote Git repository
    - yottabyte: Verge.io official marketplace

Example:
    >>> # List all catalog repositories
    >>> for repo in client.catalog_repositories.list():
    ...     print(f"{repo.name} ({repo.type}): {repo.status_info}")

    >>> # Get repository with catalogs
    >>> repo = client.catalog_repositories.get(name="Verge.io Recipes")
    >>> for catalog in repo.catalogs.list():
    ...     print(f"  {catalog.name}: {catalog.description}")

    >>> # Refresh a repository to get latest recipes
    >>> repo.refresh_catalogs()

    >>> # View repository logs
    >>> for log in repo.logs.list(limit=10):
    ...     print(f"{log.level}: {log.text}")
"""
from __future__ import annotations
import builtins
from typing import TYPE_CHECKING, Any
from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject
if TYPE_CHECKING:
from pyvergeos.client import VergeClient
# =============================================================================
# Catalog Repository Status
# =============================================================================
[docs]
class CatalogRepositoryStatus(ResourceObject):
    """Operational status record of a catalog repository.

    Attributes:
        key: Status $key (row ID).
        repository: Parent repository key.
        status: Current status (online, refreshing, downloading, etc.).
        state: State indicator (online, offline, warning, error).
        info: Additional status information text.
        last_update: Timestamp of last status change.
    """

    @property
    def repository_key(self) -> int | None:
        """Parent repository key as an ``int``, or ``None`` when the field is unset."""
        raw_key = self.get("repository")
        if raw_key is None:
            return None
        return int(raw_key)

    @property
    def is_online(self) -> bool:
        """Whether the ``state`` field equals ``"online"``."""
        state = self.get("state")
        return state == "online"

    @property
    def is_error(self) -> bool:
        """Whether the ``state`` field equals ``"error"``."""
        state = self.get("state")
        return state == "error"

    @property
    def is_busy(self) -> bool:
        """Whether ``status`` names an in-progress operation (refreshing, downloading, etc.)."""
        busy_statuses = ("refreshing", "downloading", "installing", "applying")
        current = self.get("status", "")
        return current in busy_statuses
[docs]
class CatalogRepositoryStatusManager(ResourceManager["CatalogRepositoryStatus"]):
    """Manager for catalog repository status operations.

    Status objects are read-only and automatically created for each repository.

    Example:
        >>> # Get status for all repositories
        >>> for status in client.catalog_repository_status.list():
        ...     print(f"{status.repository_key}: {status.status}")

        >>> # Get status for a specific repository
        >>> status = client.catalog_repository_status.get_for_repository(1)
    """

    _endpoint = "catalog_repository_status"

    _default_fields = [
        "$key",
        "repository",
        "repository#name as repository_display",
        "status",
        "state",
        "info",
        "last_update",
    ]

    def __init__(self, client: VergeClient, *, repository_key: int | None = None) -> None:
        """Create the manager, optionally scoped to one repository.

        Args:
            client: Client used to issue API requests.
            repository_key: When given, every ``list()`` call is restricted
                to this repository.
        """
        super().__init__(client)
        self._repository_key = repository_key

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        repository: int | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[CatalogRepositoryStatus]:
        """List repository status records with optional filtering.

        All supplied conditions are AND-ed together. When the raw ``filter``
        is combined with other conditions it is wrapped in parentheses so
        that an ``or`` inside it cannot bypass the repository restriction.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            repository: Filter by repository key. Ignored if manager is scoped.
            **filter_kwargs: Shorthand filter arguments.

        Returns:
            List of CatalogRepositoryStatus objects.
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        # The manager's own scope wins over the per-call argument.
        repo_key = self._repository_key
        if repo_key is None and repository is not None:
            repo_key = repository
        if repo_key is not None:
            clauses.append(f"repository eq {repo_key}")

        # Under OData precedence ``and`` binds tighter than ``or``; group the
        # caller's raw expression before joining so it stays one operand.
        if filter and len(clauses) > 1:
            clauses[0] = f"({filter})"

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = ",".join(fields if fields else self._default_fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A non-list response is treated as a single record.
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        fields: builtins.list[str] | None = None,
    ) -> CatalogRepositoryStatus:
        """Get a single status record by key.

        Args:
            key: Status $key (row ID).
            fields: List of fields to return.

        Returns:
            CatalogRepositoryStatus object.

        Raises:
            NotFoundError: If status not found or the response is not a dict.
            ValueError: If key not provided.
        """
        if key is None:
            raise ValueError("Key must be provided")

        params: dict[str, Any] = {
            "fields": ",".join(fields if fields else self._default_fields),
        }
        response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
        if response is None:
            raise NotFoundError(f"Repository status with key {key} not found")
        if not isinstance(response, dict):
            raise NotFoundError(f"Repository status with key {key} returned invalid response")
        return self._to_model(response)

    def get_for_repository(self, repository_key: int) -> CatalogRepositoryStatus:
        """Get status for a specific repository.

        Args:
            repository_key: Repository $key. Ignored if the manager is
                already scoped to a repository (see ``list``).

        Returns:
            CatalogRepositoryStatus object.

        Raises:
            NotFoundError: If status not found.
        """
        results = self.list(repository=repository_key, limit=1)
        if not results:
            raise NotFoundError(f"Status for repository {repository_key} not found")
        return results[0]

    def _to_model(self, data: dict[str, Any]) -> CatalogRepositoryStatus:
        """Convert API response to CatalogRepositoryStatus object."""
        return CatalogRepositoryStatus(data, self)
# =============================================================================
# Catalog Repository Logs
# =============================================================================
[docs]
class CatalogRepositoryLog(ResourceObject):
    """Log entry produced by catalog repository operations.

    Attributes:
        key: Log entry $key (row ID).
        catalog_repository: Repository key.
        level: Log level (message, warning, error, critical, etc.).
        text: Log message text.
        timestamp: Log timestamp (microseconds).
        user: User who triggered the log.
    """

    @property
    def repository_key(self) -> int | None:
        """Repository key as an ``int``, or ``None`` when the field is unset."""
        raw_key = self.get("catalog_repository")
        if raw_key is None:
            return None
        return int(raw_key)

    @property
    def is_error(self) -> bool:
        """Whether the entry's level is ``error`` or ``critical``."""
        error_levels = ("error", "critical")
        severity = self.get("level", "")
        return severity in error_levels

    @property
    def is_warning(self) -> bool:
        """Whether the entry's level is ``warning``."""
        severity = self.get("level")
        return severity == "warning"
[docs]
class CatalogRepositoryLogManager(ResourceManager["CatalogRepositoryLog"]):
    """Manager for catalog repository log operations.

    Example:
        >>> # List all repository logs
        >>> for log in client.catalog_repository_logs.list():
        ...     print(f"{log.level}: {log.text}")

        >>> # List logs for a specific repository
        >>> for log in repo.logs.list():
        ...     print(f"{log.level}: {log.text}")
    """

    _endpoint = "catalog_repository_logs"

    _default_fields = [
        "$key",
        "catalog_repository",
        "catalog_repository#name as repository_display",
        "level",
        "text",
        "timestamp",
        "user",
    ]

    def __init__(self, client: VergeClient, *, repository_key: int | None = None) -> None:
        """Create the manager, optionally scoped to one repository.

        Args:
            client: Client used to issue API requests.
            repository_key: When given, every ``list()`` call is restricted
                to this repository.
        """
        super().__init__(client)
        self._repository_key = repository_key

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        catalog_repository: int | None = None,
        level: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[CatalogRepositoryLog]:
        """List repository logs with optional filtering.

        All supplied conditions are AND-ed together and results are always
        requested sorted by ``-timestamp``. When the raw ``filter`` is
        combined with other conditions it is wrapped in parentheses so that
        an ``or`` inside it (as used by ``list_errors``) cannot bypass the
        repository restriction.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            catalog_repository: Filter by repository key. Ignored if scoped.
            level: Filter by log level.
            **filter_kwargs: Shorthand filter arguments.

        Returns:
            List of CatalogRepositoryLog objects.
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        # The manager's own scope wins over the per-call argument.
        repo_key = self._repository_key
        if repo_key is None and catalog_repository is not None:
            repo_key = catalog_repository
        if repo_key is not None:
            clauses.append(f"catalog_repository eq {repo_key}")

        if level is not None:
            # Double embedded quotes, the same escaping get(name=...) style
            # lookups in this module apply to string literals.
            level_literal = f"{level}".replace("'", "''")
            clauses.append(f"level eq '{level_literal}'")

        # Under OData precedence ``and`` binds tighter than ``or``; group the
        # caller's raw expression before joining so it stays one operand.
        if filter and len(clauses) > 1:
            clauses[0] = f"({filter})"

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = ",".join(fields if fields else self._default_fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset
        # Default sort by timestamp descending
        params["sort"] = "-timestamp"

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A non-list response is treated as a single record.
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        fields: builtins.list[str] | None = None,
    ) -> CatalogRepositoryLog:
        """Get a single log entry by key.

        Args:
            key: Log entry $key (row ID).
            fields: List of fields to return.

        Returns:
            CatalogRepositoryLog object.

        Raises:
            NotFoundError: If log entry not found or the response is not a dict.
            ValueError: If key not provided.
        """
        if key is None:
            raise ValueError("Key must be provided")

        params: dict[str, Any] = {
            "fields": ",".join(fields if fields else self._default_fields),
        }
        response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
        if response is None:
            raise NotFoundError(f"Repository log with key {key} not found")
        if not isinstance(response, dict):
            raise NotFoundError(f"Repository log with key {key} returned invalid response")
        return self._to_model(response)

    def list_errors(
        self,
        limit: int | None = None,
    ) -> builtins.list[CatalogRepositoryLog]:
        """List error and critical log entries.

        Args:
            limit: Maximum number of results.

        Returns:
            List of error/critical log entries.
        """
        return self.list(
            filter="(level eq 'error') or (level eq 'critical')",
            limit=limit,
        )

    def list_warnings(
        self,
        limit: int | None = None,
    ) -> builtins.list[CatalogRepositoryLog]:
        """List warning log entries.

        Args:
            limit: Maximum number of results.

        Returns:
            List of warning log entries.
        """
        return self.list(level="warning", limit=limit)

    def _to_model(self, data: dict[str, Any]) -> CatalogRepositoryLog:
        """Convert API response to CatalogRepositoryLog object."""
        return CatalogRepositoryLog(data, self)
# =============================================================================
# Catalog Logs
# =============================================================================
[docs]
class CatalogLog(ResourceObject):
    """Log entry produced by catalog operations.

    Attributes:
        key: Log entry $key (row ID).
        catalog: Catalog key (40-char hex string).
        level: Log level (message, warning, error, critical, etc.).
        text: Log message text.
        timestamp: Log timestamp (microseconds).
        user: User who triggered the log.
    """

    @property
    def catalog_key(self) -> str | None:
        """Catalog key as a ``str``, or ``None`` when the field is unset."""
        raw_key = self.get("catalog")
        if raw_key is None:
            return None
        return str(raw_key)

    @property
    def is_error(self) -> bool:
        """Whether the entry's level is ``error`` or ``critical``."""
        error_levels = ("error", "critical")
        severity = self.get("level", "")
        return severity in error_levels

    @property
    def is_warning(self) -> bool:
        """Whether the entry's level is ``warning``."""
        severity = self.get("level")
        return severity == "warning"
[docs]
class CatalogLogManager(ResourceManager["CatalogLog"]):
    """Manager for catalog log operations.

    Example:
        >>> # List all catalog logs
        >>> for log in client.catalog_logs.list():
        ...     print(f"{log.level}: {log.text}")

        >>> # List logs for a specific catalog
        >>> for log in catalog.logs.list():
        ...     print(f"{log.level}: {log.text}")
    """

    _endpoint = "catalog_logs"

    _default_fields = [
        "$key",
        "catalog",
        "catalog#name as catalog_display",
        "level",
        "text",
        "timestamp",
        "user",
    ]

    def __init__(self, client: VergeClient, *, catalog_key: str | None = None) -> None:
        """Create the manager, optionally scoped to one catalog.

        Args:
            client: Client used to issue API requests.
            catalog_key: When given, every ``list()`` call is restricted
                to this catalog.
        """
        super().__init__(client)
        self._catalog_key = catalog_key

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        catalog: str | None = None,
        level: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[CatalogLog]:
        """List catalog logs with optional filtering.

        All supplied conditions are AND-ed together and results are always
        requested sorted by ``-timestamp``. When the raw ``filter`` is
        combined with other conditions it is wrapped in parentheses so that
        an ``or`` inside it (as used by ``list_errors``) cannot bypass the
        catalog restriction.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            catalog: Filter by catalog key. Ignored if manager is scoped.
            level: Filter by log level.
            **filter_kwargs: Shorthand filter arguments.

        Returns:
            List of CatalogLog objects.
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        # The manager's own scope wins over the per-call argument.
        cat_key = self._catalog_key
        if cat_key is None and catalog is not None:
            cat_key = catalog
        if cat_key is not None:
            # Double embedded quotes so the value stays inside its literal.
            cat_literal = f"{cat_key}".replace("'", "''")
            clauses.append(f"catalog eq '{cat_literal}'")

        if level is not None:
            level_literal = f"{level}".replace("'", "''")
            clauses.append(f"level eq '{level_literal}'")

        # Under OData precedence ``and`` binds tighter than ``or``; group the
        # caller's raw expression before joining so it stays one operand.
        if filter and len(clauses) > 1:
            clauses[0] = f"({filter})"

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = ",".join(fields if fields else self._default_fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset
        # Default sort by timestamp descending
        params["sort"] = "-timestamp"

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A non-list response is treated as a single record.
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        fields: builtins.list[str] | None = None,
    ) -> CatalogLog:
        """Get a single log entry by key.

        Args:
            key: Log entry $key (row ID).
            fields: List of fields to return.

        Returns:
            CatalogLog object.

        Raises:
            NotFoundError: If log entry not found or the response is not a dict.
            ValueError: If key not provided.
        """
        if key is None:
            raise ValueError("Key must be provided")

        params: dict[str, Any] = {
            "fields": ",".join(fields if fields else self._default_fields),
        }
        response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
        if response is None:
            raise NotFoundError(f"Catalog log with key {key} not found")
        if not isinstance(response, dict):
            raise NotFoundError(f"Catalog log with key {key} returned invalid response")
        return self._to_model(response)

    def list_errors(
        self,
        limit: int | None = None,
    ) -> builtins.list[CatalogLog]:
        """List error and critical log entries.

        Args:
            limit: Maximum number of results.

        Returns:
            List of error/critical log entries.
        """
        return self.list(
            filter="(level eq 'error') or (level eq 'critical')",
            limit=limit,
        )

    def list_warnings(
        self,
        limit: int | None = None,
    ) -> builtins.list[CatalogLog]:
        """List warning log entries.

        Args:
            limit: Maximum number of results.

        Returns:
            List of warning log entries.
        """
        return self.list(level="warning", limit=limit)

    def _to_model(self, data: dict[str, Any]) -> CatalogLog:
        """Convert API response to CatalogLog object."""
        return CatalogLog(data, self)
# =============================================================================
# Catalogs
# =============================================================================
[docs]
class Catalog(ResourceObject):
    """A catalog: a named collection of recipes inside a repository.

    Note:
        Catalog keys are 40-character hex strings, not integers like most
        other VergeOS resources.

    Attributes:
        key: The catalog unique identifier ($key) - 40-char hex string.
        id: The catalog ID (same as $key).
        repository: Parent repository key.
        name: Catalog name.
        description: Catalog description.
        publishing_scope: Scope (private, global, tenant, none).
        enabled: Whether the catalog is enabled.
        created: Creation timestamp.
    """

    def _catalog_manager(self) -> CatalogManager:
        """Return the owning manager, typed as ``CatalogManager``."""
        from typing import cast

        return cast("CatalogManager", self._manager)

    @property
    def key(self) -> str:  # type: ignore[override]
        """Resource primary key ($key) - 40-character hex string.

        Raises:
            ValueError: If resource has no $key (not yet persisted).
        """
        raw_key = self.get("$key")
        if raw_key is not None:
            return str(raw_key)
        raise ValueError("Resource has no $key - may not be persisted")

    def refresh(self) -> Catalog:
        """Fetch this catalog again through its manager.

        Returns:
            Updated Catalog object.
        """
        return self._catalog_manager().get(self.key)

    def save(self, **kwargs: Any) -> Catalog:
        """Send field updates for this catalog through its manager.

        Args:
            **kwargs: Fields to update.

        Returns:
            Updated Catalog object.
        """
        return self._catalog_manager().update(self.key, **kwargs)

    def delete(self) -> None:
        """Delete this catalog."""
        self._catalog_manager().delete(self.key)

    @property
    def repository_key(self) -> int | None:
        """Parent repository key as an ``int``, or ``None`` when the field is unset."""
        raw_key = self.get("repository")
        if raw_key is None:
            return None
        return int(raw_key)

    @property
    def is_enabled(self) -> bool:
        """Truthiness of the ``enabled`` field (``True`` when the field is missing)."""
        flag = self.get("enabled", True)
        return bool(flag)

    @property
    def scope(self) -> str:
        """Publishing scope (``"private"`` when the field is missing)."""
        value = self.get("publishing_scope", "private")
        return str(value)

    @property
    def logs(self) -> CatalogLogManager:
        """Get a log manager scoped to this catalog.

        Returns:
            CatalogLogManager for this catalog.
        """
        client = self._catalog_manager()._client
        return CatalogLogManager(client, catalog_key=self.key)
[docs]
class CatalogManager(ResourceManager["Catalog"]):
    """Manager for catalog operations.

    Catalogs organize recipes into logical groups within repositories.

    Example:
        >>> # List all catalogs
        >>> for catalog in client.catalogs.list():
        ...     print(f"{catalog.name}: {catalog.description}")

        >>> # List catalogs in a specific repository
        >>> for catalog in client.catalogs.list(repository=1):
        ...     print(f"{catalog.name}")

        >>> # Get a specific catalog
        >>> catalog = client.catalogs.get(name="VergeOS Recipes")
    """

    _endpoint = "catalogs"

    _default_fields = [
        "$key",
        "id",
        "repository",
        "repository#name as repository_display",
        "name",
        "description",
        "publishing_scope",
        "enabled",
        "created",
    ]

    def __init__(self, client: VergeClient, *, repository_key: int | None = None) -> None:
        """Create the manager, optionally scoped to one repository.

        Args:
            client: Client used to issue API requests.
            repository_key: When given, every ``list()`` call is restricted
                to this repository.
        """
        super().__init__(client)
        self._repository_key = repository_key

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        repository: int | None = None,
        enabled: bool | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[Catalog]:
        """List catalogs with optional filtering.

        All supplied conditions are AND-ed together. When the raw ``filter``
        is combined with other conditions it is wrapped in parentheses so
        that an ``or`` inside it cannot bypass the repository or enabled
        restrictions.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            repository: Filter by repository key. Ignored if manager is scoped.
            enabled: Filter by enabled state.
            **filter_kwargs: Shorthand filter arguments (name, etc.).

        Returns:
            List of Catalog objects.

        Example:
            >>> # List all catalogs
            >>> catalogs = client.catalogs.list()

            >>> # List enabled catalogs only
            >>> catalogs = client.catalogs.list(enabled=True)

            >>> # Filter by name
            >>> catalogs = client.catalogs.list(name="VergeOS*")
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        # The manager's own scope wins over the per-call argument.
        repo_key = self._repository_key
        if repo_key is None and repository is not None:
            repo_key = repository
        if repo_key is not None:
            clauses.append(f"repository eq {repo_key}")

        if enabled is not None:
            # The enabled flag is sent as 1/0.
            clauses.append(f"enabled eq {1 if enabled else 0}")

        # Under OData precedence ``and`` binds tighter than ``or``; group the
        # caller's raw expression before joining so it stays one operand.
        if filter and len(clauses) > 1:
            clauses[0] = f"({filter})"

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = ",".join(fields if fields else self._default_fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A non-list response is treated as a single record.
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: str | None = None,
        *,
        name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> Catalog:
        """Get a single catalog by key or name.

        ``key`` takes precedence when both identifiers are supplied. A key
        lookup is issued as an ``id`` filter against the collection endpoint
        rather than a ``/{key}`` path request.

        Args:
            key: Catalog $key (40-character hex string).
            name: Catalog name.
            fields: List of fields to return.

        Returns:
            Catalog object.

        Raises:
            NotFoundError: If catalog not found.
            ValueError: If no identifier provided.

        Example:
            >>> # Get by key
            >>> catalog = client.catalogs.get("8f73f8bcc9c9f1aaba32f733bfc295acaf548554")

            >>> # Get by name
            >>> catalog = client.catalogs.get(name="VergeOS Recipes")
        """
        if key is not None:
            # Escape quotes the same way as the name lookup below.
            key_literal = f"{key}".replace("'", "''")
            params: dict[str, Any] = {
                "filter": f"id eq '{key_literal}'",
                "fields": ",".join(fields if fields else self._default_fields),
            }
            response = self._client._request("GET", self._endpoint, params=params)
            if response is None:
                raise NotFoundError(f"Catalog with key {key} not found")
            if isinstance(response, list):
                if not response:
                    raise NotFoundError(f"Catalog with key {key} not found")
                response = response[0]
            if not isinstance(response, dict):
                raise NotFoundError(f"Catalog with key {key} returned invalid response")
            return self._to_model(response)

        if name is not None:
            escaped_name = name.replace("'", "''")
            results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1)
            if not results:
                raise NotFoundError(f"Catalog with name '{name}' not found")
            return results[0]

        raise ValueError("Either key or name must be provided")

    def create(  # type: ignore[override]
        self,
        name: str,
        *,
        repository: int,
        description: str | None = None,
        publishing_scope: str = "private",
        enabled: bool = True,
    ) -> Catalog:
        """Create a new catalog.

        After the POST, the catalog is re-fetched by the ``$key`` in the
        response when present, otherwise by name.

        Args:
            name: Catalog name.
            repository: Parent repository key.
            description: Catalog description.
            publishing_scope: Scope (private, global, tenant, none).
            enabled: Whether the catalog is enabled.

        Returns:
            Created Catalog object.

        Example:
            >>> catalog = client.catalogs.create(
            ...     name="My Recipes",
            ...     repository=1,
            ...     description="Custom VM recipes",
            ...     publishing_scope="private"
            ... )
        """
        body: dict[str, Any] = {
            "name": name,
            "repository": repository,
            "enabled": enabled,
        }
        if description is not None:
            body["description"] = description
        if publishing_scope:
            body["publishing_scope"] = publishing_scope

        response = self._client._request("POST", self._endpoint, json_data=body)

        if response and isinstance(response, dict):
            cat_key = response.get("$key")
            if cat_key:
                return self.get(key=str(cat_key))

        # Fallback: search by name
        return self.get(name=name)

    def update(  # type: ignore[override]
        self,
        key: str,
        *,
        name: str | None = None,
        description: str | None = None,
        publishing_scope: str | None = None,
        enabled: bool | None = None,
    ) -> Catalog:
        """Update a catalog.

        Only arguments that are not ``None`` are sent. With nothing to send,
        no PUT is issued and the current catalog is simply fetched.

        Args:
            key: Catalog $key (40-character hex string).
            name: New name.
            description: New description.
            publishing_scope: New scope.
            enabled: Enable or disable.

        Returns:
            Updated Catalog object.

        Example:
            >>> client.catalogs.update(catalog.key, description="Updated description")
        """
        candidates: dict[str, Any] = {
            "name": name,
            "description": description,
            "publishing_scope": publishing_scope,
            "enabled": enabled,
        }
        body = {field: value for field, value in candidates.items() if value is not None}

        if body:
            self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        return self.get(key)

    def delete(self, key: str) -> None:  # type: ignore[override]
        """Delete a catalog.

        This operation is destructive and cannot be undone.
        All recipes in the catalog will be deleted.

        Args:
            key: Catalog $key (40-character hex string).

        Raises:
            NotFoundError: If catalog not found.

        Example:
            >>> client.catalogs.delete(catalog.key)
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def logs(self, key: str) -> CatalogLogManager:
        """Get a log manager scoped to a specific catalog.

        Args:
            key: Catalog $key (40-character hex string).

        Returns:
            CatalogLogManager for the catalog.
        """
        return CatalogLogManager(self._client, catalog_key=key)

    def _to_model(self, data: dict[str, Any]) -> Catalog:
        """Convert API response to Catalog object."""
        return Catalog(data, self)
# =============================================================================
# Catalog Repositories
# =============================================================================
[docs]
class CatalogRepository(ResourceObject):
    """Catalog repository resource object.

    Represents a source of catalogs and recipes.

    Attributes:
        key: Repository $key (row ID).
        name: Repository name.
        description: Repository description.
        type: Repository type (local, provider, remote, remote-git, yottabyte).
        url: URL for remote repositories.
        user: Username for authentication.
        allow_insecure: Allow insecure SSL certificates.
        auto_refresh: Automatically refresh repository.
        max_tier: Maximum storage tier for downloaded recipes.
        override_default_scope: Override default publishing scope.
        enabled: Whether the repository is enabled.
        last_refreshed: Timestamp of last refresh.
    """

    def _repository_manager(self) -> CatalogRepositoryManager:
        """Return the owning manager, typed as ``CatalogRepositoryManager``."""
        from typing import cast

        return cast("CatalogRepositoryManager", self._manager)

    @property
    def is_enabled(self) -> bool:
        """Check if the repository is enabled (``True`` when the field is missing)."""
        return bool(self.get("enabled", True))

    @property
    def is_local(self) -> bool:
        """Check if this is a local repository."""
        return self.get("type") == "local"

    @property
    def is_remote(self) -> bool:
        """Check if this is a non-local repository.

        True for the ``remote``, ``remote-git``, ``yottabyte`` and
        ``provider`` types.
        """
        repo_type = self.get("type", "")
        return repo_type in ("remote", "remote-git", "yottabyte", "provider")

    @property
    def repository_type(self) -> str:
        """Get the repository type (``"local"`` when the field is missing)."""
        return str(self.get("type", "local"))

    @property
    def status_info(self) -> str | None:
        """Get the current status text carried on this record.

        Reads the ``status`` entry of a nested ``status`` dict when the
        record has one; otherwise falls back to the ``status_value`` alias
        that the repository manager's default field list requests. Returns
        ``None`` when neither is available.

        Note: This may require fetching status separately for full details.
        """
        status = self.get("status")
        if isinstance(status, dict):
            return status.get("status")
        # Records fetched with the default fields expose the status as the
        # flat alias "status_value" rather than as a nested dict.
        status_value = self.get("status_value")
        if status_value is None:
            return None
        return str(status_value)

    @property
    def status(self) -> CatalogRepositoryStatusManager:
        """Get a status manager scoped to this repository.

        Returns:
            CatalogRepositoryStatusManager for this repository.
        """
        client = self._repository_manager()._client
        return CatalogRepositoryStatusManager(client, repository_key=self.key)

    @property
    def catalogs(self) -> CatalogManager:
        """Get a catalog manager scoped to this repository.

        Returns:
            CatalogManager for this repository.
        """
        client = self._repository_manager()._client
        return CatalogManager(client, repository_key=self.key)

    @property
    def logs(self) -> CatalogRepositoryLogManager:
        """Get a log manager scoped to this repository.

        Returns:
            CatalogRepositoryLogManager for this repository.
        """
        client = self._repository_manager()._client
        return CatalogRepositoryLogManager(client, repository_key=self.key)

    def refresh_catalogs(self) -> dict[str, Any] | None:
        """Refresh this repository to fetch latest catalogs/recipes.

        Delegates to the manager's ``refresh`` with this repository's key.

        Returns:
            Task information dict or None.
        """
        return self._repository_manager().refresh(self.key)

    def get_status(self) -> CatalogRepositoryStatus:
        """Get the current status for this repository.

        Delegates to the manager's ``get_status`` with this repository's key.

        Returns:
            CatalogRepositoryStatus object.
        """
        return self._repository_manager().get_status(self.key)
[docs]
class CatalogRepositoryManager(ResourceManager["CatalogRepository"]):
"""Manager for catalog repository operations.
Catalog repositories define where catalogs and recipes are sourced from.
Example:
>>> # List all repositories
>>> for repo in client.catalog_repositories.list():
... print(f"{repo.name} ({repo.type})")
>>> # Get the local repository
>>> local = client.catalog_repositories.get(name="Local")
>>> # Get the Verge.io marketplace
>>> marketplace = client.catalog_repositories.get(name="Verge.io Recipes")
>>> # Refresh a repository
>>> repo.refresh()
"""
_endpoint = "catalog_repositories"
_default_fields = [
"$key",
"name",
"description",
"type",
"url",
"user",
"allow_insecure",
"auto_refresh",
"max_tier",
"override_default_scope",
"enabled",
"last_refreshed",
"status#status as status_value",
"status#state as status_state",
]
[docs]
def __init__(self, client: VergeClient) -> None:
    """Create the manager bound to the client used for API requests."""
    super().__init__(client)
[docs]
def list(
    self,
    filter: str | None = None,
    fields: builtins.list[str] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    type: str | None = None,
    enabled: bool | None = None,
    **filter_kwargs: Any,
) -> builtins.list[CatalogRepository]:
    """List repositories with optional filtering.

    All supplied conditions are AND-ed together. When the raw ``filter``
    is combined with other conditions it is wrapped in parentheses so
    that an ``or`` inside it cannot bypass the type or enabled
    restrictions.

    Args:
        filter: OData filter string.
        fields: List of fields to return (uses defaults if not specified).
        limit: Maximum number of results.
        offset: Skip this many results.
        type: Filter by repository type.
        enabled: Filter by enabled state.
        **filter_kwargs: Shorthand filter arguments (name, etc.).

    Returns:
        List of CatalogRepository objects.

    Example:
        >>> # List all repositories
        >>> repos = client.catalog_repositories.list()

        >>> # List enabled repositories only
        >>> repos = client.catalog_repositories.list(enabled=True)

        >>> # List remote repositories
        >>> repos = client.catalog_repositories.list(type="remote")
    """
    clauses: builtins.list[str] = []
    if filter:
        clauses.append(filter)
    if filter_kwargs:
        clauses.append(build_filter(**filter_kwargs))

    if type is not None:
        # Double embedded quotes so the value stays inside its literal,
        # matching the escaping used for name lookups.
        type_literal = f"{type}".replace("'", "''")
        clauses.append(f"type eq '{type_literal}'")

    if enabled is not None:
        # The enabled flag is sent as 1/0.
        clauses.append(f"enabled eq {1 if enabled else 0}")

    # Under OData precedence ``and`` binds tighter than ``or``; group the
    # caller's raw expression before joining so it stays one operand.
    if filter and len(clauses) > 1:
        clauses[0] = f"({filter})"

    params: dict[str, Any] = {}
    if clauses:
        params["filter"] = " and ".join(clauses)
    params["fields"] = ",".join(fields if fields else self._default_fields)
    if limit is not None:
        params["limit"] = limit
    if offset is not None:
        params["offset"] = offset

    response = self._client._request("GET", self._endpoint, params=params)
    if response is None:
        return []
    # ``list`` is also this method's name, so the builtin is referenced
    # explicitly (as the annotations already do). A non-list response is
    # treated as a single record.
    if not isinstance(response, builtins.list):
        response = [response]
    return [self._to_model(item) for item in response if item]
[docs]
def get(
    self,
    key: int | None = None,
    *,
    name: str | None = None,
    fields: builtins.list[str] | None = None,
) -> CatalogRepository:
    """Fetch one repository, identified by key or by name.

    When both identifiers are supplied, ``key`` wins and ``name`` is
    ignored.

    Args:
        key: Repository $key (row ID).
        name: Repository name.
        fields: List of fields to return (the manager's defaults are
            used when empty or omitted).

    Returns:
        CatalogRepository object.

    Raises:
        NotFoundError: If no repository matches, or the keyed request
            does not return a dict.
        ValueError: If neither key nor name is provided.

    Example:
        >>> # Get by key
        >>> repo = client.catalog_repositories.get(1)
        >>> # Get by name
        >>> repo = client.catalog_repositories.get(name="Local")
    """
    if key is None and name is None:
        raise ValueError("Either key or name must be provided")

    if key is None:
        # Name lookup: double single quotes so the name is a valid
        # filter string literal, then ask for at most one match.
        quoted = name.replace("'", "''")
        matches = self.list(filter=f"name eq '{quoted}'", fields=fields, limit=1)
        if matches:
            return matches[0]
        raise NotFoundError(f"Catalog repository with name '{name}' not found")

    # Key lookup: address the row directly.
    selected = fields if fields else self._default_fields
    query: dict[str, Any] = {"fields": ",".join(selected)}
    payload = self._client._request("GET", f"{self._endpoint}/{key}", params=query)
    if payload is None:
        raise NotFoundError(f"Catalog repository with key {key} not found")
    if not isinstance(payload, dict):
        raise NotFoundError(f"Catalog repository with key {key} returned invalid response")
    return self._to_model(payload)
[docs]
def create(  # type: ignore[override]
    self,
    name: str,
    *,
    type: str = "local",
    description: str | None = None,
    url: str | None = None,
    user: str | None = None,
    password: str | None = None,
    allow_insecure: bool = False,
    auto_refresh: bool = True,
    max_tier: str = "1",
    override_default_scope: str = "none",
    enabled: bool = True,
) -> CatalogRepository:
    """Create a catalog repository and return the stored object.

    Args:
        name: Repository name.
        type: Repository type (local, remote, remote-git, yottabyte).
        description: Repository description.
        url: URL for remote repositories.
        user: Username for authentication.
        password: Password for authentication.
        allow_insecure: Allow insecure SSL certificates. Only sent when
            true.
        auto_refresh: Automatically refresh repository.
        max_tier: Maximum storage tier for downloads (1-5).
        override_default_scope: Override default publishing scope.
        enabled: Whether the repository is enabled.

    Returns:
        Created CatalogRepository object, re-read by the ``$key`` in the
        POST response when present, otherwise looked up by name.

    Example:
        >>> # Create a local repository
        >>> repo = client.catalog_repositories.create(
        ...     name="My Recipes",
        ...     type="local",
        ...     description="Custom local recipes"
        ... )
        >>> # Create a remote repository
        >>> repo = client.catalog_repositories.create(
        ...     name="Partner Recipes",
        ...     type="remote",
        ...     url="https://recipes.example.com/api/v4",
        ...     user="api-user",
        ...     password="api-key"
        ... )
    """
    # Fields that are always part of the request.
    payload: dict[str, Any] = {
        "name": name,
        "type": type,
        "auto_refresh": auto_refresh,
        "max_tier": max_tier,
        "override_default_scope": override_default_scope,
        "enabled": enabled,
    }

    # Optional fields are included only when the caller supplied a value.
    optional = (
        ("description", description),
        ("url", url),
        ("user", user),
        ("password", password),
    )
    payload.update({field: value for field, value in optional if value is not None})
    if allow_insecure:
        payload["allow_insecure"] = allow_insecure

    created = self._client._request("POST", self._endpoint, json_data=payload)

    # Prefer re-reading by the key the API handed back.
    new_key = created.get("$key") if created and isinstance(created, dict) else None
    if new_key:
        return self.get(key=int(new_key))

    # Fallback: search by name
    return self.get(name=name)
[docs]
def update(  # type: ignore[override]
    self,
    key: int,
    *,
    name: str | None = None,
    description: str | None = None,
    url: str | None = None,
    user: str | None = None,
    password: str | None = None,
    allow_insecure: bool | None = None,
    auto_refresh: bool | None = None,
    max_tier: str | None = None,
    override_default_scope: str | None = None,
    enabled: bool | None = None,
) -> CatalogRepository:
    """Apply the supplied changes to a repository and return it.

    Only arguments that are not ``None`` are sent. When nothing is
    supplied no PUT is issued and the repository is simply re-read.

    Args:
        key: Repository $key (row ID).
        name: New name.
        description: New description.
        url: New URL.
        user: New username.
        password: New password.
        allow_insecure: Allow insecure SSL certificates.
        auto_refresh: Automatically refresh repository.
        max_tier: Maximum storage tier.
        override_default_scope: Override default scope.
        enabled: Enable or disable.

    Returns:
        Updated CatalogRepository object.

    Example:
        >>> client.catalog_repositories.update(
        ...     repo.key,
        ...     description="Updated description",
        ...     auto_refresh=False
        ... )
    """
    requested = {
        "name": name,
        "description": description,
        "url": url,
        "user": user,
        "password": password,
        "allow_insecure": allow_insecure,
        "auto_refresh": auto_refresh,
        "max_tier": max_tier,
        "override_default_scope": override_default_scope,
        "enabled": enabled,
    }
    # None means "leave unchanged"; False and "" are real values and stay.
    changes: dict[str, Any] = {
        field: value for field, value in requested.items() if value is not None
    }
    if changes:
        self._client._request("PUT", f"{self._endpoint}/{key}", json_data=changes)
    return self.get(key)
[docs]
def delete(self, key: int) -> None:
    """Delete a catalog repository.

    Issues a single DELETE request for the repository row. This operation
    is destructive and cannot be undone: all catalogs and recipes in the
    repository will be deleted.

    Note: The default "Local" repository (key=1) cannot be deleted.

    Args:
        key: Repository $key (row ID).

    Raises:
        NotFoundError: If repository not found.
        APIError: If repository cannot be deleted.

    Example:
        >>> client.catalog_repositories.delete(repo.key)
    """
    target = f"{self._endpoint}/{key}"
    self._client._request("DELETE", target)
[docs]
def refresh(self, key: int) -> dict[str, Any] | None:
    """Trigger a refresh of a repository's catalogs and recipes.

    This triggers a refresh operation that connects to the remote
    source and downloads any new or updated catalogs and recipes.

    Args:
        key: Repository $key (row ID).

    Returns:
        The response dict (task information) when the API returns a
        dict, otherwise None.

    Example:
        >>> result = client.catalog_repositories.refresh(repo.key)
    """
    # Refresh is posted to the shared actions endpoint, not to the
    # repository row itself.
    outcome = self._client._request(
        "POST",
        "catalog_repository_actions",
        json_data={"repository": key, "action": "refresh"},
    )
    return outcome if isinstance(outcome, dict) else None
[docs]
def get_status(self, key: int) -> CatalogRepositoryStatus:
    """Look up the current status record of a repository.

    Builds a status manager on this manager's client and delegates to
    its ``get_for_repository`` lookup.

    Args:
        key: Repository $key (row ID).

    Returns:
        CatalogRepositoryStatus object.
    """
    return CatalogRepositoryStatusManager(self._client).get_for_repository(key)
[docs]
def catalogs(self, key: int) -> CatalogManager:
    """Build a catalog manager bound to one repository.

    Args:
        key: Repository $key (row ID).

    Returns:
        CatalogManager constructed with this manager's client and
        ``repository_key=key``.
    """
    scoped_manager = CatalogManager(self._client, repository_key=key)
    return scoped_manager
[docs]
def logs(self, key: int) -> CatalogRepositoryLogManager:
    """Build a log manager bound to one repository.

    Args:
        key: Repository $key (row ID).

    Returns:
        CatalogRepositoryLogManager constructed with this manager's
        client and ``repository_key=key``.
    """
    scoped_manager = CatalogRepositoryLogManager(self._client, repository_key=key)
    return scoped_manager
def _to_model(self, data: dict[str, Any]) -> CatalogRepository:
    """Wrap a raw API record in a CatalogRepository.

    Args:
        data: One repository record as returned by the API.

    Returns:
        CatalogRepository built from ``data`` with this manager attached.
    """
    repository = CatalogRepository(data, self)
    return repository