Source code for pyvergeos.resources.updates

"""Update management resources for VergeOS system updates.

This module provides programmatic access to VergeOS system updates, including
checking for updates, downloading, installing, and monitoring update progress.

Key concepts:
    - **Update Settings**: Main singleton configuration for update behavior
    - **Update Source**: Update server (Verge.io Updates, Trial/NFR, etc.)
    - **Update Branch**: Version branch (stable-4.13, etc.)
    - **Update Package**: Available/installed update packages
    - **Update Source Package**: Packages available from a specific source/branch
    - **Update Source Status**: Current operational status of update source
    - **Update Log**: History of update operations
    - **Update Dashboard**: Aggregated view of update status

Update workflow:
    1. Check for updates (refresh from update source)
    2. Download available packages
    3. Install downloaded packages
    4. Reboot nodes (done automatically per-node with workload migration)

Example:
    >>> # Get update settings
    >>> settings = client.update_settings.get()
    >>> print(f"Branch: {settings.branch_name}, Source: {settings.source_name}")

    >>> # Check for updates
    >>> client.update_settings.check()

    >>> # Download and install updates
    >>> client.update_settings.download()
    >>> client.update_settings.install()

    >>> # Or do everything at once
    >>> client.update_settings.update_all()

    >>> # View update logs
    >>> for log in client.update_logs.list(limit=10):
    ...     print(f"{log.level}: {log.text}")
"""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


# =============================================================================
# Update Logs
# =============================================================================


