Source code for pyvergeos.resources.vm_imports

"""VM import resources for importing VMs from files."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


[docs] class VmImport(ResourceObject): """VM import resource object. Represents a VM import operation for importing VMs from files (VMDK, QCOW2, OVA, OVF) or other sources. Note: Import keys are 40-character hex strings, not integers like most other VergeOS resources. Attributes: key: The import unique identifier ($key) - 40-char hex string. id: The import ID (same as $key). name: VM name for the imported VM. vm: Created VM key (after import completes). uuid: UUID from the source VM. file: Source file key from media catalog. volume: Source NAS volume key. volume_path: Path within the source volume. status: Import status (initializing, importing, complete, aborted, error, warning). status_info: Detailed status information. importing: Whether import is in progress. aborted: Whether import was aborted. preserve_macs: Whether to preserve MAC addresses from source. preserve_drive_format: Whether to preserve original drive format. preferred_tier: Preferred storage tier (1-5). timestamp: Creation timestamp. modified: Last modified timestamp. """ @property def key(self) -> str: # type: ignore[override] """Resource primary key ($key) - 40-character hex string. Raises: ValueError: If resource has no $key (not yet persisted). """ k = self.get("$key") if k is None: raise ValueError("Resource has no $key - may not be persisted") return str(k)
[docs] def refresh(self) -> VmImport: """Refresh resource data from API. Returns: Updated VmImport object. """ from typing import cast manager = cast("VmImportManager", self._manager) return manager.get(self.key)
[docs] def save(self, **kwargs: Any) -> VmImport: """Save changes to resource. Args: **kwargs: Fields to update. Returns: Updated VmImport object. """ from typing import cast manager = cast("VmImportManager", self._manager) return manager.update(self.key, **kwargs)
[docs] def delete(self) -> None: """Delete this import.""" from typing import cast manager = cast("VmImportManager", self._manager) manager.delete(self.key)
@property def is_complete(self) -> bool: """Check if import completed successfully.""" return self.get("status") == "complete" @property def is_importing(self) -> bool: """Check if import is in progress.""" return self.get("importing", False) or self.get("status") == "importing" @property def has_error(self) -> bool: """Check if import has an error.""" return self.get("status") in ("error", "aborted") @property def vm_key(self) -> int | None: """Get the created VM key (after import completes).""" vm = self.get("vm") return int(vm) if vm is not None else None @property def logs(self) -> VmImportLogManager: """Get a log manager for this import's logs. Returns: VmImportLogManager scoped to this import. Example: >>> # Browse import logs >>> for log in vm_import.logs.list(): ... print(f"{log.level}: {log.text}") """ from typing import cast manager = cast("VmImportManager", self._manager) return VmImportLogManager(manager._client, import_key=self.key)
[docs] def start(self) -> dict[str, Any] | None: """Start the import operation. Returns: Task information dict or None. """ from typing import cast manager = cast("VmImportManager", self._manager) return manager.start_import(self.key)
[docs] def abort(self) -> dict[str, Any] | None: """Abort the import operation. Returns: Task information dict or None. """ from typing import cast manager = cast("VmImportManager", self._manager) return manager.abort_import(self.key)
[docs] class VmImportLog(ResourceObject): """VM import log entry resource object. Represents a log entry for a VM import operation. Attributes: key: The log entry key ($key). vm_import: Parent import key. level: Log level (message, warning, error, critical, debug, summary). text: Log message text. timestamp: Log entry timestamp. user: User who initiated the action. """ @property def is_error(self) -> bool: """Check if this is an error log entry.""" return self.get("level") in ("error", "critical") @property def is_warning(self) -> bool: """Check if this is a warning log entry.""" return self.get("level") == "warning"
[docs] class VmImportManager(ResourceManager["VmImport"]): """Manager for VM import operations. VM imports allow importing VMs from files (VMDK, QCOW2, OVA, OVF) or from NAS volumes. Example: >>> # List all imports >>> for imp in client.vm_imports.list(): ... print(f"{imp.name}: {imp.status}") >>> # Get a specific import >>> imp = client.vm_imports.get(key="abc123...") >>> # Create an import from a file >>> imp = client.vm_imports.create( ... name="imported-vm", ... file=123, # file key from media catalog ... preferred_tier="1" ... ) >>> # Start the import >>> imp.start() >>> # Monitor import logs >>> for log in imp.logs.list(): ... print(f"{log.level}: {log.text}") """ _endpoint = "vm_imports" # Default fields for list operations _default_fields = [ "$key", "id", "name", "vm", "vm#$display as vm_display", "uuid", "file", "file#$display as file_display", "volume", "volume#$display as volume_display", "volume_path", "status", "status_info", "importing", "aborted", "preserve_macs", "preserve_drive_format", "preferred_tier", "timestamp", "modified", ]
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, status: str | None = None, **filter_kwargs: Any, ) -> builtins.list[VmImport]: """List VM imports with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. status: Filter by status (initializing, importing, complete, aborted, error). **filter_kwargs: Shorthand filter arguments (name, etc.). Returns: List of VmImport objects. Example: >>> # List all imports >>> imports = client.vm_imports.list() >>> # List active imports >>> active = client.vm_imports.list(status="importing") >>> # List completed imports >>> done = client.vm_imports.list(status="complete") """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add status filter if status: filters.append(f"status eq '{status}'") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: str | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> VmImport: """Get a single VM import by key or name. Args: key: Import $key (40-character hex string). name: Import name (VM name). fields: List of fields to return. Returns: VmImport object. Raises: NotFoundError: If import not found. ValueError: If no identifier provided. Example: >>> # Get by key >>> imp = client.vm_imports.get("8f73f8bcc9c9f1aaba32f733bfc295acaf548554") >>> # Get by name >>> imp = client.vm_imports.get(name="imported-vm") """ if key is not None: # Fetch by key using id filter params: dict[str, Any] = { "filter": f"id eq '{key}'", } if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", self._endpoint, params=params) if response is None: raise NotFoundError(f"VM import with key {key} not found") if isinstance(response, list): if not response: raise NotFoundError(f"VM import with key {key} not found") response = response[0] if not isinstance(response, dict): raise NotFoundError(f"VM import with key {key} returned invalid response") return self._to_model(response) if name is not None: # Search by name escaped_name = name.replace("'", "''") results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1) if not results: raise NotFoundError(f"VM import with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def create( # type: ignore[override] self, name: str, *, file: int | None = None, volume: str | None = None, volume_path: str | None = None, shared_object: int | None = None, preserve_macs: bool = True, preserve_drive_format: bool = False, preferred_tier: str | None = None, no_optical_drives: bool = False, override_drive_interface: str | None = None, override_nic_interface: str | None = None, cleanup_on_delete: bool = False, importing: bool = False, ) -> VmImport: """Create a new VM import. Args: name: Name for the imported VM. file: Source file key from media catalog (for file imports). volume: Source NAS volume key (for volume imports). volume_path: Path within the source volume. shared_object: Shared object key (for tenant imports). preserve_macs: Preserve MAC addresses from source (default True). preserve_drive_format: Preserve original drive format instead of converting to optimized .raw format (default False). preferred_tier: Preferred storage tier (1-5). no_optical_drives: Do not create optical drives (default False). override_drive_interface: Override drive interface type. override_nic_interface: Override NIC interface type. cleanup_on_delete: Clean up import file path on delete. importing: Auto-start the import immediately after creation (default False). Returns: Created VmImport object. Raises: ValueError: If no source (file, volume, or shared_object) provided. Example: >>> # Import from media catalog file (auto-start) >>> imp = client.vm_imports.create( ... name="imported-vm", ... file=123, ... preferred_tier="1", ... importing=True ... ) >>> # Import from NAS volume (two-step: create then start) >>> imp = client.vm_imports.create( ... name="imported-vm", ... volume="abc123...", ... volume_path="/exports/vm.vmdk" ... ) >>> imp.start() """ if file is None and volume is None and shared_object is None: raise ValueError("One of file, volume, or shared_object must be provided") body: dict[str, Any] = { "name": name, "preserve_macs": preserve_macs, "preserve_drive_format": preserve_drive_format, } if file is not None: body["file"] = file if volume is not None: body["volume"] = volume if volume_path is not None: body["volume_path"] = volume_path if shared_object is not None: body["shared_object"] = shared_object if preferred_tier is not None: body["preferred_tier"] = preferred_tier if no_optical_drives: body["no_optical_drives"] = True if override_drive_interface is not None: body["override_drive_interface"] = override_drive_interface if override_nic_interface is not None: body["override_nic_interface"] = override_nic_interface if cleanup_on_delete: body["cleanup_on_delete"] = True if importing: body["importing"] = True response = self._client._request("POST", self._endpoint, json_data=body) # Get the created import if response and isinstance(response, dict): imp_key = response.get("$key") or response.get("id") if imp_key: return self.get(key=imp_key) # Fallback: search by name return self.get(name=name)
[docs] def update( # type: ignore[override] self, key: str, *, name: str | None = None, vm: int | None = None, preserve_macs: bool | None = None, preserve_drive_format: bool | None = None, preferred_tier: str | None = None, no_optical_drives: bool | None = None, override_drive_interface: str | None = None, override_nic_interface: str | None = None, ) -> VmImport: """Update a VM import. Args: key: Import $key (40-character hex string). name: New VM name. vm: Associated VM key. preserve_macs: Preserve MAC addresses. preserve_drive_format: Preserve original drive format. preferred_tier: Preferred storage tier (1-5). no_optical_drives: Do not create optical drives. override_drive_interface: Override drive interface type. override_nic_interface: Override NIC interface type. Returns: Updated VmImport object. """ body: dict[str, Any] = {} if name is not None: body["name"] = name if vm is not None: body["vm"] = vm if preserve_macs is not None: body["preserve_macs"] = preserve_macs if preserve_drive_format is not None: body["preserve_drive_format"] = preserve_drive_format if preferred_tier is not None: body["preferred_tier"] = preferred_tier if no_optical_drives is not None: body["no_optical_drives"] = no_optical_drives if override_drive_interface is not None: body["override_drive_interface"] = override_drive_interface if override_nic_interface is not None: body["override_nic_interface"] = override_nic_interface if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: str) -> None: # type: ignore[override] """Delete a VM import. Args: key: Import $key (40-character hex string). Example: >>> client.vm_imports.delete(imp.key) """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def start_import(self, key: str) -> dict[str, Any] | None: """Start a VM import operation. This initiates the actual import process for an import that has been created but not yet started. Args: key: Import $key (40-character hex string). Returns: Task information dict or None. Example: >>> result = client.vm_imports.start_import(imp.key) >>> if result and "task" in result: ... client.tasks.wait(result["task"]) """ result = self._client._request("PUT", f"{self._endpoint}/{key}?action=import", json_data={}) if isinstance(result, dict): return result return None
[docs] def abort_import(self, key: str) -> dict[str, Any] | None: """Abort a VM import operation. This stops an in-progress import. Args: key: Import $key (40-character hex string). Returns: Task information dict or None. Example: >>> client.vm_imports.abort_import(imp.key) """ result = self._client._request("PUT", f"{self._endpoint}/{key}?action=abort", json_data={}) if isinstance(result, dict): return result return None
[docs] def logs(self, key: str) -> VmImportLogManager: """Get a log manager for a specific import. Args: key: Import $key (40-character hex string). Returns: VmImportLogManager for the import. Example: >>> # List logs for an import >>> for log in client.vm_imports.logs(imp.key).list(): ... print(f"{log.level}: {log.text}") """ return VmImportLogManager(self._client, import_key=key)
def _to_model(self, data: dict[str, Any]) -> VmImport: """Convert API response to VmImport object.""" return VmImport(data, self)
[docs] class VmImportLogManager(ResourceManager["VmImportLog"]): """Manager for VM import log operations. This manager provides access to log entries for VM imports. It can be used either standalone or scoped to a specific import. Example: >>> # List all import logs (standalone) >>> for log in client.vm_import_logs.list(): ... print(f"{log.level}: {log.text}") >>> # List logs for a specific import >>> for log in client.vm_imports.logs(imp.key).list(): ... print(f"{log.level}: {log.text}") >>> # Get only errors >>> errors = client.vm_imports.logs(imp.key).list(level="error") """ _endpoint = "vm_import_logs" # Default fields for list operations _default_fields = [ "$key", "vm_import", "level", "text", "timestamp", "user", ]
[docs] def __init__(self, client: VergeClient, *, import_key: str | None = None) -> None: super().__init__(client) self._import_key = import_key
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, level: str | None = None, vm_import: str | None = None, **filter_kwargs: Any, ) -> builtins.list[VmImportLog]: """List VM import logs with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. level: Filter by log level (message, warning, error, critical, debug). vm_import: Filter by import key. Ignored if manager is scoped. **filter_kwargs: Shorthand filter arguments. Returns: List of VmImportLog objects. Example: >>> # List all logs >>> logs = client.vm_import_logs.list() >>> # List errors only >>> errors = client.vm_import_logs.list(level="error") >>> # List logs for a specific import >>> logs = client.vm_imports.logs(imp.key).list() """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add import filter (from scope or parameter) import_key = self._import_key if import_key is None and vm_import is not None: import_key = vm_import if import_key is not None: filters.append(f"vm_import eq '{import_key}'") # Add level filter if level: filters.append(f"level eq '{level}'") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, fields: builtins.list[str] | None = None, ) -> VmImportLog: """Get a single VM import log entry by key. Args: key: Log entry $key (row ID). fields: List of fields to return. Returns: VmImportLog object. Raises: NotFoundError: If log entry not found. ValueError: If key not provided. Example: >>> log = client.vm_import_logs.get(1) """ if key is not None: params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"VM import log with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"VM import log with key {key} returned invalid response") return self._to_model(response) raise ValueError("Key must be provided")
[docs] def list_errors(self) -> builtins.list[VmImportLog]: """List only error and critical log entries. Returns: List of error VmImportLog objects. Example: >>> errors = client.vm_imports.logs(imp.key).list_errors() """ return self.list(filter="(level eq 'error') or (level eq 'critical')")
[docs] def list_warnings(self) -> builtins.list[VmImportLog]: """List only warning log entries. Returns: List of warning VmImportLog objects. Example: >>> warnings = client.vm_imports.logs(imp.key).list_warnings() """ return self.list(level="warning")
def _to_model(self, data: dict[str, Any]) -> VmImportLog: """Convert API response to VmImportLog object.""" return VmImportLog(data, self)