Source code for pyvergeos.resources.files

"""File management for VergeOS media catalog."""

from __future__ import annotations

import builtins
import contextlib
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable

from pyvergeos.constants import (
    CONTENT_TYPE_OCTET_STREAM,
    DEFAULT_TIMEOUT,
    GB,
    HEADER_CONTENT_TYPE,
    HTTP_NO_CONTENT,
    HTTP_SUCCESS_CODES,
    UPLOAD_CHUNK_SIZE,
    UPLOAD_CHUNK_TIMEOUT,
)
from pyvergeos.exceptions import NotFoundError, ValidationError
from pyvergeos.resources.base import ResourceManager, ResourceObject

logger = logging.getLogger(__name__)

# File type constants
FILE_TYPES = {
    "iso": "ISO",
    "img": "IMG (Raw Disk Image)",
    "qcow": "QCOW (Legacy QEMU)",
    "qcow2": "QCOW2 (QEMU, Xen)",
    "qed": "QED (KVM)",
    "raw": "Raw (Binary Disc Image)",
    "vdi": "VDI (VirtualBox)",
    "vhd": "VHD/VPC (Legacy Hyper-V)",
    "vhdx": "VHDX (Hyper-V)",
    "vmdk": "VMDK (VMware)",
    "ova": "OVA (VMware, VirtualBox)",
    "ovf": "OVF (VMware, VirtualBox)",
    "vmx": "VMX (VMware)",
    "ybvm": "Verge.io Virtual Machine",
    "nvram": "NVRAM",
    "zip": "ZIP",
}


