Source code for pyvergeos.resources.clusters

"""Cluster resource manager."""

from __future__ import annotations

import builtins
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Literal

from pyvergeos.exceptions import ValidationError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.cluster_tiers import ClusterTierManager


# Status display mappings
STATUS_DISPLAY = {
    "online": "Online",
    "offline": "Offline",
    "maintenance": "Maintenance",
    "reduced": "Reduced Capacity",
    "noredundant": "No Redundancy",
    "error": "Error",
    "updating": "Updating",
    "shutdown": "Shutting Down",
    "insufficient": "Insufficient Nodes",
}

STATE_DISPLAY = {
    "online": "Online",
    "offline": "Offline",
    "warning": "Warning",
    "error": "Error",
}

HEALTH_STATUS = {
    "online": "Healthy",
    "warning": "Degraded",
    "error": "Critical",
    "offline": "Offline",
}

# Valid CPU types for clusters
CPU_TYPES = [
    "qemu64",
    "kvm64",
    "host",
    "Broadwell",
    "Cascadelake-Server",
    "Conroe",
    "Cooperlake",
    "core2duo",
    "coreduo",
    "Denverton",
    "EPYC",
    "EPYC-Genoa",
    "EPYC-Milan",
    "EPYC-Rome",
    "GraniteRapids",
    "Haswell",
    "Icelake-Server",
    "IvyBridge",
    "KnightsMill",
    "n270",
    "Nehalem",
    "Opteron_G1",
    "Opteron_G2",
    "Opteron_G3",
    "Opteron_G4",
    "Opteron_G5",
    "Penryn",
    "phenom",
    "SandyBridge",
    "SapphireRapids",
    "Skylake-Client",
    "Skylake-Server",
    "Snowridge",
    "Westmere",
]

# Energy performance policy mappings
ENERGY_PERF_POLICY_MAP = {
    "performance": "performance",
    "balance-performance": "balance-performance",
    "balance-power": "balance-power",
    "normal": "normal",
    "power": "power",
}

# Scaling governor mappings
SCALING_GOVERNOR_MAP = {
    "performance": "performance",
    "ondemand": "ondemand",
    "powersave": "powersave",
}


