Source code for pyvergeos.resources.system

"""System management for VergeOS - settings, statistics, licenses, diagnostics, and inventory."""

from __future__ import annotations

import builtins
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pyvergeos.constants import POLL_INTERVAL, TASK_WAIT_TIMEOUT
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient

logger = logging.getLogger(__name__)


# Diagnostic status values
DIAG_STATUS_INITIALIZING = "initializing"
DIAG_STATUS_BUILDING = "building"
DIAG_STATUS_UPLOADING = "uploading"
DIAG_STATUS_COMPLETE = "complete"
DIAG_STATUS_ERROR = "error"

DIAG_STATUS_DISPLAY = {
    DIAG_STATUS_INITIALIZING: "Initializing",
    DIAG_STATUS_BUILDING: "Building",
    DIAG_STATUS_UPLOADING: "Sending to Support",
    DIAG_STATUS_COMPLETE: "Complete",
    DIAG_STATUS_ERROR: "Error",
}


# =============================================================================
# System Settings
# =============================================================================


[docs] class SystemSetting(ResourceObject): """Represents a system setting in VergeOS. System settings are key-value pairs that control various aspects of VergeOS behavior. """ @property def key(self) -> str: # type: ignore[override] """Setting key (unique identifier).""" return str(self.get("key", "")) @property def value(self) -> str | None: """Current setting value.""" return self.get("value") @property def default_value(self) -> str | None: """Default setting value.""" return self.get("default_value") @property def description(self) -> str: """Setting description.""" return str(self.get("description", "")) @property def is_modified(self) -> bool: """Whether the setting has been modified from default.""" return self.value != self.default_value def __repr__(self) -> str: modified = " (modified)" if self.is_modified else "" return f"<SystemSetting {self.key}={self.value!r}{modified}>"
[docs] class SettingsManager(ResourceManager[SystemSetting]): """Manages system settings in VergeOS. Settings control various aspects of VergeOS behavior including connection limits, API rate limits, UI settings, and more. Example: >>> # List all settings >>> for setting in client.settings.list(): ... print(f"{setting.key}: {setting.value}") >>> # Get a specific setting >>> setting = client.settings.get("max_connections") >>> print(f"Max connections: {setting.value}") >>> # Find modified settings >>> modified = [s for s in client.settings.list() if s.is_modified] """ _endpoint = "settings" def _to_model(self, data: dict[str, Any]) -> SystemSetting: return SystemSetting(data, self)
[docs] def list( # type: ignore[override] # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, key_contains: str | None = None, **filter_kwargs: Any, ) -> builtins.list[SystemSetting]: """List system settings. Args: filter: OData filter string. fields: List of fields to return. key_contains: Filter settings where key contains this string. **filter_kwargs: Additional filter arguments. Returns: List of SystemSetting objects. Example: >>> # List all settings >>> settings = client.settings.list() >>> # List UI-related settings >>> ui_settings = client.settings.list(key_contains="ui_") """ # Use "all" to get all available fields by default if fields is None: fields = ["all"] filters = [] if filter: filters.append(filter) if key_contains: filters.append(f"key ct '{key_contains}'") combined_filter = " and ".join(filters) if filters else None return super().list(filter=combined_filter, fields=fields, **filter_kwargs)
[docs] def get( # type: ignore[override] self, key: str | None = None, *, fields: builtins.list[str] | None = None, ) -> SystemSetting: """Get a system setting by key. Args: key: Setting key name (e.g., "max_connections"). fields: List of fields to return. Returns: SystemSetting object. Raises: NotFoundError: If setting not found. ValueError: If key not provided. Example: >>> setting = client.settings.get("max_connections") >>> print(f"Value: {setting.value}, Default: {setting.default_value}") """ if key is None: raise ValueError("Setting key must be provided") # Use "all" to get all available fields by default if fields is None: fields = ["all"] # Settings uses 'key' as the keyfield, not $key params: dict[str, Any] = { "filter": f"key eq '{key}'", "fields": ",".join(fields), } response = self._client._request("GET", self._endpoint, params=params) if not response: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"Setting '{key}' not found") results = response if isinstance(response, list) else [response] if not results: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"Setting '{key}' not found") return self._to_model(results[0])
[docs] def update( # type: ignore[override] self, key: str, value: str, ) -> SystemSetting: """Update a system setting value. Args: key: Setting key name (e.g., "max_connections"). value: New value for the setting. Returns: Updated SystemSetting object. Raises: NotFoundError: If setting not found. ValueError: If key not provided. Example: >>> setting = client.system.settings.update("max_connections", "1000") >>> print(f"New value: {setting.value}") """ if not key: raise ValueError("Setting key must be provided") # Settings API uses the key as the identifier in the URL body = {"value": value} # First, we need to get the setting to find its row key # Settings uses 'key' as the keyfield, so we filter by it params: dict[str, Any] = { "filter": f"key eq '{key}'", "fields": "all", } response = self._client._request("GET", self._endpoint, params=params) if not response: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"Setting '{key}' not found") results = response if isinstance(response, list) else [response] if not results: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"Setting '{key}' not found") # Use the row identifier for PUT row_key = results[0].get("$key") or results[0].get("key") # For settings, we PUT with the key value in the body body["key"] = key self._client._request("PUT", f"{self._endpoint}/{row_key}", json_data=body) # Fetch and return the updated setting return self.get(key)
[docs] def reset(self, key: str) -> SystemSetting: """Reset a system setting to its default value. Args: key: Setting key name. Returns: Updated SystemSetting object with default value. Raises: NotFoundError: If setting not found. Example: >>> setting = client.system.settings.reset("max_connections") >>> print(f"Reset to: {setting.value}") """ # Get the current setting to find the default value current = self.get(key) if current.default_value is not None: return self.update(key, current.default_value) return current
[docs] def list_modified(self) -> builtins.list[SystemSetting]: """List only settings that have been modified from defaults. Returns: List of modified SystemSetting objects. Example: >>> for setting in client.system.settings.list_modified(): ... print(f"{setting.key}: {setting.value} (default: {setting.default_value})") """ return [s for s in self.list() if s.is_modified]
# ============================================================================= # Licenses # =============================================================================
[docs] class License(ResourceObject): """Represents a license in VergeOS. Licenses control feature availability and system capabilities. """ @property def name(self) -> str: """License name.""" return str(self.get("name", "")) @property def description(self) -> str: """License description.""" return str(self.get("description", "")) @property def features(self) -> dict[str, Any] | None: """License features as a dictionary.""" features = self.get("features") if features is None: return None if isinstance(features, dict): return features # If it's a string (JSON), try to parse it if isinstance(features, str): import json try: return json.loads(features) # type: ignore[no-any-return] except json.JSONDecodeError: return None return None @property def is_valid(self) -> bool: """Whether the license is currently valid.""" now = datetime.now(timezone.utc).timestamp() valid_from = self.get("valid_from") if valid_from and now < int(valid_from): return False valid_until = self.get("valid_until") return not (valid_until and now > int(valid_until)) @property def valid_from(self) -> datetime | None: """License validity start date.""" ts = self.get("valid_from") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None @property def valid_until(self) -> datetime | None: """License validity end date.""" ts = self.get("valid_until") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None @property def issued(self) -> datetime | None: """When the license was issued.""" ts = self.get("issued") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None @property def added(self) -> datetime | None: """When the license was added to the system.""" ts = self.get("added") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None @property def added_by(self) -> str: """User who added the license.""" return str(self.get("added_by", "")) @property def allow_branding(self) -> bool: """Whether branding is allowed.""" return bool(self.get("allow_branding", False)) @property def auto_renewal(self) -> bool: """Whether auto-renewal is enabled.""" return bool(self.get("auto_renewal", False)) @property def note(self) -> str: """License note.""" return str(self.get("note", "")) def __repr__(self) -> str: status = "valid" if self.is_valid else "invalid" return f"<License {self.name!r} ({status})>"
[docs] class LicenseManager(ResourceManager[License]): """Manages licenses in VergeOS. Licenses control which features are available and the capabilities of the VergeOS system. Example: >>> # List all licenses >>> for lic in client.licenses.list(): ... print(f"{lic.name}: {'valid' if lic.is_valid else 'invalid'}") >>> # Get license details >>> lic = client.licenses.get(name="Production") >>> print(f"Valid until: {lic.valid_until}") """ _endpoint = "licenses" def _to_model(self, data: dict[str, Any]) -> License: return License(data, self)
[docs] def list( # type: ignore[override] # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, name: str | None = None, **filter_kwargs: Any, ) -> builtins.list[License]: """List licenses. Args: filter: OData filter string. fields: List of fields to return. name: Filter by license name (supports wildcards). **filter_kwargs: Additional filter arguments. Returns: List of License objects. Example: >>> # List all licenses >>> licenses = client.licenses.list() >>> # Filter by name >>> prod_licenses = client.licenses.list(name="Production") """ if fields is None: fields = [ "$key", "name", "description", "added", "added_by", "issued", "valid_from", "valid_until", "features", "allow_branding", "auto_renewal", "note", ] filters = [] if filter: filters.append(filter) if name: filters.append(f"name eq '{name}'") combined_filter = " and ".join(filters) if filters else None return super().list(filter=combined_filter, fields=fields, **filter_kwargs)
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> License: """Get a license by key or name. Args: key: License $key. name: License name - alternative to key. fields: List of fields to return. Returns: License object. Raises: NotFoundError: If license not found. ValueError: If neither key nor name provided. Example: >>> lic = client.licenses.get(name="Production") >>> print(f"Features: {lic.features}") """ # Use default fields if not specified if fields is None: fields = [ "$key", "name", "description", "added", "added_by", "issued", "valid_from", "valid_until", "features", "allow_branding", "auto_renewal", "note", ] if key is not None: return super().get(key, fields=fields) if name is not None: results = self.list(name=name, fields=fields) if not results: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"License '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def generate_payload(self) -> str: """Generate a license request payload for air-gapped systems. For systems without internet connectivity, this generates a payload that can be sent to Verge.io support to obtain a license file. Returns: License request payload as a string. Raises: APIError: If payload generation fails. Example: >>> payload = client.system.licenses.generate_payload() >>> # Save payload to a file and send to support >>> with open("license_request.txt", "w") as f: ... f.write(payload) """ response = self._client._request( "POST", "license_actions", json_data={"action": "generate"}, ) if response is None: from pyvergeos.exceptions import APIError raise APIError("License payload generation returned no response") # Response may contain the payload directly or wrapped if isinstance(response, dict): # Try common response fields payload = response.get("payload") or response.get("result") or response.get("data") if payload: return str(payload) # If no specific field, return the whole response as JSON import json return json.dumps(response) return str(response)
[docs] def add(self, license_text: str) -> License: """Add a new license to the system. Args: license_text: The license text/key provided by Verge.io. Returns: The newly added License object. Raises: ValidationError: If the license is invalid. ConflictError: If the license already exists. Example: >>> license_data = open("license.txt").read() >>> lic = client.system.licenses.add(license_data) >>> print(f"Added license: {lic.name}") """ response = self._client._request( "POST", self._endpoint, json_data={"license": license_text}, ) if response is None: from pyvergeos.exceptions import APIError raise APIError("License add returned no response") if isinstance(response, dict): key = response.get("$key") if key is not None: return self.get(int(key)) return self._to_model(response) from pyvergeos.exceptions import APIError raise APIError("Unexpected response format from license add")
# ============================================================================= # System Statistics (Dashboard) # =============================================================================
[docs] class SystemStatistics: """System dashboard statistics. Contains counts and status information for all major resource types. """
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
def _get_count(self, key: str) -> int: """Safely extract a count value.""" value = self._data.get(key) if value is None: return 0 if isinstance(value, int): return value if isinstance(value, dict): # Handle objects like {"$count": 0} or {"instances_total": 0} if "$count" in value: return int(value["$count"]) if "instances_total" in value: return int(value["instances_total"]) return 0 # VM Statistics @property def vms_total(self) -> int: """Total number of VMs.""" return self._get_count("machines_count") @property def vms_online(self) -> int: """Number of online VMs.""" return self._get_count("machines_online") @property def vms_warning(self) -> int: """Number of VMs with warnings.""" return self._get_count("machines_warn") @property def vms_error(self) -> int: """Number of VMs with errors.""" return self._get_count("machines_error") # Tenant Statistics @property def tenants_total(self) -> int: """Total number of tenants.""" return self._get_count("tenants_count") @property def tenants_online(self) -> int: """Number of online tenants.""" return self._get_count("tenants_online") @property def tenants_warning(self) -> int: """Number of tenants with warnings.""" return self._get_count("tenants_warn") @property def tenants_error(self) -> int: """Number of tenants with errors.""" return self._get_count("tenants_error") # Network Statistics @property def networks_total(self) -> int: """Total number of networks.""" return self._get_count("vnets_count") @property def networks_online(self) -> int: """Number of online networks.""" return self._get_count("vnets_online") @property def networks_warning(self) -> int: """Number of networks with warnings.""" return self._get_count("vnets_warn") @property def networks_error(self) -> int: """Number of networks with errors.""" return self._get_count("vnets_error") # Node Statistics @property def nodes_total(self) -> int: """Total number of nodes.""" return self._get_count("nodes_count") @property def nodes_online(self) -> int: """Number of online nodes.""" return self._get_count("nodes_online") @property def nodes_warning(self) -> int: """Number of nodes with warnings.""" return self._get_count("nodes_warn") @property def nodes_error(self) -> int: """Number of nodes with errors.""" return self._get_count("nodes_error") # Cluster Statistics @property def clusters_total(self) -> int: """Total number of clusters.""" return self._get_count("clusters_count") @property def clusters_online(self) -> int: """Number of online clusters.""" return self._get_count("clusters_online") @property def clusters_warning(self) -> int: """Number of clusters with warnings.""" return self._get_count("clusters_warn") @property def clusters_error(self) -> int: """Number of clusters with errors.""" return self._get_count("clusters_error") # Storage Statistics @property def storage_tiers_total(self) -> int: """Total number of storage tiers.""" return self._get_count("storage_tiers_count") @property def cluster_tiers_total(self) -> int: """Total number of cluster tiers.""" return self._get_count("cluster_tiers_count") @property def cluster_tiers_online(self) -> int: """Number of online cluster tiers.""" return self._get_count("cluster_tiers_online") @property def cluster_tiers_warning(self) -> int: """Number of cluster tiers with warnings.""" return self._get_count("cluster_tiers_warn") @property def cluster_tiers_error(self) -> int: """Number of cluster tiers with errors.""" return self._get_count("cluster_tiers_error") # User and Group Statistics @property def users_total(self) -> int: """Total number of users.""" return self._get_count("users_count") @property def users_enabled(self) -> int: """Number of enabled users.""" return self._get_count("users_online") @property def groups_total(self) -> int: """Total number of groups.""" return self._get_count("groups_count") @property def groups_enabled(self) -> int: """Number of enabled groups.""" return self._get_count("groups_online") # Site Statistics @property def sites_total(self) -> int: """Total number of sites.""" return self._get_count("sites_count") @property def sites_online(self) -> int: """Number of online sites.""" return self._get_count("sites_online") @property def sites_warning(self) -> int: """Number of sites with warnings.""" return self._get_count("sites_warn") @property def sites_error(self) -> int: """Number of sites with errors.""" return self._get_count("sites_error") # Repository Statistics @property def repositories_total(self) -> int: """Total number of repositories.""" return self._get_count("repos_count") @property def repositories_online(self) -> int: """Number of online repositories.""" return self._get_count("repos_online") @property def repositories_warning(self) -> int: """Number of repositories with warnings.""" return self._get_count("repos_warn") @property def repositories_error(self) -> int: """Number of repositories with errors.""" return self._get_count("repos_error") # Alarm Statistics @property def alarms_total(self) -> int: """Total number of active alarms.""" return self._get_count("alarms_count") @property def alarms_warning(self) -> int: """Number of warning alarms.""" return self._get_count("alarms_warning") @property def alarms_error(self) -> int: """Number of error/critical alarms.""" return self._get_count("alarms_error") # Resource Instance Statistics @property def resource_instance_count(self) -> int: """Current resource instance count.""" return self._get_count("resource_instance_count") @property def resource_instance_max(self) -> int: """Maximum resource instances.""" return self._get_count("resource_instance_max")
[docs] def to_dict(self) -> dict[str, Any]: """Return statistics as a dictionary.""" return { "vms": { "total": self.vms_total, "online": self.vms_online, "warning": self.vms_warning, "error": self.vms_error, }, "tenants": { "total": self.tenants_total, "online": self.tenants_online, "warning": self.tenants_warning, "error": self.tenants_error, }, "networks": { "total": self.networks_total, "online": self.networks_online, "warning": self.networks_warning, "error": self.networks_error, }, "nodes": { "total": self.nodes_total, "online": self.nodes_online, "warning": self.nodes_warning, "error": self.nodes_error, }, "clusters": { "total": self.clusters_total, "online": self.clusters_online, "warning": self.clusters_warning, "error": self.clusters_error, }, "storage_tiers": { "total": self.storage_tiers_total, }, "cluster_tiers": { "total": self.cluster_tiers_total, "online": self.cluster_tiers_online, "warning": self.cluster_tiers_warning, "error": self.cluster_tiers_error, }, "users": { "total": self.users_total, "enabled": self.users_enabled, }, "groups": { "total": self.groups_total, "enabled": self.groups_enabled, }, "sites": { "total": self.sites_total, "online": self.sites_online, "warning": self.sites_warning, "error": self.sites_error, }, "repositories": { "total": self.repositories_total, "online": self.repositories_online, "warning": self.repositories_warning, "error": self.repositories_error, }, "alarms": { "total": self.alarms_total, "warning": self.alarms_warning, "error": self.alarms_error, }, "resource_instances": { "count": self.resource_instance_count, "max": self.resource_instance_max, }, }
def __repr__(self) -> str: return ( f"<SystemStatistics " f"VMs={self.vms_online}/{self.vms_total}, " f"Nodes={self.nodes_online}/{self.nodes_total}, " f"Alarms={self.alarms_total}>" )
# ============================================================================= # System Inventory # =============================================================================
[docs] class InventoryVM: """VM inventory item."""
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
@property def key(self) -> int: """Unique identifier for the VM.""" return int(self._data.get("$key", 0)) @property def name(self) -> str: """VM name.""" return str(self._data.get("name", "")) @property def description(self) -> str: """VM description.""" return str(self._data.get("description", "")) @property def power_state(self) -> str: """Current power state (e.g., 'running', 'stopped').""" return str(self._data.get("power_state", "")) @property def cpu_cores(self) -> int: """Number of CPU cores allocated to the VM.""" return int(self._data.get("cpu_cores", 0)) @property def ram_mb(self) -> int: """RAM allocated to the VM in megabytes.""" return int(self._data.get("ram", 0)) @property def ram_gb(self) -> float: """RAM allocated to the VM in gigabytes.""" return round(self.ram_mb / 1024, 1) @property def os_family(self) -> str: """Operating system family (e.g., 'linux', 'windows').""" return str(self._data.get("os_family", "")) @property def cluster(self) -> str: """Name of the cluster hosting this VM.""" return str(self._data.get("cluster_name", "")) @property def node(self) -> str: """Name of the node hosting this VM.""" return str(self._data.get("node_name", ""))
[docs] class InventoryNetwork: """Network inventory item."""
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
@property def key(self) -> int: """Unique identifier for the network.""" return int(self._data.get("$key", 0)) @property def name(self) -> str: """Network name.""" return str(self._data.get("name", "")) @property def description(self) -> str: """Network description.""" return str(self._data.get("description", "")) @property def network_type(self) -> str: """Type of network (e.g., 'internal', 'external').""" return str(self._data.get("type", "")) @property def power_state(self) -> str: """Current power state (e.g., 'running', 'stopped').""" return str(self._data.get("power_state", "")) @property def network_address(self) -> str: """Network address in CIDR notation.""" return str(self._data.get("network", "")) @property def ip_address(self) -> str: """IP address assigned to the network interface.""" return str(self._data.get("ip", ""))
[docs] class InventoryStorageTier: """Storage tier inventory item."""
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
@property def tier(self) -> int: """Storage tier number.""" return int(self._data.get("tier", 0)) @property def description(self) -> str: """Storage tier description.""" return str(self._data.get("description", "")) @property def capacity_bytes(self) -> int: """Total capacity of the storage tier in bytes.""" return int(self._data.get("capacity", 0)) @property def capacity_gb(self) -> float: """Total capacity of the storage tier in gigabytes.""" return round(self.capacity_bytes / 1073741824, 2) @property def used_bytes(self) -> int: """Used space in the storage tier in bytes.""" return int(self._data.get("used", 0)) @property def used_gb(self) -> float: """Used space in the storage tier in gigabytes.""" return round(self.used_bytes / 1073741824, 2) @property def used_percent(self) -> float: """Percentage of storage tier capacity currently in use.""" if self.capacity_bytes > 0: return round((self.used_bytes / self.capacity_bytes) * 100, 1) return 0.0
[docs] class InventoryNode: """Node inventory item."""
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
@property def key(self) -> int: """Unique identifier for the node.""" return int(self._data.get("$key", 0)) @property def name(self) -> str: """Node name.""" return str(self._data.get("name", "")) @property def status(self) -> str: """Current node status.""" return str(self._data.get("status_display", "")) @property def cluster(self) -> str: """Name of the cluster this node belongs to.""" return str(self._data.get("cluster_name", "")) @property def cores(self) -> int: """Number of CPU cores available on the node.""" return int(self._data.get("cores", 0)) @property def ram_mb(self) -> int: """Total RAM on the node in megabytes.""" return int(self._data.get("ram", 0)) @property def ram_gb(self) -> float: """Total RAM on the node in gigabytes.""" return round(self.ram_mb / 1024, 1)
[docs] class InventoryCluster: """Cluster inventory item."""
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
@property def key(self) -> int: """Unique identifier for the cluster.""" return int(self._data.get("$key", 0)) @property def name(self) -> str: """Cluster name.""" return str(self._data.get("name", "")) @property def description(self) -> str: """Cluster description.""" return str(self._data.get("description", "")) @property def status(self) -> str: """Current cluster status.""" return str(self._data.get("status_display", "")) @property def total_nodes(self) -> int: """Total number of nodes in the cluster.""" return int(self._data.get("total_nodes", 0)) @property def online_nodes(self) -> int: """Number of nodes currently online in the cluster.""" return int(self._data.get("online_nodes", 0))
[docs] class InventoryTenant: """Tenant inventory item."""
[docs] def __init__(self, data: dict[str, Any]) -> None: self._data = data
@property def key(self) -> int: """Unique identifier for the tenant.""" return int(self._data.get("$key", 0)) @property def name(self) -> str: """Tenant name.""" return str(self._data.get("name", "")) @property def description(self) -> str: """Tenant description.""" return str(self._data.get("description", "")) @property def status(self) -> str: """Current tenant status.""" return str(self._data.get("status_display", "")) @property def is_running(self) -> bool: """Whether the tenant is currently running.""" return bool(self._data.get("is_running", False))
[docs] class SystemInventory: """System inventory containing all resource types. Similar to RVtools for VMware, this provides a comprehensive view of all VergeOS resources. """
[docs] def __init__( self, vms: builtins.list[InventoryVM], networks: builtins.list[InventoryNetwork], storage: builtins.list[InventoryStorageTier], nodes: builtins.list[InventoryNode], clusters: builtins.list[InventoryCluster], tenants: builtins.list[InventoryTenant], generated_at: datetime, ) -> None: self.vms = vms self.networks = networks self.storage = storage self.nodes = nodes self.clusters = clusters self.tenants = tenants self.generated_at = generated_at
@property def summary(self) -> dict[str, Any]: """Get a summary of the inventory.""" running_vms = [vm for vm in self.vms if vm.power_state == "running"] running_networks = [n for n in self.networks if n.power_state == "running"] running_tenants = [t for t in self.tenants if t.is_running] total_capacity_bytes = sum(s.capacity_bytes for s in self.storage) total_used_bytes = sum(s.used_bytes for s in self.storage) return { "generated_at": self.generated_at.isoformat(), "vms_total": len(self.vms), "vms_running": len(running_vms), "vms_stopped": len(self.vms) - len(running_vms), "total_cpu_cores": sum(vm.cpu_cores for vm in self.vms), "total_ram_gb": round(sum(vm.ram_gb for vm in self.vms), 1), "networks_total": len(self.networks), "networks_running": len(running_networks), "storage_tiers": len(self.storage), "storage_capacity_gb": round(total_capacity_bytes / 1073741824, 1), "storage_used_gb": round(total_used_bytes / 1073741824, 1), "nodes_total": len(self.nodes), "clusters_total": len(self.clusters), "tenants_total": len(self.tenants), "tenants_running": len(running_tenants), } def __repr__(self) -> str: return ( f"<SystemInventory " f"VMs={len(self.vms)}, " f"Networks={len(self.networks)}, " f"Nodes={len(self.nodes)}, " f"Clusters={len(self.clusters)}, " f"Tenants={len(self.tenants)}>" )
# ============================================================================= # System Diagnostics # =============================================================================
[docs] class SystemDiagnostic(ResourceObject): """Represents a system diagnostic report in VergeOS. System diagnostics capture comprehensive system information including logs, configuration, network state, and other metrics useful for troubleshooting. Properties: name: Diagnostic report name. description: Report description. status: Current status (initializing, building, uploading, complete, error). status_display: Human-readable status. status_info: Additional status information (e.g., current node being processed). file_key: Associated file $key for download. is_complete: Whether the diagnostic build is complete. is_building: Whether the diagnostic is currently building. has_error: Whether the diagnostic encountered an error. created_at: When the diagnostic was created. """ @property def name(self) -> str: """Diagnostic report name.""" return str(self.get("name", "")) @property def description(self) -> str: """Report description.""" return str(self.get("description", "")) @property def status(self) -> str: """Current status (API value).""" return str(self.get("status", DIAG_STATUS_INITIALIZING)) @property def status_display(self) -> str: """Human-readable status.""" return DIAG_STATUS_DISPLAY.get(self.status, self.status) @property def status_info(self) -> str: """Additional status information.""" return str(self.get("status_info", "")) @property def file_key(self) -> int | None: """Associated file $key for download.""" val = self.get("file") return int(val) if val else None @property def is_complete(self) -> bool: """Whether the diagnostic build is complete.""" return self.status == DIAG_STATUS_COMPLETE @property def is_building(self) -> bool: """Whether the diagnostic is currently building.""" return self.status in (DIAG_STATUS_INITIALIZING, DIAG_STATUS_BUILDING) @property def has_error(self) -> bool: """Whether the diagnostic encountered an error.""" return self.status == DIAG_STATUS_ERROR @property def created_at(self) -> datetime | None: """When the diagnostic was created.""" ts = self.get("timestamp") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None
[docs] def refresh(self) -> SystemDiagnostic: """Refresh diagnostic data from API. Returns: Updated SystemDiagnostic object. """ from typing import cast manager = cast("SystemDiagnosticManager", self._manager) return manager.get(self.key)
[docs] def send_to_support(self) -> None: """Send this diagnostic report to Verge.io support. Requires the diagnostic to be complete. Raises: ValueError: If diagnostic is not complete. """ from typing import cast if not self.is_complete: raise ValueError("Diagnostic must be complete before sending to support") manager = cast("SystemDiagnosticManager", self._manager) manager.send_to_support(self.key)
[docs] def wait_for_completion( self, timeout: float = TASK_WAIT_TIMEOUT, poll_interval: float = POLL_INTERVAL, ) -> SystemDiagnostic: """Wait for the diagnostic build to complete. Args: timeout: Maximum time to wait in seconds. poll_interval: Time between status checks in seconds. Returns: The completed SystemDiagnostic object. Raises: TaskTimeoutError: If timeout is reached before completion. """ import time from pyvergeos.exceptions import TaskTimeoutError start_time = time.time() while True: current = self.refresh() if current.is_complete or current.has_error: return current elapsed = time.time() - start_time if elapsed >= timeout: raise TaskTimeoutError( f"Diagnostic build did not complete within {timeout} seconds" ) time.sleep(poll_interval)
[docs] def delete(self) -> None: """Delete this diagnostic report.""" from typing import cast manager = cast("SystemDiagnosticManager", self._manager) manager.delete(self.key)
def __repr__(self) -> str: return ( f"<SystemDiagnostic key={self.key} name={self.name!r} status={self.status_display!r}>" )
[docs] class SystemDiagnosticManager(ResourceManager[SystemDiagnostic]): """Manages system diagnostic reports in VergeOS. System diagnostics capture comprehensive system information for troubleshooting and support purposes. Example: >>> # Build a new diagnostic report >>> diag = client.system.diagnostics.build( ... name="Support Case #12345", ... description="Network connectivity issue", ... send_to_support=True ... ) >>> >>> # Wait for completion >>> diag = diag.wait_for_completion() >>> print(f"Status: {diag.status_display}") >>> >>> # List all diagnostics >>> for d in client.system.diagnostics.list(): ... print(f"{d.name}: {d.status_display}") """ _endpoint = "system_diagnostics" def _to_model(self, data: dict[str, Any]) -> SystemDiagnostic: return SystemDiagnostic(data, self)
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, *, status: str | None = None, **filter_kwargs: Any, ) -> builtins.list[SystemDiagnostic]: """List system diagnostic reports. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. status: Filter by status (initializing, building, complete, error). **filter_kwargs: Additional filter arguments. Returns: List of SystemDiagnostic objects. Example: >>> # List all diagnostics >>> diagnostics = client.system.diagnostics.list() >>> >>> # List completed diagnostics only >>> completed = client.system.diagnostics.list(status="complete") """ if fields is None: fields = [ "$key", "name", "description", "status", "status_info", "file", "timestamp", ] filters = [] if filter: filters.append(filter) if status: filters.append(f"status eq '{status}'") combined_filter = " and ".join(filters) if filters else None return super().list( filter=combined_filter, fields=fields, limit=limit, offset=offset, **filter_kwargs, )
[docs] def get( # type: ignore[override] self, key: int, *, fields: builtins.list[str] | None = None ) -> SystemDiagnostic: """Get a diagnostic report by key. Args: key: Diagnostic $key. fields: List of fields to return. Returns: SystemDiagnostic object. Raises: NotFoundError: If diagnostic not found. """ if fields is None: fields = [ "$key", "name", "description", "status", "status_info", "file", "timestamp", ] return super().get(key, fields=fields)
[docs] def build( self, name: str | None = None, description: str | None = None, *, send_to_support: bool = False, ) -> SystemDiagnostic: """Build a new system diagnostic report. Triggers collection of comprehensive system information from all nodes. This operation may take several minutes depending on system size. Args: name: Report name. If not provided, auto-generated with timestamp. description: Report description. send_to_support: If True, automatically send to Verge.io support when complete (requires internet connectivity). Returns: The new SystemDiagnostic object (initially in 'initializing' status). Example: >>> # Build diagnostic for support case >>> diag = client.system.diagnostics.build( ... name="Support-12345", ... description="High CPU usage investigation", ... send_to_support=True ... ) >>> diag = diag.wait_for_completion() """ body: dict[str, Any] = {} if name: body["name"] = name if description: body["description"] = description if send_to_support: body["send2support"] = True response = self._client._request("POST", self._endpoint, json_data=body) if response is None: from pyvergeos.exceptions import APIError raise APIError("Diagnostic build returned no response") if isinstance(response, dict): key = response.get("$key") if key is not None: return self.get(int(key)) return self._to_model(response) from pyvergeos.exceptions import APIError raise APIError("Unexpected response format from diagnostic build")
[docs] def send_to_support(self, key: int) -> None: """Send a diagnostic report to Verge.io support. The diagnostic must be complete before sending. Args: key: Diagnostic $key. Raises: NotFoundError: If diagnostic not found. ValueError: If diagnostic is not complete. """ # First verify the diagnostic exists and is complete diag = self.get(key) if not diag.is_complete: raise ValueError("Diagnostic must be complete before sending to support") # Use the actions endpoint self._client._request( "POST", "system_diagnostic_actions", json_data={ "system_diagnostic": key, "action": "send2support", }, )
[docs] def delete(self, key: int) -> None: """Delete a diagnostic report. Args: key: Diagnostic $key. Raises: NotFoundError: If diagnostic not found. """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def update( # type: ignore[override] self, key: int, *, name: str | None = None, description: str | None = None, ) -> SystemDiagnostic: """Update a diagnostic report's name or description. Args: key: Diagnostic $key. name: New name. description: New description. Returns: Updated SystemDiagnostic object. Raises: NotFoundError: If diagnostic not found. """ body: dict[str, Any] = {} if name is not None: body["name"] = name if description is not None: body["description"] = description if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def get_download_url(self, key: int) -> str | None: """Get the download URL for a completed diagnostic file. Args: key: Diagnostic $key. Returns: Download URL string, or None if file not available. Raises: NotFoundError: If diagnostic not found. ValueError: If diagnostic is not complete. """ diag = self.get(key) if not diag.is_complete: raise ValueError("Diagnostic must be complete to download") if diag.file_key is None: return None # Get the file information file_response = self._client._request( "GET", f"files/{diag.file_key}", params={"fields": "$key,name,path"}, ) if file_response and isinstance(file_response, dict): path = file_response.get("path") if path: return f"https://{self._client.host}/files/{path}" return None
# ============================================================================= # Root Certificates (Trusted CAs) # =============================================================================
[docs] class RootCertificate(ResourceObject): """Represents a trusted root certificate authority in VergeOS. Root certificates are added to the system's trust store to enable secure connections to systems using private/internal CA certificates. Properties: subject: Certificate subject (common name, organization, etc.). issuer: Certificate issuer. fingerprint: Certificate fingerprint (SHA-256). start_date: Certificate validity start date. end_date: Certificate validity end date. cert_pem: Certificate in PEM format. modified_at: When the certificate was last modified. """ @property def subject(self) -> str: """Certificate subject.""" return str(self.get("subject", "")) @property def issuer(self) -> str: """Certificate issuer.""" return str(self.get("issuer", "")) @property def fingerprint(self) -> str: """Certificate fingerprint.""" return str(self.get("fingerprint", "")) @property def start_date(self) -> str: """Certificate validity start date.""" return str(self.get("startdate", "")) @property def end_date(self) -> str: """Certificate validity end date.""" return str(self.get("enddate", "")) @property def cert_pem(self) -> str: """Certificate in PEM format.""" return str(self.get("cert", "")) @property def modified_at(self) -> datetime | None: """When the certificate was last modified.""" ts = self.get("modified") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None
[docs] def delete(self) -> None: """Delete this root certificate.""" from typing import cast manager = cast("RootCertificateManager", self._manager) manager.delete(self.key)
def __repr__(self) -> str: return f"<RootCertificate key={self.key} subject={self.subject!r}>"
[docs] class RootCertificateManager(ResourceManager[RootCertificate]): """Manages trusted root certificates in VergeOS. Root certificates allow adding custom Certificate Authorities to the system's trust store, enabling secure connections to systems using private/enterprise CA certificates. Common use cases: - Enabling secure site syncs with internal CA certificates - Trusting enterprise PKI for LDAP/AD connections - Development environments with self-signed CAs Example: >>> # Add a root CA >>> ca_cert = open("enterprise-ca.pem").read() >>> root_ca = client.system.root_certificates.create(cert=ca_cert) >>> print(f"Added CA: {root_ca.subject}") >>> >>> # List trusted CAs >>> for ca in client.system.root_certificates.list(): ... print(f"{ca.subject} (expires: {ca.end_date})") """ _endpoint = "root_certificates" def _to_model(self, data: dict[str, Any]) -> RootCertificate: return RootCertificate(data, self)
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, **filter_kwargs: Any, ) -> builtins.list[RootCertificate]: """List trusted root certificates. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. **filter_kwargs: Additional filter arguments. Returns: List of RootCertificate objects. Example: >>> # List all trusted CAs >>> for ca in client.system.root_certificates.list(): ... print(f"{ca.subject}") """ if fields is None: fields = [ "$key", "subject", "issuer", "fingerprint", "startdate", "enddate", "modified", ] return super().list( filter=filter, fields=fields, limit=limit, offset=offset, **filter_kwargs, )
[docs] def get( # type: ignore[override] self, key: int, *, fields: builtins.list[str] | None = None ) -> RootCertificate: """Get a root certificate by key. Args: key: Certificate $key. fields: List of fields to return. Returns: RootCertificate object. Raises: NotFoundError: If certificate not found. """ if fields is None: fields = [ "$key", "cert", "subject", "issuer", "fingerprint", "startdate", "enddate", "modified", ] return super().get(key, fields=fields)
[docs] def get_by_subject(self, subject: str) -> RootCertificate: """Get a root certificate by subject. Args: subject: Certificate subject (partial match supported). Returns: RootCertificate object. Raises: NotFoundError: If certificate not found. """ results = self.list(filter=f"subject ct '{subject}'", limit=1) if not results: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"Root certificate with subject containing '{subject}' not found") return results[0]
[docs] def get_by_fingerprint(self, fingerprint: str) -> RootCertificate: """Get a root certificate by fingerprint. Args: fingerprint: Certificate fingerprint. Returns: RootCertificate object. Raises: NotFoundError: If certificate not found. """ results = self.list(filter=f"fingerprint eq '{fingerprint}'", limit=1) if not results: from pyvergeos.exceptions import NotFoundError raise NotFoundError(f"Root certificate with fingerprint '{fingerprint}' not found") return results[0]
[docs] def create(self, cert: str) -> RootCertificate: # type: ignore[override] """Add a new trusted root certificate. Args: cert: Certificate in PEM format. Returns: The new RootCertificate object. Raises: ValidationError: If the certificate is invalid. ConflictError: If the certificate already exists. Example: >>> ca_cert = '''-----BEGIN CERTIFICATE----- ... MIIDXTCCAkWgAwIBAgIJAJC1... ... -----END CERTIFICATE-----''' >>> root_ca = client.system.root_certificates.create(cert=ca_cert) """ response = self._client._request( "POST", self._endpoint, json_data={"cert": cert}, ) if response is None: from pyvergeos.exceptions import APIError raise APIError("Root certificate creation returned no response") if isinstance(response, dict): key = response.get("$key") if key is not None: return self.get(int(key)) return self._to_model(response) from pyvergeos.exceptions import APIError raise APIError("Unexpected response format from root certificate creation")
[docs] def delete(self, key: int) -> None: """Delete a root certificate. Args: key: Certificate $key. Raises: NotFoundError: If certificate not found. """ self._client._request("DELETE", f"{self._endpoint}/{key}")
# ============================================================================= # System Manager # =============================================================================
[docs] class SystemManager: """Provides system-level operations in VergeOS. Includes access to settings, statistics, licenses, diagnostics, root certificates, and inventory. Example: >>> # Get system statistics >>> stats = client.system.statistics() >>> print(f"VMs: {stats.vms_online}/{stats.vms_total}") >>> # Get licenses >>> for lic in client.system.licenses.list(): ... print(f"{lic.name}: {'valid' if lic.is_valid else 'invalid'}") >>> # Get system settings >>> setting = client.system.settings.get("max_connections") >>> print(f"Max connections: {setting.value}") >>> # Update a setting >>> setting = client.system.settings.update("max_connections", "1000") >>> # Build system diagnostics >>> diag = client.system.diagnostics.build( ... name="Support Case", ... send_to_support=True ... ) >>> # Add a trusted root CA >>> root_ca = client.system.root_certificates.create(cert=pem_text) >>> # Get full inventory >>> inventory = client.system.inventory() >>> print(inventory.summary) """
[docs] def __init__(self, client: VergeClient) -> None: self._client = client self._settings: SettingsManager | None = None self._licenses: LicenseManager | None = None self._diagnostics: SystemDiagnosticManager | None = None self._root_certificates: RootCertificateManager | None = None
@property def settings(self) -> SettingsManager: """Access system settings. System settings control various aspects of VergeOS behavior including connection limits, API rate limits, and more. Example: >>> # List all settings >>> for s in client.system.settings.list(): ... print(f"{s.key}: {s.value}") >>> # Update a setting >>> client.system.settings.update("max_connections", "1000") >>> # List modified settings >>> for s in client.system.settings.list_modified(): ... print(f"{s.key}: {s.value} (default: {s.default_value})") """ if self._settings is None: self._settings = SettingsManager(self._client) return self._settings @property def licenses(self) -> LicenseManager: """Access license management. Licenses control which features are available and the capabilities of the VergeOS system. Example: >>> # List licenses >>> for lic in client.system.licenses.list(): ... print(f"{lic.name}: {'valid' if lic.is_valid else 'invalid'}") >>> # Generate payload for air-gapped licensing >>> payload = client.system.licenses.generate_payload() """ if self._licenses is None: self._licenses = LicenseManager(self._client) return self._licenses @property def diagnostics(self) -> SystemDiagnosticManager: """Access system diagnostics. System diagnostics capture comprehensive system information for troubleshooting and support purposes. Example: >>> # Build a diagnostic report >>> diag = client.system.diagnostics.build( ... name="Support-12345", ... description="Network issue", ... send_to_support=True ... ) >>> diag = diag.wait_for_completion() >>> # List diagnostics >>> for d in client.system.diagnostics.list(): ... print(f"{d.name}: {d.status_display}") """ if self._diagnostics is None: self._diagnostics = SystemDiagnosticManager(self._client) return self._diagnostics @property def root_certificates(self) -> RootCertificateManager: """Access root certificate (trusted CA) management. Root certificates allow adding custom Certificate Authorities to the system's trust store for secure connections. Example: >>> # Add a trusted CA >>> ca_pem = open("enterprise-ca.pem").read() >>> ca = client.system.root_certificates.create(cert=ca_pem) >>> # List trusted CAs >>> for ca in client.system.root_certificates.list(): ... print(f"{ca.subject}") """ if self._root_certificates is None: self._root_certificates = RootCertificateManager(self._client) return self._root_certificates
[docs] def statistics(self) -> SystemStatistics: """Get system dashboard statistics. Returns: SystemStatistics object with counts for all resource types. Example: >>> stats = client.system.statistics() >>> print(f"Total VMs: {stats.vms_total}") >>> print(f"Online VMs: {stats.vms_online}") >>> print(f"Active Alarms: {stats.alarms_total}") """ response = self._client._request("GET", "dashboard") if not response: return SystemStatistics({}) # Response can be a dict or list with one item if isinstance(response, list) and len(response) > 0: response = response[0] return SystemStatistics(response if isinstance(response, dict) else {})
[docs] def inventory( self, include_vms: bool = True, include_networks: bool = True, include_storage: bool = True, include_nodes: bool = True, include_clusters: bool = True, include_tenants: bool = True, ) -> SystemInventory: """Generate a comprehensive system inventory. Similar to RVtools for VMware, this provides a complete view of all VergeOS resources. Args: include_vms: Include VM inventory. include_networks: Include network inventory. include_storage: Include storage tier inventory. include_nodes: Include node inventory. include_clusters: Include cluster inventory. include_tenants: Include tenant inventory. Returns: SystemInventory object containing all requested resources. Example: >>> inventory = client.system.inventory() >>> print(f"Total VMs: {len(inventory.vms)}") >>> print(f"Summary: {inventory.summary}") """ vms: builtins.list[InventoryVM] = [] networks: builtins.list[InventoryNetwork] = [] storage: builtins.list[InventoryStorageTier] = [] nodes: builtins.list[InventoryNode] = [] clusters: builtins.list[InventoryCluster] = [] tenants: builtins.list[InventoryTenant] = [] if include_vms: try: vm_response = self._client._request( "GET", "vms", params={ "filter": "is_snapshot ne true", "fields": ( "$key,name,description,cpu_cores,ram,os_family," "machine#status#status as power_state," "machine#cluster#name as cluster_name," "machine#status#node#name as node_name" ), }, ) if vm_response: vm_list = vm_response if isinstance(vm_response, list) else [vm_response] vms = [InventoryVM(v) for v in vm_list if v] except Exception as e: logger.warning("Failed to collect VM inventory: %s", e) if include_networks: try: net_response = self._client._request( "GET", "vnets", params={ "fields": ( "$key,name,description,type,network,ip," "machine#status#status as power_state" ), }, ) if net_response: net_list = net_response if isinstance(net_response, list) else [net_response] networks = [InventoryNetwork(n) for n in net_list if n] except Exception as e: logger.warning("Failed to collect network inventory: %s", e) if include_storage: try: tier_response = self._client._request( "GET", "storage_tiers", params={ "fields": "$key,tier,description,capacity,used", }, ) if tier_response: tier_list = ( tier_response if isinstance(tier_response, list) else [tier_response] ) storage = [InventoryStorageTier(t) for t in tier_list if t] except Exception as e: logger.warning("Failed to collect storage inventory: %s", e) if include_nodes: try: node_response = self._client._request( "GET", "nodes", params={ "fields": ( "$key,name,cores,ram," "machine#status#display(status) as status_display," "cluster#name as cluster_name" ), }, ) if node_response: node_list = ( node_response if isinstance(node_response, list) else [node_response] ) nodes = [InventoryNode(n) for n in node_list if n] except Exception as e: logger.warning("Failed to collect node inventory: %s", e) if include_clusters: try: cluster_response = self._client._request( "GET", "clusters", params={ "fields": ( "$key,name,description," "status#total_nodes as total_nodes," "status#online_nodes as online_nodes," "status#display(status) as status_display" ), }, ) if cluster_response: cluster_list = ( cluster_response if isinstance(cluster_response, list) else [cluster_response] ) clusters = [InventoryCluster(c) for c in cluster_list if c] except Exception as e: logger.warning("Failed to collect cluster inventory: %s", e) if include_tenants: try: tenant_response = self._client._request( "GET", "tenants", params={ "filter": "is_snapshot ne true", "fields": ( "$key,name,description," "status#display(status) as status_display," "status#status eq 'running' as is_running" ), }, ) if tenant_response: tenant_list = ( tenant_response if isinstance(tenant_response, list) else [tenant_response] ) tenants = [InventoryTenant(t) for t in tenant_list if t] except Exception as e: logger.warning("Failed to collect tenant inventory: %s", e) return SystemInventory( vms=vms, networks=networks, storage=storage, nodes=nodes, clusters=clusters, tenants=tenants, generated_at=datetime.now(timezone.utc), )