Source code for pyvergeos.resources.nas_volume_browser

"""NAS volume file browser resources."""

from __future__ import annotations

import builtins
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pyvergeos.constants import DEFAULT_TIMEOUT, POLL_INTERVAL_FAST
from pyvergeos.exceptions import APIError, NotFoundError, VergeTimeoutError

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


[docs] class NASVolumeFile(dict[str, Any]): """NAS volume file/directory entry. Represents a file or directory within a NAS volume. This is a read-only data object returned by the volume browser. Attributes: name: File or directory name. type: Entry type ('file' or 'directory'). size: Size in bytes. date: Modification timestamp (Unix). n_name: Normalized name (lowercase). """
[docs] def __init__(self, data: dict[str, Any]) -> None: """Initialize with data dict.""" super().__init__(data)
[docs] def __getattr__(self, name: str) -> Any: """Allow attribute-style access to dict items.""" try: return self[name] except KeyError: raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'") from None
@property def name(self) -> str: """Get the file/directory name.""" return str(self.get("name", "")) @property def is_directory(self) -> bool: """Check if this entry is a directory.""" entry_type = self.get("type", "") return entry_type in ("directory", "d") @property def is_file(self) -> bool: """Check if this entry is a file.""" return not self.is_directory @property def size(self) -> int: """Get the size in bytes.""" return int(self.get("size", 0)) @property def size_display(self) -> str: """Get human-readable size.""" return _format_file_size(self.size) @property def modified(self) -> datetime | None: """Get the modification time as datetime.""" timestamp = self.get("date") if timestamp: return datetime.fromtimestamp(int(timestamp), tz=timezone.utc) return None @property def full_path(self) -> str: """Get the full path of this entry.""" return str(self.get("_full_path", f"/{self.name}")) @property def volume_key(self) -> str | None: """Get the volume key this entry belongs to.""" return self.get("_volume_key")
[docs] class NASVolumeFileManager: """Manager for browsing NAS volume files. This manager provides methods to browse files and directories within a NAS volume. It uses the asynchronous volume_browser API. Note: The NAS service VM must be running to browse volumes. The volume must be mounted (enabled). Example: >>> # Browse root directory >>> files = client.nas_volumes.files(vol.key).list() >>> for f in files: ... print(f"{f.name}: {f.size_display}") >>> # Browse a subdirectory >>> files = client.nas_volumes.files(vol.key).list("/documents") >>> # Get a specific file >>> file = client.nas_volumes.files(vol.key).get("/documents/report.pdf") """ _endpoint = "volume_browser"
[docs] def __init__( self, client: VergeClient, *, volume_key: str, volume_name: str | None = None ) -> None: """Initialize the file manager. Args: client: VergeClient instance. volume_key: Volume key (40-character hex string). volume_name: Optional volume name for display purposes. """ self._client = client self._volume_key = volume_key self._volume_name = volume_name
[docs] def list( self, path: str = "/", *, limit: int = 1000, offset: int | None = None, extensions: str = "", sort: str = "", timeout: int = DEFAULT_TIMEOUT, ) -> builtins.list[NASVolumeFile]: """List files and directories at the specified path. Args: path: Directory path to list. Use "/" for root. limit: Maximum number of entries to return (default 1000). offset: Pagination offset. extensions: Filter by file extensions (comma-separated). sort: Sort field. timeout: Maximum seconds to wait for results (default 30). Returns: List of NASVolumeFile objects. Raises: APIError: If the browse operation fails. VergeTimeoutError: If the operation times out. Example: >>> # List root directory >>> files = client.nas_volumes.files(vol.key).list() >>> # List a subdirectory >>> files = client.nas_volumes.files(vol.key).list("/documents") >>> # Filter by extension >>> pdfs = client.nas_volumes.files(vol.key).list("/documents", extensions="pdf") """ # Normalize path - API uses empty string for root, not "/" dir_path = path if dir_path == "/": dir_path = "" elif dir_path.startswith("/"): dir_path = dir_path[1:] # Create browse request body = { "volume": self._volume_key, "query": "get-dir", "params": { "dir": dir_path, "limit": limit, "offset": offset, "filter": {"extensions": extensions}, "volume": self._volume_key, "sort": sort, }, } # POST to create browse job response = self._client._request("POST", self._endpoint, json_data=body) if not response or not isinstance(response, dict): raise APIError("No response from volume browser") job_key = response.get("$key") or response.get("id") if not job_key: raise APIError("No job key returned from volume browser") # Poll for results result = self._poll_for_result(job_key, timeout=timeout) # Handle empty directory (result is null/None) if result is None: return [] # Parse result if it's a JSON string if isinstance(result, str): import json try: result = json.loads(result) except json.JSONDecodeError as e: raise APIError(f"Invalid result from volume browser: {result}") from e # Result can be an array directly or have an entries property entries: builtins.list[dict[str, Any]] if isinstance(result, list): entries = result elif isinstance(result, dict) and "entries" in result: entries = result["entries"] else: entries = [result] if result else [] # Convert to NASVolumeFile objects files: builtins.list[NASVolumeFile] = [] for entry in entries: if entry: # Add metadata entry["_volume_key"] = self._volume_key entry["_volume_name"] = self._volume_name # Build full path if path == "/" or path == "": entry["_full_path"] = f"/{entry.get('name', '')}" else: normalized_path = path if path.startswith("/") else f"/{path}" entry["_full_path"] = f"{normalized_path}/{entry.get('name', '')}" files.append(NASVolumeFile(entry)) return files
[docs] def get( self, path: str, *, timeout: int = DEFAULT_TIMEOUT, ) -> NASVolumeFile: """Get information about a specific file or directory. Args: path: Full path to the file or directory. timeout: Maximum seconds to wait for results. Returns: NASVolumeFile object. Raises: NotFoundError: If the file/directory is not found. APIError: If the browse operation fails. Example: >>> file = client.nas_volumes.files(vol.key).get("/documents/report.pdf") >>> print(f"{file.name}: {file.size_display}") """ # Split path into directory and name path = path.rstrip("/") if "/" in path: last_slash = path.rfind("/") dir_path = path[:last_slash] if last_slash > 0 else "/" file_name = path[last_slash + 1 :] else: dir_path = "/" file_name = path # List the directory files = self.list(dir_path, timeout=timeout) # Find the specific file/directory for f in files: if f.name == file_name: return f raise NotFoundError(f"File or directory not found: {path}")
def _poll_for_result( self, job_key: str, *, timeout: int = DEFAULT_TIMEOUT, poll_interval: float = POLL_INTERVAL_FAST, ) -> Any: """Poll for browse operation results. Args: job_key: The browse job key. timeout: Maximum seconds to wait. poll_interval: Seconds between polls. Returns: The result data (can be list, dict, or None for empty dirs). Raises: APIError: If the job fails. VergeTimeoutError: If timeout is exceeded. """ max_attempts = int(timeout / poll_interval) endpoint = f"{self._endpoint}/{job_key}" for _attempt in range(max_attempts): time.sleep(poll_interval) # Must explicitly request the result field - it's not returned by default response = self._client._request("GET", endpoint, params={"fields": "id,status,result"}) if not response or not isinstance(response, dict): continue status = response.get("status") if status == "complete": return response.get("result") elif status == "error": error_msg = response.get("result", "Unknown error") raise APIError(f"Browse operation failed: {error_msg}") raise VergeTimeoutError(f"Browse operation timed out after {timeout} seconds")
def _format_file_size(size_bytes: int) -> str: """Format file size in human-readable format. Args: size_bytes: Size in bytes. Returns: Human-readable size string (e.g., "1.5 MB"). """ if size_bytes == 0: return "0 B" units = ["B", "KB", "MB", "GB", "TB", "PB"] size = float(size_bytes) unit_index = 0 while size >= 1024 and unit_index < len(units) - 1: size /= 1024 unit_index += 1 if unit_index == 0: return f"{int(size)} B" return f"{size:.2f} {units[unit_index]}"