[docs] class File(ResourceObject): """Represents a file in the VergeOS media catalog. Files can be ISO images for mounting to VM drives, disk images for import, OVA/OVF packages, or other media types. """ @property def name(self) -> str: """File name.""" return str(self.get("name", "")) @property def file_type(self) -> str: """File type (iso, qcow2, vmdk, etc.).""" return str(self.get("type", "unknown")) @property def type_display(self) -> str: """Human-readable file type.""" file_type = self.file_type return FILE_TYPES.get(file_type, file_type) @property def description(self) -> str: """File description.""" return str(self.get("description", "")) @property def size_bytes(self) -> int: """File size in bytes.""" return int(self.get("filesize") or 0) @property def size_gb(self) -> float: """File size in GB.""" return round(self.size_bytes / GB, 3) if self.size_bytes else 0.0 @property def allocated_bytes(self) -> int: """Allocated storage in bytes.""" return int(self.get("allocated_bytes") or 0) @property def allocated_gb(self) -> float: """Allocated storage in GB.""" return round(self.allocated_bytes / GB, 3) if self.allocated_bytes else 0.0 @property def used_bytes(self) -> int: """Used storage in bytes (actual on-disk size).""" return int(self.get("used_bytes") or 0) @property def used_gb(self) -> float: """Used storage in GB.""" return round(self.used_bytes / GB, 3) if self.used_bytes else 0.0 @property def preferred_tier(self) -> int | None: """Preferred storage tier (1-5).""" tier = self.get("preferred_tier") if tier is not None: return int(tier) return None @property def modified(self) -> datetime | None: """Last modification timestamp.""" ts = self.get("modified") if ts: return datetime.fromtimestamp(int(ts), tz=timezone.utc) return None @property def creator(self) -> str: """Username who created the file.""" return str(self.get("creator", "")) def __repr__(self) -> str: return f"<File key={self.get('$key', '?')} name={self.name!r} type={self.file_type}>"
[docs] class FileManager(ResourceManager[File]): """Manages files in the VergeOS media catalog. Files in the media catalog can be used as: - ISO images for VM CD-ROM drives - Disk images for VM drive import - OVA/OVF packages for VM import Example: >>> files = client.files.list(file_type="iso") >>> for f in files: ... print(f"{f.name}: {f.size_gb} GB") >>> # Upload a file >>> uploaded = client.files.upload("/path/to/image.iso", tier=1) >>> # Download a file >>> client.files.download(name="ubuntu.iso", destination="/tmp/") """ _endpoint = "files" def _to_model(self, data: dict[str, Any]) -> File: return File(data, self)
[docs] def list( # noqa: A003 self, filter: str | None = None, # noqa: A002 fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, file_type: str | builtins.list[str] | None = None, **filter_kwargs: Any, ) -> builtins.list[File]: """List files in the media catalog. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. file_type: Filter by file type(s) - "iso", "qcow2", "vmdk", etc. **filter_kwargs: Additional filter arguments. Returns: List of File objects. Example: >>> # List all ISO files >>> isos = client.files.list(file_type="iso") >>> # List importable disk images >>> images = client.files.list(file_type=["ova", "ovf", "vmdk", "qcow2"]) """ # Default fields for useful file information if fields is None: fields = [ "$key", "name", "type", "description", "filesize", "allocated_bytes", "used_bytes", "preferred_tier", "modified", "creator", ] results = super().list( filter=filter, fields=fields, limit=limit, offset=offset, **filter_kwargs ) # Apply file_type filter client-side (more flexible) if file_type: if isinstance(file_type, str): file_type = [file_type] results = [f for f in results if f.file_type in file_type] return results
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> File: """Get a file by key or name. Args: key: File $key (ID). name: File name. fields: List of fields to return. Returns: File object. Raises: NotFoundError: If file not found. ValueError: If neither key nor name provided. """ return super().get(key, name=name, fields=fields)
[docs] def upload( self, path: str | Path, name: str | None = None, description: str | None = None, tier: int | None = None, progress_callback: Callable[[int, int], None] | None = None, ) -> File: """Upload a file to the media catalog. Args: path: Local path to the file to upload. name: Name for the file in VergeOS (defaults to local filename). description: Optional description. tier: Preferred storage tier (1-5). progress_callback: Optional callback(bytes_uploaded, total_bytes). Returns: Uploaded File object. Raises: FileNotFoundError: If local file doesn't exist. ValidationError: If upload fails. Example: >>> def show_progress(uploaded, total): ... pct = (uploaded / total) * 100 ... print(f"\\rUploading: {pct:.1f}%", end="") >>> client.files.upload("/path/to/image.iso", tier=1, progress_callback=show_progress) """ file_path = Path(path) if not file_path.exists(): raise FileNotFoundError(f"File not found: {path}") if not file_path.is_file(): raise ValidationError(f"Not a file: {path}") upload_name = name or file_path.name file_size = file_path.stat().st_size logger.info("Uploading '%s' (%d bytes) as '%s'", file_path.name, file_size, upload_name) # Get connection details connection = self._client._connection if connection is None: from pyvergeos.exceptions import NotConnectedError raise NotConnectedError("Not connected to VergeOS") session = connection._session if session is None: from pyvergeos.exceptions import NotConnectedError raise NotConnectedError("Session not initialized") # Step 1: Create file entry with POST create_body: dict[str, Any] = { "allocated_bytes": str(file_size), "name": upload_name, } if description: create_body["description"] = description if tier: create_body["preferred_tier"] = str(tier) url = f"{connection.api_base_url}/files" response = session.post(url, json=create_body, timeout=DEFAULT_TIMEOUT) if response.status_code not in HTTP_SUCCESS_CODES: raise ValidationError(f"Failed to create file entry: {response.text}") response_data = response.json() file_id = response_data.get("$key") if not file_id: # Try extracting from location location = response_data.get("location", "") if location: file_id = location.rstrip("/").split("/")[-1] if not file_id: raise ValidationError("Could not determine file ID from upload response") logger.debug("File entry created with ID: %s", file_id) # Step 2: Upload file in chunks using PUT try: with open(file_path, "rb") as f: offset = 0 while offset < file_size: chunk = f.read(UPLOAD_CHUNK_SIZE) if not chunk: break chunk_url = f"{url}/{file_id}?filepos={offset}" chunk_response = session.put( chunk_url, data=chunk, headers={HEADER_CONTENT_TYPE: CONTENT_TYPE_OCTET_STREAM}, timeout=UPLOAD_CHUNK_TIMEOUT, ) if ( chunk_response.status_code not in HTTP_SUCCESS_CODES and chunk_response.status_code != HTTP_NO_CONTENT ): raise ValidationError(f"Chunk upload failed: {chunk_response.text}") offset += len(chunk) if progress_callback: progress_callback(offset, file_size) logger.info("Upload completed: %s", upload_name) # Return the uploaded file return self.get(key=int(file_id)) except Exception as e: # Try to clean up the partial upload logger.error("Upload failed, attempting cleanup: %s", e) with contextlib.suppress(Exception): self.delete(int(file_id)) raise
[docs] def download( self, key: int | None = None, *, name: str | None = None, destination: str | Path = ".", filename: str | None = None, overwrite: bool = False, progress_callback: Callable[[int, int], None] | None = None, ) -> Path: """Download a file from the media catalog. Args: key: File $key (ID). name: File name (alternative to key). destination: Directory or full path for downloaded file. filename: Override the filename (defaults to file's name). overwrite: Whether to overwrite existing files. progress_callback: Optional callback(bytes_downloaded, total_bytes). Returns: Path to the downloaded file. Raises: NotFoundError: If file not found. FileExistsError: If destination exists and overwrite=False. ValueError: If neither key nor name provided. Example: >>> path = client.files.download(name="ubuntu.iso", destination="/tmp/") >>> print(f"Downloaded to: {path}") """ # Resolve file info file_obj = self.get(key=key, name=name) file_key = file_obj.key download_name = filename or file_obj.name # Determine output path dest_path = Path(destination) output_path = dest_path / download_name if dest_path.is_dir() else dest_path # Check if exists if output_path.exists() and not overwrite: raise FileExistsError(f"File already exists: {output_path}") # Ensure parent directory exists output_path.parent.mkdir(parents=True, exist_ok=True) # Get connection details connection = self._client._connection if connection is None: from pyvergeos.exceptions import NotConnectedError raise NotConnectedError("Not connected to VergeOS") session = connection._session if session is None: from pyvergeos.exceptions import NotConnectedError raise NotConnectedError("Session not initialized") # Build download URL encoded_name = download_name.replace(" ", "%20") download_url = ( f"{connection.api_base_url}/files/{file_key}?download=1&asname={encoded_name}" ) logger.info("Downloading '%s' to '%s'", download_name, output_path) # Stream download response = session.get(download_url, stream=True, timeout=DEFAULT_TIMEOUT) if response.status_code not in HTTP_SUCCESS_CODES: raise NotFoundError(f"Download failed: {response.text}") total_size = file_obj.size_bytes downloaded = 0 with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=UPLOAD_CHUNK_SIZE): if chunk: f.write(chunk) downloaded += len(chunk) if progress_callback and total_size: progress_callback(downloaded, total_size) logger.info("Download completed: %s", output_path) return output_path
[docs] def delete(self, key: int) -> None: """Delete a file from the media catalog. Args: key: File $key (ID). Raises: NotFoundError: If file not found. ValidationError: If file is in use by VM drives. Note: Files that are referenced by VM drives cannot be deleted until the references are removed. """ super().delete(key) logger.info("Deleted file with key %d", key)