[docs] class Cluster(ResourceObject): """Cluster resource object. Represents a VergeOS cluster with compute and storage capabilities. Example: >>> cluster = client.clusters.get(name="Production") >>> print(f"{cluster.name}: {cluster.status}") >>> print(f" Nodes: {cluster.online_nodes}/{cluster.total_nodes}") >>> print(f" RAM: {cluster.used_ram_gb}/{cluster.online_ram_gb} GB") """ @property def name(self) -> str: """Cluster name.""" return str(self.get("name", "")) @property def description(self) -> str: """Cluster description.""" return str(self.get("description", "")) @property def is_enabled(self) -> bool: """Check if cluster is enabled.""" return bool(self.get("enabled", False)) @property def is_compute(self) -> bool: """Check if cluster is a compute cluster.""" return bool(self.get("compute", False)) @property def is_storage(self) -> bool: """Check if cluster is a storage cluster.""" return bool(self.get("storage", False)) @property def default_cpu(self) -> str: """Default CPU type for VMs in this cluster.""" return str(self.get("default_cpu", "")) @property def recommended_cpu_type(self) -> str: """Recommended CPU type based on cluster hardware.""" return str(self.get("recommended_cpu_type", "")) @property def nested_virtualization(self) -> bool: """Check if nested virtualization is enabled.""" return bool(self.get("kvm_nested", False)) @property def ram_per_unit(self) -> int: """RAM per billing unit in MB.""" return int(self.get("ram_per_unit") or 0) @property def cores_per_unit(self) -> int: """CPU cores per billing unit.""" return int(self.get("cores_per_unit") or 0) @property def max_ram_per_vm(self) -> int: """Maximum RAM allowed per VM in MB.""" return int(self.get("max_ram_per_vm") or 0) @property def max_cores_per_vm(self) -> int: """Maximum CPU cores allowed per VM.""" return int(self.get("max_cores_per_vm") or 0) @property def target_ram_percent(self) -> float: """Target maximum RAM utilization percentage.""" return float(self.get("target_ram_pct") or 0) @property def ram_overcommit_percent(self) -> float: """Percentage of reserve RAM to use for machines.""" return float(self.get("ram_overcommit_pct") or 0) @property def created_at(self) -> datetime | None: """Timestamp when cluster was created.""" ts = self.get("created") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None # Status properties (from status# fields) @property def status(self) -> str: """Cluster status (Online, Offline, etc.).""" raw = str(self.get("status_state", "")) return STATE_DISPLAY.get(raw, raw) @property def status_raw(self) -> str: """Raw status value.""" return str(self.get("status_state", "")) @property def total_nodes(self) -> int: """Total number of nodes in cluster.""" return int(self.get("total_nodes") or 0) @property def online_nodes(self) -> int: """Number of online nodes.""" return int(self.get("online_nodes") or 0) @property def total_ram_mb(self) -> int: """Total RAM in MB.""" return int(self.get("total_ram") or 0) @property def total_ram_gb(self) -> float: """Total RAM in GB.""" return round(self.total_ram_mb / 1024, 2) if self.total_ram_mb else 0.0 @property def online_ram_mb(self) -> int: """Online RAM in MB.""" return int(self.get("online_ram") or 0) @property def online_ram_gb(self) -> float: """Online RAM in GB.""" return round(self.online_ram_mb / 1024, 2) if self.online_ram_mb else 0.0 @property def used_ram_mb(self) -> int: """Used RAM in MB.""" return int(self.get("used_ram") or 0) @property def used_ram_gb(self) -> float: """Used RAM in GB.""" return round(self.used_ram_mb / 1024, 2) if self.used_ram_mb else 0.0 @property def ram_used_percent(self) -> float: """RAM usage percentage.""" if self.online_ram_mb > 0: return round((self.used_ram_mb / self.online_ram_mb) * 100, 1) return 0.0 @property def total_cores(self) -> int: """Total CPU cores.""" return int(self.get("total_cores") or 0) @property def online_cores(self) -> int: """Online CPU cores.""" return int(self.get("online_cores") or 0) @property def used_cores(self) -> int: """Used CPU cores.""" return int(self.get("used_cores") or 0) @property def cores_used_percent(self) -> float: """CPU core usage percentage.""" if self.online_cores > 0: return round((self.used_cores / self.online_cores) * 100, 1) return 0.0 @property def running_machines(self) -> int: """Number of running VMs.""" return int(self.get("running_machines") or 0) @property def tiers(self) -> ClusterTierManager: """Access tier management for this cluster. Returns: ClusterTierManager scoped to this cluster. Example: >>> cluster = client.clusters.get(name="Production") >>> for tier in cluster.tiers.list(): ... print(f"Tier {tier.tier}: {tier.used_percent}% used") """ from pyvergeos.resources.cluster_tiers import ClusterTierManager return ClusterTierManager(self._manager._client, self.key) def __repr__(self) -> str: return ( f"<Cluster key={self.get('$key', '?')} name={self.name!r} " f"status={self.status} nodes={self.online_nodes}/{self.total_nodes}>" )
[docs] class VSANStatus(ResourceObject): """Represents vSAN status for a cluster. Provides health status, capacity information, and resource utilization metrics for a VergeOS cluster's vSAN. """ @property def cluster_name(self) -> str: """Cluster name.""" return str(self.get("name", "")) @property def description(self) -> str: """Cluster description.""" return str(self.get("description", "")) @property def is_enabled(self) -> bool: """Whether cluster is enabled.""" return bool(self.get("enabled", False)) @property def is_storage(self) -> bool: """Whether cluster provides storage.""" return bool(self.get("storage", False)) @property def is_compute(self) -> bool: """Whether cluster provides compute.""" return bool(self.get("compute", False)) @property def status(self) -> str: """Cluster status (Online, Offline, etc.).""" raw = str(self.get("status", "")) return STATUS_DISPLAY.get(raw, raw) @property def status_raw(self) -> str: """Raw status value.""" return str(self.get("status", "")) @property def state(self) -> str: """Cluster state (Online, Warning, Error, Offline).""" raw = str(self.get("state", "")) return STATE_DISPLAY.get(raw, raw) @property def state_raw(self) -> str: """Raw state value.""" return str(self.get("state", "")) @property def health_status(self) -> str: """Health status (Healthy, Degraded, Critical, Offline).""" raw = self.get("state", "") return HEALTH_STATUS.get(raw, "Unknown") @property def status_info(self) -> str: """Status information message.""" return str(self.get("status_info", "")) @property def total_nodes(self) -> int: """Total number of nodes in cluster.""" return int(self.get("total_nodes") or 0) @property def online_nodes(self) -> int: """Number of online nodes.""" return int(self.get("online_nodes") or 0) @property def running_machines(self) -> int: """Number of running VMs.""" return int(self.get("running_machines") or 0) @property def total_ram_mb(self) -> int: """Total RAM in MB.""" return int(self.get("total_ram") or 0) @property def total_ram_gb(self) -> float: """Total RAM in GB.""" return round(self.total_ram_mb / 1024, 2) if self.total_ram_mb else 0.0 @property def online_ram_mb(self) -> int: """Online RAM in MB.""" return int(self.get("online_ram") or 0) @property def online_ram_gb(self) -> float: """Online RAM in GB.""" return round(self.online_ram_mb / 1024, 2) if self.online_ram_mb else 0.0 @property def used_ram_mb(self) -> int: """Used RAM in MB.""" return int(self.get("used_ram") or 0) @property def used_ram_gb(self) -> float: """Used RAM in GB.""" return round(self.used_ram_mb / 1024, 2) if self.used_ram_mb else 0.0 @property def ram_used_percent(self) -> float: """RAM usage percentage.""" if self.online_ram_mb > 0: return round((self.used_ram_mb / self.online_ram_mb) * 100, 1) return 0.0 @property def total_cores(self) -> int: """Total CPU cores.""" return int(self.get("total_cores") or 0) @property def online_cores(self) -> int: """Online CPU cores.""" return int(self.get("online_cores") or 0) @property def used_cores(self) -> int: """Used CPU cores.""" return int(self.get("used_cores") or 0) @property def core_used_percent(self) -> float: """CPU core usage percentage.""" if self.online_cores > 0: return round((self.used_cores / self.online_cores) * 100, 1) return 0.0 @property def last_update(self) -> datetime | None: """Last status update timestamp.""" ts = self.get("last_update") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None @property def tiers(self) -> builtins.list[dict[str, Any]]: """Tier status information (if include_tiers=True).""" raw_tiers = self.get("tiers") or [] result = [] for tier in raw_tiers: used = tier.get("used") or 0 capacity = tier.get("capacity") or 0 used_gb = round(used / 1073741824, 2) if used else 0 capacity_gb = round(capacity / 1073741824, 2) if capacity else 0 used_pct = round((used / capacity) * 100, 1) if capacity else 0 result.append( { "tier": tier.get("tier"), "status": tier.get("status"), "used_gb": used_gb, "capacity_gb": capacity_gb, "used_percent": used_pct, "read_ops": tier.get("read_ops") or 0, "write_ops": tier.get("write_ops") or 0, "read_bps": tier.get("read_bps") or 0, "write_bps": tier.get("write_bps") or 0, } ) return result def __repr__(self) -> str: return ( f"<VSANStatus cluster={self.cluster_name!r} " f"health={self.health_status} nodes={self.online_nodes}/{self.total_nodes}>" )
[docs] class ClusterManager(ResourceManager[Cluster]): """Manager for Cluster operations. Provides CRUD operations for VergeOS clusters including compute and storage configuration. Example: >>> # List all clusters >>> clusters = client.clusters.list() >>> for cluster in clusters: ... print(f"{cluster.name}: {cluster.status}") ... print(f" Nodes: {cluster.online_nodes}/{cluster.total_nodes}") >>> # Get a specific cluster >>> cluster = client.clusters.get(name="Production") >>> # Create a new cluster >>> new_cluster = client.clusters.create( ... name="Development", ... compute=True, ... max_ram_per_vm=65536, ... max_cores_per_vm=32, ... ) >>> # Update cluster settings >>> client.clusters.update(cluster.key, max_ram_per_vm=131072) >>> # Get vSAN status for all clusters >>> status_list = client.clusters.vsan_status() >>> for status in status_list: ... print(f"{status.cluster_name}: {status.health_status}") """ _endpoint = "clusters" # Default fields for list operations (includes status info) _default_fields = [ "$key", "name", "description", "enabled", "storage", "compute", "default_cpu", "recommended_cpu_type", "kvm_nested", "ram_per_unit", "cores_per_unit", "max_ram_per_vm", "max_cores_per_vm", "target_ram_pct", "ram_overcommit_pct", "created", "status#status as status_state", "status#total_nodes as total_nodes", "status#online_nodes as online_nodes", "status#total_ram as total_ram", "status#online_ram as online_ram", "status#used_ram as used_ram", "status#total_cores as total_cores", "status#online_cores as online_cores", "status#used_cores as used_cores", "status#running_machines as running_machines", ]
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
def _to_model(self, data: dict[str, Any]) -> Cluster: return Cluster(data, self)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, *, name: str | None = None, enabled: bool | None = None, compute: bool | None = None, storage: bool | None = None, **filter_kwargs: Any, ) -> builtins.list[Cluster]: """List clusters with optional filtering. Args: filter: OData filter string. fields: List of fields to return (defaults to comprehensive set). limit: Maximum number of results. offset: Skip this many results. name: Filter by cluster name (supports wildcards with 'ct'). enabled: Filter by enabled status. compute: Filter by compute capability. storage: Filter by storage capability. **filter_kwargs: Additional filter arguments. Returns: List of Cluster objects. Example: >>> # List all clusters >>> clusters = client.clusters.list() >>> # List only compute clusters >>> compute_clusters = client.clusters.list(compute=True) >>> # List enabled clusters >>> enabled = client.clusters.list(enabled=True) """ params: dict[str, Any] = {} # Build filter filters = [] if filter: filters.append(filter) if name is not None: escaped = name.replace("'", "''") filters.append(f"name eq '{escaped}'") if enabled is not None: filters.append(f"enabled eq {str(enabled).lower()}") if compute is not None: filters.append(f"compute eq {str(compute).lower()}") if storage is not None: filters.append(f"storage eq {str(storage).lower()}") if filter_kwargs: filters.append(build_filter(**filter_kwargs)) if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): return [self._to_model(response)] return [self._to_model(item) for item in response]
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> Cluster: """Get a single cluster by key or name. Args: key: Cluster $key (ID). name: Cluster name. fields: List of fields to return. Returns: Cluster object. Raises: NotFoundError: If cluster not found. ValueError: If neither key nor name provided. Example: >>> cluster = client.clusters.get(key=1) >>> cluster = client.clusters.get(name="Production") """ # Use default fields if not specified if fields is None: fields = self._default_fields return super().get(key, name=name, fields=fields)
[docs] def create( # type: ignore[override] self, name: str, *, description: str | None = None, enabled: bool = True, compute: bool = False, nested_virtualization: bool = False, allow_nested_virt_migration: bool = True, allow_vgpu_migration: bool = False, default_cpu: str | None = None, disable_cpu_security_mitigations: bool = False, disable_smt: bool = False, enable_split_lock_detection: bool = False, energy_perf_policy: Literal[ "performance", "balance-performance", "balance-power", "normal", "power" ] = "performance", scaling_governor: Literal["performance", "ondemand", "powersave"] = "performance", ram_per_unit: int = 4096, cores_per_unit: int = 1, cost_per_unit: float = 0, price_per_unit: float = 0, max_ram_per_vm: int = 65536, max_cores_per_vm: int = 16, target_ram_percent: float = 80, ram_overcommit_percent: float = 0, storage_cache_per_node: int | None = None, storage_buffer_per_node: int | None = None, storage_hugepages: bool = True, enable_nvme_power_management: bool = False, swap_tier: int = -1, swap_per_drive: int | None = None, max_core_temp: int | None = None, critical_core_temp: int | None = None, max_core_temp_warn_percent: int = 10, disable_sleep: bool = False, log_filter: str | None = None, ) -> Cluster: """Create a new cluster. Args: name: Cluster name (1-128 characters, must be unique). description: Optional description (max 2048 characters). enabled: Enable the cluster after creation. compute: Enable compute workloads on this cluster. nested_virtualization: Enable nested virtualization (VMs inside VMs). allow_nested_virt_migration: Allow live migration of VMs with nested virt. allow_vgpu_migration: Allow live migration of VMs with vGPU devices. default_cpu: Default CPU type for VMs (e.g., 'host', 'EPYC-Milan'). disable_cpu_security_mitigations: Disable CPU security mitigations. disable_smt: Disable Simultaneous Multi-Threading (hyper-threading). enable_split_lock_detection: Enable split lock detection. energy_perf_policy: CPU energy-performance policy. scaling_governor: CPU scaling governor. ram_per_unit: RAM per billing unit in MB. cores_per_unit: CPU cores per billing unit. cost_per_unit: Cost per billing unit. price_per_unit: Price per billing unit. max_ram_per_vm: Maximum RAM allowed per VM in MB. max_cores_per_vm: Maximum CPU cores allowed per VM. target_ram_percent: Target maximum RAM utilization percentage (0-100). ram_overcommit_percent: Percentage of reserve RAM for machines (0-100). storage_cache_per_node: Storage cache per node in MB. storage_buffer_per_node: Storage buffer per node in MB. storage_hugepages: Allocate hugepages for storage. enable_nvme_power_management: Enable NVMe power management. swap_tier: Storage tier for swap (-1 to disable, 0-5 for tier). swap_per_drive: Swap space per drive in MB. max_core_temp: Maximum core temperature in Celsius. critical_core_temp: Critical core temperature in Celsius. max_core_temp_warn_percent: Temperature warning threshold percentage. disable_sleep: Disable CPU sleep states. log_filter: System log filter expression. Returns: Created Cluster object. Raises: ValidationError: If parameters are invalid. ConflictError: If cluster name already exists. Example: >>> cluster = client.clusters.create( ... name="Development", ... description="Development workloads", ... compute=True, ... max_ram_per_vm=65536, ... max_cores_per_vm=32, ... ) """ # Validate name if not name or len(name) > 128: raise ValidationError("Cluster name must be 1-128 characters") if description and len(description) > 2048: raise ValidationError("Description must be max 2048 characters") if default_cpu and default_cpu not in CPU_TYPES: raise ValidationError(f"Invalid CPU type. Must be one of: {', '.join(CPU_TYPES)}") # Build request body body: dict[str, Any] = { "name": name, "enabled": enabled, "compute": compute, "kvm_nested": nested_virtualization, "allow_nested_virt_migration": allow_nested_virt_migration, "allow_vgpu_migration": allow_vgpu_migration, "disable_cpu_security_mitigations": disable_cpu_security_mitigations, "disable_smt": disable_smt, "enable_split_lock_detection": enable_split_lock_detection, "x86_energy_perf_policy": energy_perf_policy, "scaling_governor": scaling_governor, "ram_per_unit": ram_per_unit, "cores_per_unit": cores_per_unit, "cost_per_unit": cost_per_unit, "price_per_unit": price_per_unit, "max_ram_per_vm": max_ram_per_vm, "max_cores_per_vm": max_cores_per_vm, "target_ram_pct": target_ram_percent, "ram_overcommit_pct": ram_overcommit_percent, "storage_hugepages": storage_hugepages, "enable_nvme_power_management": enable_nvme_power_management, "swap_tier": swap_tier, "disable_sleep": disable_sleep, "max_core_temp_warn_perc": max_core_temp_warn_percent, } # Add optional parameters if description: body["description"] = description if default_cpu: body["default_cpu"] = default_cpu if storage_cache_per_node is not None: body["storage_cachesize"] = storage_cache_per_node if storage_buffer_per_node is not None: body["storage_buffersize"] = storage_buffer_per_node if swap_per_drive is not None: body["swap_per_drive"] = swap_per_drive if max_core_temp is not None: body["max_core_temp"] = max_core_temp if critical_core_temp is not None: body["critical_core_temp"] = critical_core_temp if log_filter: body["log_filter"] = log_filter response = self._client._request("POST", self._endpoint, json_data=body) if response is None: raise ValueError("No response from create operation") if not isinstance(response, dict): raise ValueError("Create operation returned invalid response") # Get the created cluster key and fetch full details cluster_key = response.get("$key") or response.get("key") if cluster_key: return self.get(int(cluster_key)) return self._to_model(response)
[docs] def update( # type: ignore[override] self, key: int, *, name: str | None = None, description: str | None = None, enabled: bool | None = None, compute: bool | None = None, nested_virtualization: bool | None = None, allow_nested_virt_migration: bool | None = None, allow_vgpu_migration: bool | None = None, default_cpu: str | None = None, disable_cpu_security_mitigations: bool | None = None, disable_smt: bool | None = None, enable_split_lock_detection: bool | None = None, energy_perf_policy: Literal[ "performance", "balance-performance", "balance-power", "normal", "power" ] | None = None, scaling_governor: Literal["performance", "ondemand", "powersave"] | None = None, ram_per_unit: int | None = None, cores_per_unit: int | None = None, cost_per_unit: float | None = None, price_per_unit: float | None = None, max_ram_per_vm: int | None = None, max_cores_per_vm: int | None = None, target_ram_percent: float | None = None, ram_overcommit_percent: float | None = None, storage_cache_per_node: int | None = None, storage_buffer_per_node: int | None = None, storage_hugepages: bool | None = None, enable_nvme_power_management: bool | None = None, swap_tier: int | None = None, swap_per_drive: int | None = None, max_core_temp: int | None = None, critical_core_temp: int | None = None, max_core_temp_warn_percent: int | None = None, disable_sleep: bool | None = None, log_filter: str | None = None, ) -> Cluster: """Update an existing cluster. Args: key: Cluster $key (ID). name: New cluster name. description: New description. enabled: Enable or disable the cluster. compute: Enable or disable compute workloads. nested_virtualization: Enable or disable nested virtualization. allow_nested_virt_migration: Allow live migration with nested virt. allow_vgpu_migration: Allow live migration with vGPU. default_cpu: Default CPU type for VMs. disable_cpu_security_mitigations: Disable CPU security mitigations. disable_smt: Disable Simultaneous Multi-Threading. enable_split_lock_detection: Enable split lock detection. energy_perf_policy: CPU energy-performance policy. scaling_governor: CPU scaling governor. ram_per_unit: RAM per billing unit in MB. cores_per_unit: CPU cores per billing unit. cost_per_unit: Cost per billing unit. price_per_unit: Price per billing unit. max_ram_per_vm: Maximum RAM allowed per VM in MB. max_cores_per_vm: Maximum CPU cores allowed per VM. target_ram_percent: Target maximum RAM utilization percentage. ram_overcommit_percent: Percentage of reserve RAM for machines. storage_cache_per_node: Storage cache per node in MB. storage_buffer_per_node: Storage buffer per node in MB. storage_hugepages: Allocate hugepages for storage. enable_nvme_power_management: Enable NVMe power management. swap_tier: Storage tier for swap. swap_per_drive: Swap space per drive in MB. max_core_temp: Maximum core temperature in Celsius. critical_core_temp: Critical core temperature in Celsius. max_core_temp_warn_percent: Temperature warning threshold percentage. disable_sleep: Disable CPU sleep states. log_filter: System log filter expression. Returns: Updated Cluster object. Example: >>> updated = client.clusters.update( ... cluster.key, ... max_ram_per_vm=131072, ... max_cores_per_vm=64, ... ) """ body: dict[str, Any] = {} if name is not None: if len(name) > 128: raise ValidationError("Cluster name must be max 128 characters") body["name"] = name if description is not None: if len(description) > 2048: raise ValidationError("Description must be max 2048 characters") body["description"] = description if enabled is not None: body["enabled"] = enabled if compute is not None: body["compute"] = compute if nested_virtualization is not None: body["kvm_nested"] = nested_virtualization if allow_nested_virt_migration is not None: body["allow_nested_virt_migration"] = allow_nested_virt_migration if allow_vgpu_migration is not None: body["allow_vgpu_migration"] = allow_vgpu_migration if default_cpu is not None: if default_cpu not in CPU_TYPES: raise ValidationError(f"Invalid CPU type. Must be one of: {', '.join(CPU_TYPES)}") body["default_cpu"] = default_cpu if disable_cpu_security_mitigations is not None: body["disable_cpu_security_mitigations"] = disable_cpu_security_mitigations if disable_smt is not None: body["disable_smt"] = disable_smt if enable_split_lock_detection is not None: body["enable_split_lock_detection"] = enable_split_lock_detection if energy_perf_policy is not None: body["x86_energy_perf_policy"] = energy_perf_policy if scaling_governor is not None: body["scaling_governor"] = scaling_governor if ram_per_unit is not None: body["ram_per_unit"] = ram_per_unit if cores_per_unit is not None: body["cores_per_unit"] = cores_per_unit if cost_per_unit is not None: body["cost_per_unit"] = cost_per_unit if price_per_unit is not None: body["price_per_unit"] = price_per_unit if max_ram_per_vm is not None: body["max_ram_per_vm"] = max_ram_per_vm if max_cores_per_vm is not None: body["max_cores_per_vm"] = max_cores_per_vm if target_ram_percent is not None: body["target_ram_pct"] = target_ram_percent if ram_overcommit_percent is not None: body["ram_overcommit_pct"] = ram_overcommit_percent if storage_cache_per_node is not None: body["storage_cachesize"] = storage_cache_per_node if storage_buffer_per_node is not None: body["storage_buffersize"] = storage_buffer_per_node if storage_hugepages is not None: body["storage_hugepages"] = storage_hugepages if enable_nvme_power_management is not None: body["enable_nvme_power_management"] = enable_nvme_power_management if swap_tier is not None: body["swap_tier"] = swap_tier if swap_per_drive is not None: body["swap_per_drive"] = swap_per_drive if max_core_temp is not None: body["max_core_temp"] = max_core_temp if critical_core_temp is not None: body["critical_core_temp"] = critical_core_temp if max_core_temp_warn_percent is not None: body["max_core_temp_warn_perc"] = max_core_temp_warn_percent if disable_sleep is not None: body["disable_sleep"] = disable_sleep if log_filter is not None: body["log_filter"] = log_filter if not body: # No changes specified, just return current state return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: int) -> None: """Delete a cluster. Clusters cannot be deleted if they have nodes or running machines assigned to them. Reassign resources before deletion. Args: key: Cluster $key (ID). Raises: ValidationError: If cluster has nodes or running machines. NotFoundError: If cluster not found. Example: >>> client.clusters.delete(cluster.key) """ # Get cluster to check for safety cluster = self.get(key) if cluster.total_nodes > 0: raise ValidationError( f"Cannot delete cluster '{cluster.name}': " f"Cluster has {cluster.total_nodes} node(s) assigned. " "Reassign nodes to another cluster first." ) if cluster.running_machines > 0: raise ValidationError( f"Cannot delete cluster '{cluster.name}': " f"Cluster has {cluster.running_machines} running machine(s). " "Stop and reassign VMs first." ) self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def vsan_status( self, cluster_name: str | None = None, include_tiers: bool = False, ) -> builtins.list[VSANStatus]: """Get vSAN health status for clusters. Args: cluster_name: Filter by specific cluster name. include_tiers: Include per-tier status information. Returns: List of VSANStatus objects with health and capacity info. Example: >>> status_list = client.clusters.vsan_status() >>> for status in status_list: ... print(f"{status.cluster_name}: {status.health_status}") ... print(f" RAM: {status.used_ram_gb}/{status.online_ram_gb} GB") ... print(f" Cores: {status.used_cores}/{status.online_cores}") >>> # With tier information >>> status_list = client.clusters.vsan_status(include_tiers=True) >>> for status in status_list: ... for tier in status.tiers: ... print(f" Tier {tier['tier']}: {tier['used_percent']}% used") """ # Build field list fields = [ "$key", "name", "description", "enabled", "storage", "compute", "status#status as status", "status#state as state", "status#status_info as status_info", "status#total_nodes as total_nodes", "status#online_nodes as online_nodes", "status#running_machines as running_machines", "status#total_ram as total_ram", "status#online_ram as online_ram", "status#used_ram as used_ram", "status#total_cores as total_cores", "status#online_cores as online_cores", "status#used_cores as used_cores", "status#last_update as last_update", ] if include_tiers: fields.append( "tiers[$key,tier,status#status as status,status#used as used," "status#capacity as capacity,stats#rops as read_ops," "stats#wops as write_ops,stats#rbps as read_bps,stats#wbps as write_bps]" ) params: dict[str, Any] = {"fields": ",".join(fields)} if cluster_name: escaped = cluster_name.replace("'", "''") params["filter"] = f"name eq '{escaped}'" response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [VSANStatus(item, self) for item in response if item]