Source code for pyvergeos.resources.volume_vm_exports

"""Volume VM export resources for exporting VMs to NAS volumes."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


[docs] class VolumeVmExport(ResourceObject): """Volume VM export resource object. Represents a VM export configuration for a NAS volume. VM exports allow exporting VMs to NAS volumes for backup/migration. Attributes: key: The export unique identifier ($key). volume: Parent NAS volume key. name: Volume name (derived from volume). quiesced: Whether exports are quiesced. status: Export status (idle, building, error, cleaning). status_info: Detailed status information. create_current: Whether to create 'current' folder with latest export. max_exports: Maximum exports to store (1-100). """ @property def is_idle(self) -> bool: """Check if export is idle.""" return self.get("status") == "idle" @property def is_building(self) -> bool: """Check if export is building.""" return self.get("status") == "building" @property def has_error(self) -> bool: """Check if export has an error.""" return self.get("status") == "error" @property def volume_key(self) -> int | None: """Get the parent volume key.""" vol = self.get("volume") return int(vol) if vol is not None else None @property def stats(self) -> VolumeVmExportStatManager: """Get a stats manager for this export's statistics. Returns: VolumeVmExportStatManager scoped to this export. Example: >>> # Browse export stats >>> for stat in export.stats.list(): ... print(f"{stat.file_name}: {stat.virtual_machines} VMs") """ from typing import cast manager = cast("VolumeVmExportManager", self._manager) return VolumeVmExportStatManager(manager._client, export_key=self.key)
[docs] def start( self, name: str | None = None, vms: builtins.list[int] | None = None, ) -> dict[str, Any] | None: """Start an export operation. Args: name: Optional export name/folder name. vms: Optional list of VM keys to export. If not provided, exports all. Returns: Action result dict or None. """ from typing import cast manager = cast("VolumeVmExportManager", self._manager) return manager.start_export(self.key, name=name, vms=vms)
[docs] def stop(self) -> dict[str, Any] | None: """Stop an in-progress export operation. Returns: Action result dict or None. """ from typing import cast manager = cast("VolumeVmExportManager", self._manager) return manager.stop_export(self.key)
[docs] def cleanup(self) -> dict[str, Any] | None: """Clean up old export folders. Returns: Action result dict or None. """ from typing import cast manager = cast("VolumeVmExportManager", self._manager) return manager.cleanup_exports(self.key)
[docs] class VolumeVmExportStat(ResourceObject): """Volume VM export statistics resource object. Represents statistics for a completed VM export. Attributes: key: The stat entry key ($key). volume_vm_exports: Parent export key. duration: Export duration in seconds. virtual_machines: Number of VMs exported. export_success: Number of successful exports. errors: Number of errors. quiesced: Whether the export was quiesced. size_bytes: Total size in bytes. file_name: Export folder name. timestamp: Export timestamp. """ @property def size_gb(self) -> float: """Get the export size in GB.""" size = self.get("size_bytes", 0) return round(size / 1073741824, 2) if size else 0 @property def has_errors(self) -> bool: """Check if the export had errors.""" errors = self.get("errors", 0) return int(errors) > 0 if errors else False
[docs] class VolumeVmExportManager(ResourceManager["VolumeVmExport"]): """Manager for volume VM export operations. Volume VM exports allow exporting VMs to NAS volumes for backup and migration purposes. Example: >>> # List all exports >>> for exp in client.volume_vm_exports.list(): ... print(f"{exp.name}: {exp.status}") >>> # Get export for a volume >>> exp = client.volume_vm_exports.get(volume=123) >>> # Start an export >>> exp.start(name="backup-2024") >>> # View export stats >>> for stat in exp.stats.list(): ... print(f"{stat.file_name}: {stat.size_gb}GB") """ _endpoint = "volume_vm_exports" # Default fields for list operations _default_fields = [ "$key", "volume", "volume#$display as volume_display", "volume#name as volume_name", "quiesced", "status", "status_info", "create_current", "max_exports", ]
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, status: str | None = None, volume: int | None = None, **filter_kwargs: Any, ) -> builtins.list[VolumeVmExport]: """List volume VM exports with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. status: Filter by status (idle, building, error, cleaning). volume: Filter by volume key. **filter_kwargs: Shorthand filter arguments. Returns: List of VolumeVmExport objects. Example: >>> # List all exports >>> exports = client.volume_vm_exports.list() >>> # List active exports >>> active = client.volume_vm_exports.list(status="building") """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add status filter if status: filters.append(f"status eq '{status}'") # Add volume filter if volume is not None: filters.append(f"volume eq {volume}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, volume: int | None = None, fields: builtins.list[str] | None = None, ) -> VolumeVmExport: """Get a single volume VM export by key or volume. Args: key: Export $key (row ID). volume: Parent volume key (since there's one export per volume). fields: List of fields to return. Returns: VolumeVmExport object. Raises: NotFoundError: If export not found. ValueError: If no identifier provided. Example: >>> # Get by key >>> exp = client.volume_vm_exports.get(key=1) >>> # Get by volume >>> exp = client.volume_vm_exports.get(volume=123) """ if key is not None: # Direct fetch by key params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Volume VM export with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Volume VM export with key {key} returned invalid response") return self._to_model(response) if volume is not None: # Search by volume results = self.list(filter=f"volume eq {volume}", fields=fields, limit=1) if not results: raise NotFoundError(f"Volume VM export for volume {volume} not found") return results[0] raise ValueError("Either key or volume must be provided")
[docs] def create( # type: ignore[override] self, volume: int, *, quiesced: bool = True, create_current: bool = True, max_exports: int = 3, ) -> VolumeVmExport: """Create a new volume VM export configuration. Args: volume: NAS volume key to export VMs to. quiesced: Whether to quiesce VMs during export (default True). create_current: Create 'current' folder with latest export (default True). max_exports: Maximum exports to store (1-100, default 3). Returns: Created VolumeVmExport object. Example: >>> exp = client.volume_vm_exports.create( ... volume=123, ... max_exports=5, ... quiesced=True ... ) """ body: dict[str, Any] = { "volume": volume, "quiesced": quiesced, "create_current": create_current, "max_exports": max_exports, } response = self._client._request("POST", self._endpoint, json_data=body) # Get the created export if response and isinstance(response, dict): exp_key = response.get("$key") if exp_key: return self.get(key=exp_key) # Fallback: search by volume return self.get(volume=volume)
[docs] def update( # type: ignore[override] self, key: int, *, quiesced: bool | None = None, create_current: bool | None = None, max_exports: int | None = None, ) -> VolumeVmExport: """Update a volume VM export configuration. Args: key: Export $key (row ID). quiesced: Whether to quiesce VMs during export. create_current: Create 'current' folder with latest export. max_exports: Maximum exports to store (1-100). Returns: Updated VolumeVmExport object. """ body: dict[str, Any] = {} if quiesced is not None: body["quiesced"] = quiesced if create_current is not None: body["create_current"] = create_current if max_exports is not None: body["max_exports"] = max_exports if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: int) -> None: """Delete a volume VM export configuration. Args: key: Export $key (row ID). Example: >>> client.volume_vm_exports.delete(1) """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def start_export( self, key: int, name: str | None = None, vms: builtins.list[int] | None = None, ) -> dict[str, Any] | None: """Start a VM export operation. Args: key: Export $key (row ID). name: Optional export name/folder name. vms: Optional list of VM keys to export. If not provided, exports all. Returns: Action result dict or None. Example: >>> result = client.volume_vm_exports.start_export(1, name="backup-2024") """ # Use the actions endpoint body: dict[str, Any] = { "volume_vm_export": key, "action": "start_export", } params: dict[str, Any] = {} if name is not None: params["name"] = name if vms is not None: params["vms"] = vms if params: body["params"] = params result = self._client._request("POST", "volume_vm_export_actions", json_data=body) if isinstance(result, dict): return result return None
[docs] def stop_export(self, key: int) -> dict[str, Any] | None: """Stop an in-progress VM export operation. Args: key: Export $key (row ID). Returns: Action result dict or None. Example: >>> client.volume_vm_exports.stop_export(1) """ body: dict[str, Any] = { "volume_vm_export": key, "action": "stop_export", } result = self._client._request("POST", "volume_vm_export_actions", json_data=body) if isinstance(result, dict): return result return None
[docs] def cleanup_exports(self, key: int) -> dict[str, Any] | None: """Clean up old export folders. Args: key: Export $key (row ID). Returns: Action result dict or None. Example: >>> client.volume_vm_exports.cleanup_exports(1) """ body: dict[str, Any] = { "volume_vm_export": key, "action": "cleanup", } result = self._client._request("POST", "volume_vm_export_actions", json_data=body) if isinstance(result, dict): return result return None
[docs] def stats(self, key: int) -> VolumeVmExportStatManager: """Get a stats manager for a specific export. Args: key: Export $key (row ID). Returns: VolumeVmExportStatManager for the export. Example: >>> for stat in client.volume_vm_exports.stats(1).list(): ... print(f"{stat.file_name}: {stat.size_gb}GB") """ return VolumeVmExportStatManager(self._client, export_key=key)
def _to_model(self, data: dict[str, Any]) -> VolumeVmExport: """Convert API response to VolumeVmExport object.""" return VolumeVmExport(data, self)
[docs] class VolumeVmExportStatManager(ResourceManager["VolumeVmExportStat"]): """Manager for volume VM export statistics. This manager provides access to export statistics. It can be used either standalone or scoped to a specific export. Example: >>> # List all export stats (standalone) >>> for stat in client.volume_vm_export_stats.list(): ... print(f"{stat.file_name}: {stat.size_gb}GB") >>> # List stats for a specific export >>> for stat in client.volume_vm_exports.stats(1).list(): ... print(f"{stat.file_name}: {stat.virtual_machines} VMs") """ _endpoint = "volume_vm_export_stats" # Default fields for list operations _default_fields = [ "$key", "volume_vm_exports", "duration", "virtual_machines", "export_success", "errors", "quiesced", "size_bytes", "file_name", "timestamp", ]
[docs] def __init__(self, client: VergeClient, *, export_key: int | None = None) -> None: super().__init__(client) self._export_key = export_key
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, volume_vm_exports: int | None = None, **filter_kwargs: Any, ) -> builtins.list[VolumeVmExportStat]: """List volume VM export statistics with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. volume_vm_exports: Filter by export key. Ignored if manager is scoped. **filter_kwargs: Shorthand filter arguments. Returns: List of VolumeVmExportStat objects. Example: >>> # List all stats >>> stats = client.volume_vm_export_stats.list() >>> # List stats for a specific export >>> stats = client.volume_vm_exports.stats(1).list() """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add export filter (from scope or parameter) export_key = self._export_key if export_key is None and volume_vm_exports is not None: export_key = volume_vm_exports if export_key is not None: filters.append(f"volume_vm_exports eq {export_key}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, fields: builtins.list[str] | None = None, ) -> VolumeVmExportStat: """Get a single VM export stat by key. Args: key: Stat $key (row ID). fields: List of fields to return. Returns: VolumeVmExportStat object. Raises: NotFoundError: If stat not found. ValueError: If key not provided. Example: >>> stat = client.volume_vm_export_stats.get(1) """ if key is not None: params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Volume VM export stat with key {key} not found") if not isinstance(response, dict): raise NotFoundError( f"Volume VM export stat with key {key} returned invalid response" ) return self._to_model(response) raise ValueError("Key must be provided")
[docs] def delete(self, key: int) -> None: """Delete a VM export stat entry. Args: key: Stat $key (row ID). Example: >>> client.volume_vm_export_stats.delete(1) """ self._client._request("DELETE", f"{self._endpoint}/{key}")
def _to_model(self, data: dict[str, Any]) -> VolumeVmExportStat: """Convert API response to VolumeVmExportStat object.""" return VolumeVmExportStat(data, self)