[docs] class UpdateLog(ResourceObject): """Update log entry resource object. Represents a log entry from update operations. Attributes: key: Log entry $key (row ID). level: Log level (audit, message, warning, error, critical). text: Log message text. timestamp: Log timestamp (microseconds since epoch). user: User who triggered the operation. object_name: Name of the object related to the log entry. """ @property def is_error(self) -> bool: """Check if this is an error log entry.""" level = self.get("level", "") return level in ("error", "critical") @property def is_warning(self) -> bool: """Check if this is a warning log entry.""" return self.get("level") == "warning" @property def is_audit(self) -> bool: """Check if this is an audit log entry.""" return self.get("level") == "audit"
[docs] class UpdateLogManager(ResourceManager["UpdateLog"]): """Manager for update log operations. Update logs provide history of update operations including downloads, installs, and errors. Example: >>> # List recent update logs >>> for log in client.update_logs.list(limit=20): ... print(f"{log.level}: {log.text}") >>> # List errors only >>> errors = client.update_logs.list_errors() >>> # List warnings only >>> warnings = client.update_logs.list_warnings() """ _endpoint = "update_logs" _default_fields = [ "$key", "level", "text", "timestamp", "user", "object_name", ]
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, level: str | None = None, **filter_kwargs: Any, ) -> builtins.list[UpdateLog]: """List update logs with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. level: Filter by log level (audit, message, warning, error, critical). **filter_kwargs: Shorthand filter arguments. Returns: List of UpdateLog objects, sorted by timestamp descending. """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add level filter if level is not None: filters.append(f"level eq '{level}'") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset # Default sort by timestamp descending params["sort"] = "-timestamp" response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, fields: builtins.list[str] | None = None, ) -> UpdateLog: """Get a single log entry by key. Args: key: Log entry $key (row ID). fields: List of fields to return. Returns: UpdateLog object. Raises: NotFoundError: If log entry not found. ValueError: If key not provided. """ if key is None: raise ValueError("Key must be provided") params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Update log with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Update log with key {key} returned invalid response") return self._to_model(response)
[docs] def list_errors( self, limit: int | None = None, ) -> builtins.list[UpdateLog]: """List error and critical log entries. Args: limit: Maximum number of results. Returns: List of error/critical log entries. """ return self.list( filter="(level eq 'error') or (level eq 'critical')", limit=limit, )
[docs] def list_warnings( self, limit: int | None = None, ) -> builtins.list[UpdateLog]: """List warning log entries. Args: limit: Maximum number of results. Returns: List of warning log entries. """ return self.list(level="warning", limit=limit)
def _to_model(self, data: dict[str, Any]) -> UpdateLog: """Convert API response to UpdateLog object.""" return UpdateLog(data, self)
# ============================================================================= # Update Branches # =============================================================================
[docs] class UpdateBranch(ResourceObject): """Update branch resource object. Represents an update branch (version stream) like stable-4.13. Attributes: key: Branch $key (row ID). name: Branch name (e.g., 'stable-4.13'). description: Human-readable description. created: Creation timestamp. """ pass
[docs] class UpdateBranchManager(ResourceManager["UpdateBranch"]): """Manager for update branch operations. Update branches define version streams available for updates. Branches are typically read-only and managed by the update source. Example: >>> # List available branches >>> for branch in client.update_branches.list(): ... print(f"{branch.name}: {branch.description}") >>> # Get current branch from settings >>> settings = client.update_settings.get() >>> current_branch = client.update_branches.get(settings.branch) """ _endpoint = "update_branches" _default_fields = [ "$key", "name", "description", "created", ]
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, **filter_kwargs: Any, ) -> builtins.list[UpdateBranch]: """List update branches with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. **filter_kwargs: Shorthand filter arguments. Returns: List of UpdateBranch objects. """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> UpdateBranch: """Get a single branch by key or name. Args: key: Branch $key (row ID). name: Branch name. fields: List of fields to return. Returns: UpdateBranch object. Raises: NotFoundError: If branch not found. ValueError: If no identifier provided. """ if key is not None: params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Update branch with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Update branch with key {key} returned invalid response") return self._to_model(response) if name is not None: escaped_name = name.replace("'", "''") results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1) if not results: raise NotFoundError(f"Update branch with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
def _to_model(self, data: dict[str, Any]) -> UpdateBranch: """Convert API response to UpdateBranch object.""" return UpdateBranch(data, self)
# ============================================================================= # Update Source Status # =============================================================================
[docs] class UpdateSourceStatus(ResourceObject): """Update source status resource object. Represents the current operational status of an update source. Attributes: key: Status $key (row ID). source: Parent source key. status: Current status (idle, refreshing, downloading, installing, applying, error). info: Additional status information text. nodes_updated: Count of nodes that have been updated. last_update: Timestamp of last status change. """ @property def source_key(self) -> int | None: """Get the parent source key.""" source = self.get("source") return int(source) if source is not None else None @property def is_idle(self) -> bool: """Check if source is idle (not performing any operation).""" return self.get("status") == "idle" @property def is_busy(self) -> bool: """Check if source is currently busy with an operation.""" status = self.get("status", "") return status in ("refreshing", "downloading", "installing", "applying") @property def is_error(self) -> bool: """Check if source is in error state.""" return self.get("status") == "error" @property def is_refreshing(self) -> bool: """Check if source is currently checking for updates.""" return self.get("status") == "refreshing" @property def is_downloading(self) -> bool: """Check if source is currently downloading updates.""" return self.get("status") == "downloading" @property def is_installing(self) -> bool: """Check if source is currently installing updates.""" return self.get("status") == "installing" @property def is_applying(self) -> bool: """Check if source is currently applying updates (rebooting nodes).""" return self.get("status") == "applying"
[docs] class UpdateSourceStatusManager(ResourceManager["UpdateSourceStatus"]): """Manager for update source status operations. Status objects are read-only and automatically created/updated for each update source. Example: >>> # Get status for the active update source >>> settings = client.update_settings.get() >>> status = client.update_source_status.get_for_source(settings.source) >>> print(f"Status: {status.status}, Nodes updated: {status.nodes_updated}") >>> # Check if update is in progress >>> if status.is_busy: ... print("Update in progress...") """ _endpoint = "update_source_status" _default_fields = [ "$key", "source", "source#name as source_display", "status", "info", "nodes_updated", "last_update", ]
[docs] def __init__(self, client: VergeClient, *, source_key: int | None = None) -> None: super().__init__(client) self._source_key = source_key
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, source: int | None = None, **filter_kwargs: Any, ) -> builtins.list[UpdateSourceStatus]: """List source status records with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. source: Filter by source key. Ignored if manager is scoped. **filter_kwargs: Shorthand filter arguments. Returns: List of UpdateSourceStatus objects. """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add source filter (from scope or parameter) src_key = self._source_key if src_key is None and source is not None: src_key = source if src_key is not None: filters.append(f"source eq {src_key}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, fields: builtins.list[str] | None = None, ) -> UpdateSourceStatus: """Get a single status record by key. Args: key: Status $key (row ID). fields: List of fields to return. Returns: UpdateSourceStatus object. Raises: NotFoundError: If status not found. ValueError: If key not provided. """ if key is None: raise ValueError("Key must be provided") params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Update source status with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Update source status with key {key} returned invalid response") return self._to_model(response)
[docs] def get_for_source(self, source_key: int) -> UpdateSourceStatus: """Get status for a specific update source. Args: source_key: Update source $key. Returns: UpdateSourceStatus object. Raises: NotFoundError: If status not found. """ results = self.list(source=source_key, limit=1) if not results: raise NotFoundError(f"Status for source {source_key} not found") return results[0]
def _to_model(self, data: dict[str, Any]) -> UpdateSourceStatus: """Convert API response to UpdateSourceStatus object.""" return UpdateSourceStatus(data, self)
# ============================================================================= # Update Source Packages # =============================================================================
[docs] class UpdateSourcePackage(ResourceObject): """Update source package resource object. Represents a package available from an update source for a specific branch. Attributes: key: Package $key (row ID). name: Package name. description: Package description. version: Package version. branch: Branch key. source: Source key. downloaded: Whether the package has been downloaded. optional: Whether the package is optional. require_license_feature: License feature required for this package. created: Creation timestamp. """ @property def branch_key(self) -> int | None: """Get the branch key.""" branch = self.get("branch") return int(branch) if branch is not None else None @property def source_key(self) -> int | None: """Get the source key.""" source = self.get("source") return int(source) if source is not None else None @property def is_downloaded(self) -> bool: """Check if the package has been downloaded.""" return bool(self.get("downloaded", False)) @property def is_optional(self) -> bool: """Check if the package is optional.""" return bool(self.get("optional", False))
[docs] class UpdateSourcePackageManager(ResourceManager["UpdateSourcePackage"]): """Manager for update source package operations. Source packages represent what's available from an update source for a specific branch. Example: >>> # List all available packages >>> for pkg in client.update_source_packages.list(): ... status = "Downloaded" if pkg.is_downloaded else "Available" ... print(f"{pkg.name} ({pkg.version}): {status}") >>> # List packages for a specific source >>> packages = client.update_source_packages.list(source=1) >>> # List packages not yet downloaded >>> pending = client.update_source_packages.list(downloaded=False) """ _endpoint = "update_source_packages" _default_fields = [ "$key", "name", "description", "version", "branch", "branch#name as branch_display", "source", "source#name as source_display", "downloaded", "optional", "require_license_feature", "created", ]
[docs] def __init__( self, client: VergeClient, *, source_key: int | None = None, branch_key: int | None = None, ) -> None: super().__init__(client) self._source_key = source_key self._branch_key = branch_key
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, source: int | None = None, branch: int | None = None, downloaded: bool | None = None, **filter_kwargs: Any, ) -> builtins.list[UpdateSourcePackage]: """List source packages with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. source: Filter by source key. Ignored if manager is scoped. branch: Filter by branch key. Ignored if manager is scoped. downloaded: Filter by download status. **filter_kwargs: Shorthand filter arguments. Returns: List of UpdateSourcePackage objects. """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add source filter (from scope or parameter) src_key = self._source_key if src_key is None and source is not None: src_key = source if src_key is not None: filters.append(f"source eq {src_key}") # Add branch filter (from scope or parameter) br_key = self._branch_key if br_key is None and branch is not None: br_key = branch if br_key is not None: filters.append(f"branch eq {br_key}") # Add downloaded filter if downloaded is not None: filters.append(f"downloaded eq {1 if downloaded else 0}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> UpdateSourcePackage: """Get a single source package by key or name. Args: key: Package $key (row ID). name: Package name (may return multiple, takes first). fields: List of fields to return. Returns: UpdateSourcePackage object. Raises: NotFoundError: If package not found. ValueError: If no identifier provided. """ if key is not None: params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Update source package with key {key} not found") if not isinstance(response, dict): raise NotFoundError( f"Update source package with key {key} returned invalid response" ) return self._to_model(response) if name is not None: escaped_name = name.replace("'", "''") results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1) if not results: raise NotFoundError(f"Update source package with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def list_pending(self) -> builtins.list[UpdateSourcePackage]: """List packages that are available but not yet downloaded. Returns: List of packages pending download. """ return self.list(downloaded=False)
[docs] def list_downloaded(self) -> builtins.list[UpdateSourcePackage]: """List packages that have been downloaded. Returns: List of downloaded packages. """ return self.list(downloaded=True)
def _to_model(self, data: dict[str, Any]) -> UpdateSourcePackage: """Convert API response to UpdateSourcePackage object.""" return UpdateSourcePackage(data, self)
# ============================================================================= # Update Sources # =============================================================================
[docs] class UpdateSource(ResourceObject): """Update source resource object. Represents an update server (e.g., Verge.io Updates, Trial/NFR). Attributes: key: Source $key (row ID). name: Source name. description: Source description. url: Update server URL. user: Authentication username. last_updated: Timestamp of last successful update. last_refreshed: Timestamp of last refresh check. enabled: Whether the source is enabled. """ @property def is_enabled(self) -> bool: """Check if the source is enabled.""" return bool(self.get("enabled", True)) @property def status(self) -> UpdateSourceStatusManager: """Get a status manager scoped to this source. Returns: UpdateSourceStatusManager for this source. """ from typing import cast manager = cast("UpdateSourceManager", self._manager) return UpdateSourceStatusManager(manager._client, source_key=self.key) @property def packages(self) -> UpdateSourcePackageManager: """Get a package manager scoped to this source. Returns: UpdateSourcePackageManager for this source. """ from typing import cast manager = cast("UpdateSourceManager", self._manager) return UpdateSourcePackageManager(manager._client, source_key=self.key)
[docs] def get_status(self) -> UpdateSourceStatus: """Get the current status for this source. Returns: UpdateSourceStatus object. """ from typing import cast manager = cast("UpdateSourceManager", self._manager) return manager.get_status(self.key)
[docs] def refresh(self) -> dict[str, Any] | None: # type: ignore[override] """Trigger a refresh to check for updates. Returns: Task information dict or None. """ from typing import cast manager = cast("UpdateSourceManager", self._manager) return manager.action(self.key, "refresh")
[docs] class UpdateSourceManager(ResourceManager["UpdateSource"]): """Manager for update source operations. Update sources define where updates come from (Verge.io update servers). Example: >>> # List available sources >>> for source in client.update_sources.list(): ... print(f"{source.name}: {source.url}") >>> # Get the active source >>> settings = client.update_settings.get() >>> source = client.update_sources.get(settings.source) >>> # Check source status >>> status = source.get_status() >>> print(f"Status: {status.status}") """ _endpoint = "update_sources" _default_fields = [ "$key", "name", "description", "url", "user", "last_updated", "last_refreshed", "enabled", ]
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, enabled: bool | None = None, **filter_kwargs: Any, ) -> builtins.list[UpdateSource]: """List update sources with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. enabled: Filter by enabled state. **filter_kwargs: Shorthand filter arguments. Returns: List of UpdateSource objects. """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add enabled filter if enabled is not None: filters.append(f"enabled eq {1 if enabled else 0}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> UpdateSource: """Get a single source by key or name. Args: key: Source $key (row ID). name: Source name. fields: List of fields to return. Returns: UpdateSource object. Raises: NotFoundError: If source not found. ValueError: If no identifier provided. """ if key is not None: params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Update source with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Update source with key {key} returned invalid response") return self._to_model(response) if name is not None: escaped_name = name.replace("'", "''") results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1) if not results: raise NotFoundError(f"Update source with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def create( # type: ignore[override] self, name: str, *, url: str, description: str | None = None, user: str | None = None, password: str | None = None, enabled: bool = True, ) -> UpdateSource: """Create a new update source. Args: name: Source name. url: Update server URL. description: Source description. user: Authentication username. password: Authentication password. enabled: Whether the source is enabled. Returns: Created UpdateSource object. """ body: dict[str, Any] = { "name": name, "url": url, "enabled": enabled, } if description is not None: body["description"] = description if user is not None: body["user"] = user if password is not None: body["password"] = password response = self._client._request("POST", self._endpoint, json_data=body) # Get the created source if response and isinstance(response, dict): src_key = response.get("$key") if src_key: return self.get(key=int(src_key)) # Fallback: search by name return self.get(name=name)
[docs] def update( # type: ignore[override] self, key: int, *, name: str | None = None, description: str | None = None, url: str | None = None, user: str | None = None, password: str | None = None, enabled: bool | None = None, ) -> UpdateSource: """Update an update source. Args: key: Source $key (row ID). name: New name. description: New description. url: New URL. user: New username. password: New password. enabled: Enable or disable. Returns: Updated UpdateSource object. """ body: dict[str, Any] = {} if name is not None: body["name"] = name if description is not None: body["description"] = description if url is not None: body["url"] = url if user is not None: body["user"] = user if password is not None: body["password"] = password if enabled is not None: body["enabled"] = enabled if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: int) -> None: """Delete an update source. Args: key: Source $key (row ID). Raises: NotFoundError: If source not found. """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def action(self, key: int, action_name: str, **kwargs: Any) -> dict[str, Any] | None: """Perform an action on an update source. Args: key: Source $key (row ID). action_name: Action name (refresh, download, install, apply, all). **kwargs: Additional action parameters. Returns: Task information dict or None. """ body: dict[str, Any] = { "source": key, "action": action_name, } body.update(kwargs) result = self._client._request("POST", "update_actions", json_data=body) if isinstance(result, dict): return result return None
[docs] def get_status(self, key: int) -> UpdateSourceStatus: """Get the current status for an update source. Args: key: Source $key (row ID). Returns: UpdateSourceStatus object. """ mgr = UpdateSourceStatusManager(self._client) return mgr.get_for_source(key)
[docs] def packages(self, key: int) -> UpdateSourcePackageManager: """Get a package manager scoped to a specific source. Args: key: Source $key (row ID). Returns: UpdateSourcePackageManager for the source. """ return UpdateSourcePackageManager(self._client, source_key=key)
def _to_model(self, data: dict[str, Any]) -> UpdateSource: """Convert API response to UpdateSource object.""" return UpdateSource(data, self)
# ============================================================================= # Update Packages # =============================================================================
[docs] class UpdatePackage(ResourceObject): """Update package resource object. Represents an installed or available update package. Attributes: key: Package key (name string, not integer). name: Package name. description: Package description. version: Package version. branch: Branch key. type: Package type (ybpkg). optional: Whether the package is optional. created: Creation timestamp. modified: Last modified timestamp. """ @property def key(self) -> str: # type: ignore[override] """Resource primary key ($key) - package name string. Unlike most resources, update packages use the name as the key. Raises: ValueError: If resource has no $key. """ k = self.get("$key") if k is None: k = self.get("name") if k is None: raise ValueError("Resource has no $key - may not be persisted") return str(k) @property def branch_key(self) -> int | None: """Get the branch key.""" branch = self.get("branch") return int(branch) if branch is not None else None @property def is_optional(self) -> bool: """Check if the package is optional.""" return bool(self.get("optional", False))
[docs] class UpdatePackageManager(ResourceManager["UpdatePackage"]): """Manager for update package operations. Update packages represent the actual software packages that make up a VergeOS system. Example: >>> # List all packages >>> for pkg in client.update_packages.list(): ... print(f"{pkg.name}: {pkg.version}") >>> # Get a specific package >>> pkg = client.update_packages.get("yb-system") """ _endpoint = "update_packages" _default_fields = [ "$key", "name", "description", "version", "branch", "branch#name as branch_display", "type", "optional", "created", "modified", ]
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, branch: int | None = None, **filter_kwargs: Any, ) -> builtins.list[UpdatePackage]: """List update packages with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. branch: Filter by branch key. **filter_kwargs: Shorthand filter arguments. Returns: List of UpdatePackage objects. """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add branch filter if branch is not None: filters.append(f"branch eq {branch}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: str | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> UpdatePackage: """Get a single package by key (name). Args: key: Package key (name string). name: Package name (alias for key). fields: List of fields to return. Returns: UpdatePackage object. Raises: NotFoundError: If package not found. ValueError: If no identifier provided. """ # Key and name are the same for packages pkg_name = key or name if pkg_name is None: raise ValueError("Either key or name must be provided") params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{pkg_name}", params=params) if response is None: raise NotFoundError(f"Update package '{pkg_name}' not found") if not isinstance(response, dict): raise NotFoundError(f"Update package '{pkg_name}' returned invalid response") return self._to_model(response)
def _to_model(self, data: dict[str, Any]) -> UpdatePackage: """Convert API response to UpdatePackage object.""" return UpdatePackage(data, self)
# ============================================================================= # Update Settings # =============================================================================
[docs] class UpdateSettings(ResourceObject): """Update settings resource object. Singleton configuration for system update behavior. There is only one update settings record (key=1). Attributes: key: Settings $key (always 1). name: Settings name. source: Active update source key. branch: Selected update branch key. auto_refresh: Automatically check for updates. auto_update: Automatically install updates. auto_reboot: Automatically reboot after updates. update_time: Scheduled update time (HH:MM format). max_vsan_usage: Maximum vSAN usage percentage for automatic updates. warm_reboot: Use kexec for faster reboots. multi_cluster_update: Allow multiple clusters to update simultaneously. snapshot_cloud_on_update: Take system snapshot before updates. snapshot_cloud_expire_seconds: Snapshot expiration period. installed: Whether updates are installed and pending reboot. reboot_required: Whether a reboot is required. applying_updates: Whether updates are currently being applied. release_notes_url: URL to release notes. anonymize_statistics: Anonymize statistics sent to update server. """ @property def source_key(self) -> int | None: """Get the active source key.""" source = self.get("source") return int(source) if source is not None else None @property def source_name(self) -> str | None: """Get the active source name if available.""" source = self.get("source") if isinstance(source, dict): return source.get("name") return self.get("source_display") @property def branch_key(self) -> int | None: """Get the selected branch key.""" branch = self.get("branch") return int(branch) if branch is not None else None @property def branch_name(self) -> str | None: """Get the selected branch name if available.""" branch = self.get("branch") if isinstance(branch, dict): return branch.get("name") return self.get("branch_display") @property def is_auto_refresh(self) -> bool: """Check if auto-refresh is enabled.""" return bool(self.get("auto_refresh", True)) @property def is_auto_update(self) -> bool: """Check if auto-update is enabled.""" return bool(self.get("auto_update", False)) @property def is_installed(self) -> bool: """Check if updates are installed and pending reboot.""" return bool(self.get("installed", False)) @property def is_reboot_required(self) -> bool: """Check if a reboot is required.""" return bool(self.get("reboot_required", False)) @property def is_applying_updates(self) -> bool: """Check if updates are currently being applied.""" return bool(self.get("applying_updates", False))
[docs] def check(self) -> dict[str, Any] | None: """Check for available updates. Returns: Task information dict or None. """ from typing import cast manager = cast("UpdateSettingsManager", self._manager) return manager.check()
[docs] def download(self) -> dict[str, Any] | None: """Download available updates. Returns: Task information dict or None. """ from typing import cast manager = cast("UpdateSettingsManager", self._manager) return manager.download()
[docs] def install(self) -> dict[str, Any] | None: """Install downloaded updates. Returns: Task information dict or None. """ from typing import cast manager = cast("UpdateSettingsManager", self._manager) return manager.install()
[docs] def update_all(self, force: bool = False) -> dict[str, Any] | None: """Download, install, and reboot in one operation. Args: force: Allow unmigratable workloads to be temporarily rebooted. Returns: Task information dict or None. """ from typing import cast manager = cast("UpdateSettingsManager", self._manager) return manager.update_all(force=force)
[docs] class UpdateSettingsManager(ResourceManager["UpdateSettings"]): """Manager for update settings operations. Update settings is a singleton - there is only one record with key=1. Example: >>> # Get current settings >>> settings = client.update_settings.get() >>> print(f"Branch: {settings.branch_name}") >>> print(f"Auto-refresh: {settings.is_auto_refresh}") >>> # Check for updates >>> client.update_settings.check() >>> # Download available updates >>> client.update_settings.download() >>> # Install updates >>> client.update_settings.install() >>> # Or do everything at once >>> client.update_settings.update_all() >>> # Update settings >>> client.update_settings.update( ... auto_refresh=True, ... update_time="02:00", ... snapshot_cloud_on_update=True, ... ) """ _endpoint = "update_settings" _default_fields = [ "$key", "name", "source", "source#name as source_display", "branch", "branch#name as branch_display", "branch#description as branch_description", "auto_refresh", "auto_update", "auto_reboot", "update_time", "max_vsan_usage", "warm_reboot", "multi_cluster_update", "snapshot_cloud_on_update", "snapshot_cloud_expire_seconds", "installed", "reboot_required", "applying_updates", "applying_updates_force", "release_notes_url", "anonymize_statistics", ]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, fields: builtins.list[str] | None = None, ) -> UpdateSettings: """Get update settings. Update settings is a singleton (key=1), so the key parameter is optional and ignored. Args: key: Ignored (settings is always key=1). fields: List of fields to return. Returns: UpdateSettings object. """ params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/1", params=params) if response is None: raise NotFoundError("Update settings not found") if not isinstance(response, dict): raise NotFoundError("Update settings returned invalid response") return self._to_model(response)
[docs] def update( # type: ignore[override] self, key: int | None = None, *, source: int | None = None, branch: int | None = None, user: str | None = None, password: str | None = None, auto_refresh: bool | None = None, auto_update: bool | None = None, auto_reboot: bool | None = None, update_time: str | None = None, max_vsan_usage: int | None = None, warm_reboot: bool | None = None, multi_cluster_update: bool | None = None, snapshot_cloud_on_update: bool | None = None, snapshot_cloud_expire_seconds: int | None = None, anonymize_statistics: bool | None = None, ) -> UpdateSettings: """Update the update settings. Args: key: Ignored (settings is always key=1). source: Update source key. branch: Update branch key. user: Update server username. password: Update server password. auto_refresh: Automatically check for updates. auto_update: Automatically install updates. auto_reboot: Automatically reboot after updates. update_time: Scheduled update time (HH:MM format). max_vsan_usage: Maximum vSAN usage percentage (10-100). warm_reboot: Use kexec for faster reboots. multi_cluster_update: Allow multiple clusters to update simultaneously. snapshot_cloud_on_update: Take system snapshot before updates. snapshot_cloud_expire_seconds: Snapshot expiration period in seconds. anonymize_statistics: Anonymize statistics sent to update server. Returns: Updated UpdateSettings object. """ body: dict[str, Any] = {} if source is not None: body["source"] = source if branch is not None: body["branch"] = branch if user is not None: body["user"] = user if password is not None: body["password"] = password if auto_refresh is not None: body["auto_refresh"] = auto_refresh if auto_update is not None: body["auto_update"] = auto_update if auto_reboot is not None: body["auto_reboot"] = auto_reboot if update_time is not None: body["update_time"] = update_time if max_vsan_usage is not None: body["max_vsan_usage"] = max_vsan_usage if warm_reboot is not None: body["warm_reboot"] = warm_reboot if multi_cluster_update is not None: body["multi_cluster_update"] = multi_cluster_update if snapshot_cloud_on_update is not None: body["snapshot_cloud_on_update"] = snapshot_cloud_on_update if snapshot_cloud_expire_seconds is not None: body["snapshot_cloud_expire_seconds"] = snapshot_cloud_expire_seconds if anonymize_statistics is not None: body["anonymize_statistics"] = anonymize_statistics if not body: return self.get() self._client._request("PUT", f"{self._endpoint}/1", json_data=body) return self.get()
[docs] def check(self) -> dict[str, Any] | None: """Check for available updates. Triggers a refresh to check the update source for new packages. Returns: Task information dict or None. Example: >>> result = client.update_settings.check() >>> if result: ... task = client.tasks.wait(result.get("task")) """ return self._action("check")
[docs] def download(self) -> dict[str, Any] | None: """Download available updates. Downloads packages that are available from the update source. Returns: Task information dict or None. Example: >>> result = client.update_settings.download() >>> if result: ... task = client.tasks.wait(result.get("task")) """ return self._action("download")
[docs] def install(self) -> dict[str, Any] | None: """Install downloaded updates. Installs packages that have been downloaded. This prepares them for application during node reboots. Returns: Task information dict or None. Example: >>> result = client.update_settings.install() >>> if result: ... task = client.tasks.wait(result.get("task")) """ return self._action("install")
[docs] def update_all(self, force: bool = False) -> dict[str, Any] | None: """Download, install, and reboot in one operation. This is the recommended way to apply updates. It will: 1. Download available packages 2. Install packages 3. Reboot nodes one at a time with workload migration Args: force: Allow unmigratable workloads to be temporarily rebooted. Use this if there are VMs that cannot be migrated (e.g., VMs with GPU passthrough). Returns: Task information dict or None. Example: >>> # Normal update >>> result = client.update_settings.update_all() >>> # Force update (allows non-migratable workloads to reboot) >>> result = client.update_settings.update_all(force=True) """ return self._action("all", force=force)
def _action(self, action: str, **kwargs: Any) -> dict[str, Any] | None: """Perform an action on update settings. Delegates to the update_actions endpoint with the configured source. Args: action: Action name (check, download, install, all). **kwargs: Additional action parameters. Returns: Task/action response dict or None. """ # Map settings-level action names to API action names action_map: dict[str, str] = {"check": "refresh"} api_action = action_map.get(action, action) # Get the configured source key from settings settings = self.get() source_key = settings.source_key if source_key is None: raise ValueError("No update source configured in settings") body: dict[str, Any] = {"source": source_key, "action": api_action} body.update(kwargs) result = self._client._request("POST", "update_actions", json_data=body) if isinstance(result, dict): return result return None def _to_model(self, data: dict[str, Any]) -> UpdateSettings: """Convert API response to UpdateSettings object.""" return UpdateSettings(data, self)
# ============================================================================= # Update Dashboard # =============================================================================
[docs] class UpdateDashboard(ResourceObject): """Update dashboard resource object. Provides an aggregated view of update status including packages, branches, settings, and logs. Attributes: logs: Recent update logs. packages: Available packages with status. branches: Available branches. settings: Current update settings. node_count: Number of physical nodes. counts: Event and task counts. """ @property def node_count(self) -> int: """Get the number of physical nodes.""" nc = self.get("node_count") if isinstance(nc, dict): return int(nc.get("$count", 0)) return int(nc) if nc is not None else 0 @property def event_count(self) -> int: """Get the number of update-related events.""" counts = self.get("counts", {}) if isinstance(counts, dict): return int(counts.get("event_count", 0)) return 0 @property def task_count(self) -> int: """Get the number of active update tasks.""" counts = self.get("counts", {}) if isinstance(counts, dict): return int(counts.get("task_count", 0)) return 0
[docs] def get_settings(self) -> dict[str, Any]: """Get the settings portion of the dashboard. Returns: Settings dict with source, branch, and status info. """ settings = self.get("settings", {}) return settings if isinstance(settings, dict) else {}
[docs] def get_packages(self) -> builtins.list[dict[str, Any]]: """Get the packages list from the dashboard. Returns: List of package dicts with version and status info. """ packages = self.get("packages", []) return packages if isinstance(packages, list) else []
[docs] def get_branches(self) -> builtins.list[dict[str, Any]]: """Get the available branches from the dashboard. Returns: List of branch dicts. """ branches = self.get("branches", []) return branches if isinstance(branches, list) else []
[docs] def get_logs(self) -> builtins.list[dict[str, Any]]: """Get recent logs from the dashboard. Returns: List of log entry dicts. """ logs = self.get("logs", []) return logs if isinstance(logs, list) else []
[docs] class UpdateDashboardManager(ResourceManager["UpdateDashboard"]): """Manager for update dashboard operations. The update dashboard provides an aggregated view of the update system including packages, branches, settings, and logs. Example: >>> # Get the dashboard >>> dashboard = client.update_dashboard.get() >>> # Check node count >>> print(f"Nodes: {dashboard.node_count}") >>> # Get settings summary >>> settings = dashboard.get_settings() >>> print(f"Branch: {settings.get('branch#name')}") >>> # List packages >>> for pkg in dashboard.get_packages(): ... print(f"{pkg.get('name')}: {pkg.get('version')}") """ _endpoint = "update_dashboard"
[docs] def get( # type: ignore[override] self, *, fields: builtins.list[str] | None = None, ) -> UpdateDashboard: """Get the update dashboard. Args: fields: List of fields to return (optional). Returns: UpdateDashboard object with aggregated update information. """ params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) response = self._client._request("GET", self._endpoint, params=params) if response is None: raise NotFoundError("Update dashboard not found") if not isinstance(response, dict): raise NotFoundError("Update dashboard returned invalid response") return self._to_model(response)
def _to_model(self, data: dict[str, Any]) -> UpdateDashboard: """Convert API response to UpdateDashboard object.""" return UpdateDashboard(data, self)