Source code for pyvergeos.resources.nas_volume_syncs

"""NAS volume sync resources."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


class NASVolumeSync(ResourceObject):
    """A single NAS volume sync job.

    A volume sync job copies data from one NAS volume to another. The
    object wraps the record returned by the API and forwards lifecycle
    operations (refresh, save, delete, start, stop) to its manager.

    Attributes:
        key: The sync job unique identifier ($key).
        id: The sync job ID (same as key).
        name: Sync job name.
        description: Sync job description.
        enabled: Whether the sync is enabled.
        service: Parent NAS service key.
        source_volume: Source volume key.
        source_path: Starting directory in source volume.
        destination_volume: Destination volume key.
        destination_path: Destination directory path.
        include: File/directory patterns to include.
        exclude: File/directory patterns to exclude.
        sync_method: Sync method (rsync or ysync).
        destination_delete: How to handle deleted files.
        workers: Number of simultaneous workers.
        preserve_ACLs: Preserve access control lists.
        preserve_permissions: Preserve file permissions.
        preserve_owner: Preserve file owner.
        preserve_groups: Preserve file groups.
        preserve_mod_time: Preserve modification time.
        preserve_xattrs: Preserve extended attributes.
        copy_symlinks: Copy symbolic links.
        fsfreeze: Freeze filesystem before snapshot.
        status: Current sync status.
        syncing: Whether sync is currently running.
        files_transferred: Number of files transferred.
        bytes_transferred: Number of bytes transferred.
        transfer_rate: Current transfer rate.
        sync_errors: Number of sync errors.
        start_time: Sync start time.
        stop_time: Sync stop time.
        created: Creation timestamp.
        modified: Last modified timestamp.
    """

    @property
    def key(self) -> str:  # type: ignore[override]
        """Primary key of the sync job, always rendered as a string.

        The ``$key`` field is preferred; when it is missing or falsy the
        ``id`` field is used instead.

        Raises:
            ValueError: If neither field yields a value (the resource has
                probably not been persisted yet).
        """
        identifier = self.get("$key")
        if not identifier:
            identifier = self.get("id")
        if identifier is None:
            raise ValueError("Resource has no $key - may not be persisted")
        return str(identifier)

    def refresh(self) -> NASVolumeSync:
        """Fetch this sync job again through the manager.

        Returns:
            A newly loaded NASVolumeSync object.
        """
        from typing import cast

        return cast("NASVolumeSyncManager", self._manager).get(self.key)

    def save(self, **kwargs: Any) -> NASVolumeSync:
        """Send field updates for this sync job through the manager.

        Args:
            **kwargs: Fields to update, forwarded to the manager's
                ``update`` method.

        Returns:
            The NASVolumeSync object returned by the manager.
        """
        from typing import cast

        return cast("NASVolumeSyncManager", self._manager).update(self.key, **kwargs)

    def delete(self) -> None:
        """Delete this sync job through the manager."""
        from typing import cast

        cast("NASVolumeSyncManager", self._manager).delete(self.key)

    def start(self) -> None:
        """Ask the manager to start this sync job."""
        from typing import cast

        cast("NASVolumeSyncManager", self._manager).start(self.key)

    def stop(self) -> None:
        """Ask the manager to stop this sync job."""
        from typing import cast

        cast("NASVolumeSyncManager", self._manager).stop(self.key)

    @property
    def is_syncing(self) -> bool:
        """Truthiness of the ``syncing`` field (False when absent)."""
        return bool(self.get("syncing", False))

    @property
    def service_key(self) -> int | None:
        """Parent NAS service key as an int, or None when not set."""
        raw_service = self.get("service")
        if raw_service is None:
            return None
        return int(raw_service)

    @property
    def source_volume_key(self) -> str | None:
        """Source volume key as a string, or None when not set."""
        raw_volume = self.get("source_volume")
        if raw_volume is None:
            return None
        return str(raw_volume)

    @property
    def destination_volume_key(self) -> str | None:
        """Destination volume key as a string, or None when not set."""
        raw_volume = self.get("destination_volume")
        if raw_volume is None:
            return None
        return str(raw_volume)

    @property
    def sync_method_display(self) -> str:
        """Display label for ``sync_method``; unknown values pass through."""
        labels = {
            "rsync": "rsync",
            "ysync": "Verge.io sync",
        }
        raw_method = str(self.get("sync_method", ""))
        return labels[raw_method] if raw_method in labels else raw_method

    @property
    def destination_delete_display(self) -> str:
        """Display label for ``destination_delete``; unknown values pass through."""
        labels = {
            "never": "Never delete",
            "delete": "Delete files from destination",
            "delete-before": "Delete before transfer",
            "delete-during": "Delete during transfer",
            "delete-delay": "Delete after transfer (find during)",
            "delete-after": "Delete after transfer",
        }
        raw_mode = str(self.get("destination_delete", ""))
        return labels[raw_mode] if raw_mode in labels else raw_mode

    @property
    def status_display(self) -> str:
        """Display label for ``status``; unknown values pass through."""
        labels = {
            "complete": "Complete",
            "offline": "Offline",
            "syncing": "Syncing",
            "aborted": "Aborted",
            "error": "Error",
            "warning": "Warning",
        }
        raw_status = str(self.get("status", ""))
        return labels[raw_status] if raw_status in labels else raw_status
class NASVolumeSyncManager(ResourceManager["NASVolumeSync"]):
    """Manager for NAS volume sync operations.

    Volume syncs copy data between NAS volumes on a schedule or on-demand.

    Example:
        >>> # List all volume syncs
        >>> for sync in client.volume_syncs.list():
        ...     print(f"{sync.name}: {sync.status_display}")

        >>> # Get a specific sync
        >>> sync = client.volume_syncs.get(name="DailyBackup")

        >>> # Create a sync job
        >>> sync = client.volume_syncs.create(
        ...     name="DailyBackup",
        ...     service=1,
        ...     source_volume="8f73...",
        ...     destination_volume="9a84...",
        ... )

        >>> # Start a sync
        >>> client.volume_syncs.start(sync.key)

        >>> # Stop a running sync
        >>> client.volume_syncs.stop(sync.key)
    """

    _endpoint = "volume_syncs"
    _actions_endpoint = "volume_sync_actions"

    # Default fields for list operations
    _default_fields = [
        "$key",
        "id",
        "name",
        "description",
        "enabled",
        "created",
        "modified",
        "service",
        "service#name as service_name",
        "service#vm#$display as service_vm",
        "source_volume",
        "source_volume#name as source_volume_name",
        "source_path",
        "destination_volume",
        "destination_volume#name as destination_volume_name",
        "destination_path",
        "include",
        "exclude",
        "sync_method",
        "destination_delete",
        "workers",
        "preserve_ACLs",
        "preserve_permissions",
        "preserve_owner",
        "preserve_groups",
        "preserve_mod_time",
        "preserve_xattrs",
        "copy_symlinks",
        "fsfreeze",
        "progress#status as status",
        "progress#syncing as syncing",
        "progress#files_transferred as files_transferred",
        "progress#bytes_transferred as bytes_transferred",
        "progress#transfer_rate as transfer_rate",
        "progress#sync_errors as sync_errors",
        "progress#start_time as start_time",
        "progress#stop_time as stop_time",
    ]

    # Accepted spellings of sync_method mapped to the value sent to the API.
    # Shared by create() and update() so both accept the same aliases.
    _SYNC_METHOD_ALIASES = {
        "rsync": "rsync",
        "ysync": "ysync",
        "vergesync": "ysync",
        "verge_sync": "ysync",
    }

    # Accepted spellings of destination_delete mapped to the value sent to
    # the API. Shared by create() and update().
    _DESTINATION_DELETE_ALIASES = {
        "never": "never",
        "delete": "delete",
        "delete-before": "delete-before",
        "delete_before": "delete-before",
        "deletebefore": "delete-before",
        "delete-during": "delete-during",
        "delete_during": "delete-during",
        "deleteduring": "delete-during",
        "delete-delay": "delete-delay",
        "delete_delay": "delete-delay",
        "deletedelay": "delete-delay",
        "delete-after": "delete-after",
        "delete_after": "delete-after",
        "deleteafter": "delete-after",
    }

    def __init__(self, client: VergeClient) -> None:
        """Create the manager bound to ``client``."""
        super().__init__(client)

    @staticmethod
    def _quote_literal(value: Any) -> str:
        """Render ``value`` as a single-quoted filter literal.

        Embedded single quotes are doubled so that a value such as
        ``O'Brien`` cannot terminate the literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @classmethod
    def _normalize_sync_method(cls, sync_method: str) -> str:
        """Map a sync method alias to its API value; unknown values pass through."""
        return cls._SYNC_METHOD_ALIASES.get(sync_method.lower(), sync_method)

    @classmethod
    def _normalize_destination_delete(cls, destination_delete: str) -> str:
        """Map a delete-mode alias to its API value; unknown values pass through."""
        return cls._DESTINATION_DELETE_ALIASES.get(
            destination_delete.lower(), destination_delete
        )

    def _resolve_service_key(self, service: int | str) -> Any | None:
        """Resolve a NAS service reference to its key.

        Args:
            service: Service key (int, returned unchanged) or service name
                (str, looked up via the ``vm_services`` endpoint).

        Returns:
            The service key, or None when the name matches no service, the
            matching record carries no ``$key``, or ``service`` is neither
            an int nor a str.
        """
        if isinstance(service, int):
            return service
        if not isinstance(service, str):
            return None

        response = self._client._request(
            "GET",
            "vm_services",
            params={
                "filter": f"name eq {self._quote_literal(service)}",
                "fields": "$key",
                "limit": "1",
            },
        )
        # The endpoint may answer with a list of records or a single record.
        if isinstance(response, list):
            response = response[0] if response else None
        if not response or not isinstance(response, dict):
            return None
        return response.get("$key")

    def _run_action(self, key: str, action: str) -> None:
        """POST ``action`` for sync job ``key`` to the actions endpoint."""
        body: dict[str, Any] = {
            "sync": key,
            "action": action,
        }
        self._client._request("POST", self._actions_endpoint, json_data=body)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        service: int | str | None = None,
        enabled: bool | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[NASVolumeSync]:
        """List volume sync jobs with optional filtering.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            service: Filter by NAS service (key or name).
            enabled: Filter by enabled state.
            **filter_kwargs: Shorthand filter arguments (name, etc.).

        Returns:
            List of NASVolumeSync objects. The list is empty when ``service``
            is given but cannot be resolved to a service key, because an
            unknown service cannot own any sync jobs.

        Example:
            >>> # List all syncs
            >>> syncs = client.volume_syncs.list()

            >>> # List syncs for a specific NAS service
            >>> syncs = client.volume_syncs.list(service="NAS01")

            >>> # List enabled syncs only
            >>> syncs = client.volume_syncs.list(enabled=True)

            >>> # Filter by name
            >>> syncs = client.volume_syncs.list(name="DailyBackup")
        """
        filters: builtins.list[str] = []
        if filter:
            filters.append(filter)
        if filter_kwargs:
            filters.append(build_filter(**filter_kwargs))

        if service is not None:
            service_key = self._resolve_service_key(service)
            if service_key is None:
                # Dropping the filter here would return syncs from every
                # service, which is not what the caller asked for.
                return []
            filters.append(f"service eq {service_key}")

        if enabled is not None:
            filters.append(f"enabled eq {1 if enabled else 0}")

        params: dict[str, Any] = {}
        if filters:
            params["filter"] = " and ".join(filters)
        params["fields"] = ",".join(fields or self._default_fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: str | None = None,
        *,
        name: str | None = None,
        service: int | str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> NASVolumeSync:
        """Get a single volume sync by key or name.

        When both ``key`` and ``name`` are given, ``key`` wins.

        Args:
            key: Sync job key (ID).
            name: Sync job name.
            service: NAS service (key or name) to narrow name search.
            fields: List of fields to return.

        Returns:
            NASVolumeSync object.

        Raises:
            NotFoundError: If sync not found.
            ValueError: If no identifier provided.

        Example:
            >>> # Get by key
            >>> sync = client.volume_syncs.get("abc123")

            >>> # Get by name
            >>> sync = client.volume_syncs.get(name="DailyBackup")

            >>> # Get by name within specific NAS service
            >>> sync = client.volume_syncs.get(name="DailyBackup", service="NAS01")
        """
        if key is not None:
            # Fetch by key using an id filter on the collection endpoint.
            params: dict[str, Any] = {
                "filter": f"id eq {self._quote_literal(key)}",
                "fields": ",".join(fields or self._default_fields),
            }
            response = self._client._request("GET", self._endpoint, params=params)
            if response is None:
                raise NotFoundError(f"Volume sync with key {key} not found")
            if isinstance(response, list):
                if not response:
                    raise NotFoundError(f"Volume sync with key {key} not found")
                response = response[0]
            if not isinstance(response, dict):
                raise NotFoundError(f"Volume sync with key {key} returned invalid response")
            return self._to_model(response)

        if name is not None:
            results = self.list(
                filter=f"name eq {self._quote_literal(name)}",
                service=service,
                fields=fields,
                limit=1,
            )
            if not results:
                raise NotFoundError(f"Volume sync with name '{name}' not found")
            return results[0]

        raise ValueError("Either key or name must be provided")

    def create(  # type: ignore[override]
        self,
        name: str,
        service: int | str,
        source_volume: str,
        destination_volume: str,
        *,
        source_path: str | None = None,
        destination_path: str | None = None,
        description: str | None = None,
        include: builtins.list[str] | None = None,
        exclude: builtins.list[str] | None = None,
        sync_method: str = "ysync",
        destination_delete: str = "never",
        workers: int = 4,
        preserve_acls: bool = True,
        preserve_permissions: bool = True,
        preserve_owner: bool = True,
        preserve_groups: bool = True,
        preserve_mod_time: bool = True,
        preserve_xattrs: bool = True,
        copy_symlinks: bool = True,
        freeze_filesystem: bool = False,
        enabled: bool = True,
    ) -> NASVolumeSync:
        """Create a new volume sync job.

        Args:
            name: Name for the sync job.
            service: NAS service (key or name) to create the sync on.
            source_volume: Source volume key (40-char hex string).
            destination_volume: Destination volume key (40-char hex string).
            source_path: Starting directory in source volume.
            destination_path: Destination directory path.
            description: Sync job description.
            include: List of file/directory patterns to include.
            exclude: List of file/directory patterns to exclude.
            sync_method: Sync method - "rsync" or "ysync" (default: "ysync").
            destination_delete: How to handle deleted files.
                Valid values: "never", "delete", "delete-before",
                "delete-during", "delete-delay", "delete-after"
            workers: Number of simultaneous workers (1-128, default: 4).
            preserve_acls: Preserve access control lists (default: True).
            preserve_permissions: Preserve file permissions (default: True).
            preserve_owner: Preserve file owner (default: True).
            preserve_groups: Preserve file groups (default: True).
            preserve_mod_time: Preserve modification time (default: True).
            preserve_xattrs: Preserve extended attributes (default: True).
            copy_symlinks: Copy symbolic links (default: True).
            freeze_filesystem: Freeze filesystem before snapshot (default: False).
            enabled: Enable the sync job (default: True).

        Returns:
            Created NASVolumeSync object.

        Raises:
            ValueError: If NAS service not found.

        Example:
            >>> # Create a basic sync job
            >>> sync = client.volume_syncs.create(
            ...     name="DailyBackup",
            ...     service="NAS01",
            ...     source_volume="8f73f8bcc9c9f1aaba32f733bfc295acaf548554",
            ...     destination_volume="9a84e7add1d2c3b4a5e6f7890123456789abcdef",
            ... )

            >>> # Create with options
            >>> sync = client.volume_syncs.create(
            ...     name="SelectiveSync",
            ...     service=1,
            ...     source_volume=source_vol.key,
            ...     destination_volume=dest_vol.key,
            ...     include=["*.docx", "*.xlsx"],
            ...     exclude=["temp/*"],
            ...     workers=8,
            ... )
        """
        service_key = self._resolve_service_key(service)
        if service_key is None:
            if isinstance(service, str):
                raise ValueError(f"NAS service '{service}' not found")
            raise ValueError("Could not resolve NAS service key")

        body: dict[str, Any] = {
            "service": service_key,
            "name": name,
            "type": "volsync",
            "source_volume": source_volume,
            "destination_volume": destination_volume,
            "enabled": enabled,
            "sync_method": self._normalize_sync_method(sync_method),
            "destination_delete": self._normalize_destination_delete(destination_delete),
            "workers": workers,
            "preserve_ACLs": preserve_acls,
            "preserve_permissions": preserve_permissions,
            "preserve_owner": preserve_owner,
            "preserve_groups": preserve_groups,
            "preserve_mod_time": preserve_mod_time,
            "preserve_xattrs": preserve_xattrs,
            "copy_symlinks": copy_symlinks,
            "fsfreeze": freeze_filesystem,
        }

        if source_path is not None:
            body["source_path"] = source_path
        if destination_path is not None:
            body["destination_path"] = destination_path
        if description is not None:
            body["description"] = description
        # Pattern lists are sent as newline-separated text.
        if include:
            body["include"] = "\n".join(include)
        if exclude:
            body["exclude"] = "\n".join(exclude)

        response = self._client._request("POST", self._endpoint, json_data=body)

        # Prefer re-reading by the key the API reported for the new record.
        if response and isinstance(response, dict):
            sync_key = response.get("$key") or response.get("id")
            if sync_key:
                return self.get(key=sync_key)

        # Fallback: search by name within the service it was created on.
        return self.get(name=name, service=service_key)

    def update(  # type: ignore[override]
        self,
        key: str,
        *,
        description: str | None = None,
        source_path: str | None = None,
        destination_path: str | None = None,
        include: builtins.list[str] | None = None,
        exclude: builtins.list[str] | None = None,
        sync_method: str | None = None,
        destination_delete: str | None = None,
        workers: int | None = None,
        preserve_acls: bool | None = None,
        preserve_permissions: bool | None = None,
        preserve_owner: bool | None = None,
        preserve_groups: bool | None = None,
        preserve_mod_time: bool | None = None,
        preserve_xattrs: bool | None = None,
        copy_symlinks: bool | None = None,
        freeze_filesystem: bool | None = None,
        enabled: bool | None = None,
    ) -> NASVolumeSync:
        """Update a volume sync job.

        Only arguments that are not None are sent. When nothing is
        supplied, no update request is made and the current record is
        returned.

        Args:
            key: Sync job key (ID).
            description: New description.
            source_path: New source path.
            destination_path: New destination path.
            include: New list of include patterns (an empty list clears them).
            exclude: New list of exclude patterns (an empty list clears them).
            sync_method: New sync method ("rsync" or "ysync").
            destination_delete: New delete mode.
            workers: New number of workers (1-128).
            preserve_acls: Preserve access control lists.
            preserve_permissions: Preserve file permissions.
            preserve_owner: Preserve file owner.
            preserve_groups: Preserve file groups.
            preserve_mod_time: Preserve modification time.
            preserve_xattrs: Preserve extended attributes.
            copy_symlinks: Copy symbolic links.
            freeze_filesystem: Freeze filesystem before snapshot.
            enabled: Enable or disable the sync job.

        Returns:
            Updated NASVolumeSync object.

        Example:
            >>> # Update workers
            >>> client.volume_syncs.update(sync.key, workers=8)

            >>> # Disable a sync
            >>> client.volume_syncs.update(sync.key, enabled=False)
        """
        body: dict[str, Any] = {}

        if description is not None:
            body["description"] = description
        if source_path is not None:
            body["source_path"] = source_path
        if destination_path is not None:
            body["destination_path"] = destination_path
        # Joining an empty list yields "", which clears the stored patterns.
        if include is not None:
            body["include"] = "\n".join(include)
        if exclude is not None:
            body["exclude"] = "\n".join(exclude)
        if sync_method is not None:
            body["sync_method"] = self._normalize_sync_method(sync_method)
        if destination_delete is not None:
            body["destination_delete"] = self._normalize_destination_delete(
                destination_delete
            )
        if workers is not None:
            body["workers"] = workers
        if preserve_acls is not None:
            body["preserve_ACLs"] = preserve_acls
        if preserve_permissions is not None:
            body["preserve_permissions"] = preserve_permissions
        if preserve_owner is not None:
            body["preserve_owner"] = preserve_owner
        if preserve_groups is not None:
            body["preserve_groups"] = preserve_groups
        if preserve_mod_time is not None:
            body["preserve_mod_time"] = preserve_mod_time
        if preserve_xattrs is not None:
            body["preserve_xattrs"] = preserve_xattrs
        if copy_symlinks is not None:
            body["copy_symlinks"] = copy_symlinks
        if freeze_filesystem is not None:
            body["fsfreeze"] = freeze_filesystem
        if enabled is not None:
            body["enabled"] = enabled

        if body:
            self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        return self.get(key)

    def delete(self, key: str) -> None:  # type: ignore[override]
        """Delete a volume sync job.

        This operation cannot be undone. Running syncs should be stopped first.

        Args:
            key: Sync job key (ID).

        Example:
            >>> client.volume_syncs.delete(sync.key)
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def enable(self, key: str) -> NASVolumeSync:
        """Enable a volume sync job.

        Args:
            key: Sync job key (ID).

        Returns:
            Updated NASVolumeSync object.

        Example:
            >>> client.volume_syncs.enable(sync.key)
        """
        return self.update(key, enabled=True)

    def disable(self, key: str) -> NASVolumeSync:
        """Disable a volume sync job.

        Args:
            key: Sync job key (ID).

        Returns:
            Updated NASVolumeSync object.

        Example:
            >>> client.volume_syncs.disable(sync.key)
        """
        return self.update(key, enabled=False)

    def start(self, key: str) -> None:
        """Start a volume sync job.

        Posts a ``start_sync`` action for the job to the actions endpoint.

        Args:
            key: Sync job key (ID).

        Example:
            >>> client.volume_syncs.start(sync.key)
        """
        self._run_action(key, "start_sync")

    def stop(self, key: str) -> None:
        """Stop a running volume sync job.

        Posts a ``stop_sync`` action for the job to the actions endpoint.

        Args:
            key: Sync job key (ID).

        Example:
            >>> client.volume_syncs.stop(sync.key)
        """
        self._run_action(key, "stop_sync")

    def _to_model(self, data: dict[str, Any]) -> NASVolumeSync:
        """Convert API response to NASVolumeSync object."""
        return NASVolumeSync(data, self)