"""NAS volume and snapshot resources."""
from __future__ import annotations
import builtins
from typing import TYPE_CHECKING, Any
from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject
if TYPE_CHECKING:
from pyvergeos.client import VergeClient
from pyvergeos.resources.nas_antivirus import VolumeAntivirusManager
from pyvergeos.resources.nas_volume_browser import NASVolumeFileManager
[docs]
class NASVolume(ResourceObject):
"""NAS volume resource object.
Represents a NAS volume (virtual filesystem) managed by a NAS service.
Note:
Volume keys are 40-character hex strings, not integers like most
other VergeOS resources.
Attributes:
key: The volume unique identifier ($key) - 40-char hex string.
id: The volume ID (same as $key).
name: Volume name.
description: Volume description.
enabled: Whether the volume is enabled.
max_size: Maximum size in bytes.
preferred_tier: Preferred storage tier (1-5).
fs_type: Filesystem type (ext4, cifs, nfs, ybfs, verge_vm_export).
read_only: Whether the volume is read-only.
discard: Whether discard is enabled for deleted files.
owner_user: Volume directory owner user.
owner_group: Volume directory owner group.
encrypt: Whether the volume is encrypted.
automount_snapshots: Whether snapshots are auto-mounted.
is_snapshot: Whether this is a snapshot volume.
service: Parent NAS service key.
snapshot_profile: Associated snapshot profile key.
created: Creation timestamp.
modified: Last modified timestamp.
"""
@property
def key(self) -> str: # type: ignore[override]
"""Resource primary key ($key) - 40-character hex string.
Raises:
ValueError: If resource has no $key (not yet persisted).
"""
k = self.get("$key")
if k is None:
raise ValueError("Resource has no $key - may not be persisted")
return str(k)
[docs]
def refresh(self) -> NASVolume:
"""Refresh resource data from API.
Returns:
Updated NASVolume object.
"""
from typing import cast
manager = cast("NASVolumeManager", self._manager)
return manager.get(self.key)
[docs]
def save(self, **kwargs: Any) -> NASVolume:
"""Save changes to resource.
Args:
**kwargs: Fields to update.
Returns:
Updated NASVolume object.
"""
from typing import cast
manager = cast("NASVolumeManager", self._manager)
return manager.update(self.key, **kwargs)
[docs]
def delete(self) -> None:
"""Delete this volume."""
from typing import cast
manager = cast("NASVolumeManager", self._manager)
manager.delete(self.key)
@property
def max_size_gb(self) -> float:
"""Get the maximum size in GB."""
max_size = self.get("maxsize", 0)
return round(max_size / 1073741824, 2) if max_size else 0
@property
def used_gb(self) -> float:
"""Get the used space in GB."""
used = self.get("used_bytes", 0)
return round(used / 1073741824, 2) if used else 0
@property
def allocated_gb(self) -> float:
"""Get the allocated space in GB."""
allocated = self.get("allocated_bytes", 0)
return round(allocated / 1073741824, 2) if allocated else 0
@property
def is_mounted(self) -> bool:
"""Check if the volume is mounted."""
return self.get("mounted", False) or self.get("mount_status") == "mounted"
@property
def service_key(self) -> int | None:
"""Get the parent NAS service key."""
service = self.get("service")
return int(service) if service is not None else None
@property
def files(self) -> NASVolumeFileManager:
"""Get a file manager for browsing this volume's files.
Returns:
NASVolumeFileManager scoped to this volume.
Example:
>>> # Browse volume files
>>> for f in volume.files.list():
... print(f"{f.name}: {f.size_display}")
>>> # Browse a subdirectory
>>> for f in volume.files.list("/documents"):
... print(f.name)
"""
from typing import cast
from pyvergeos.resources.nas_volume_browser import NASVolumeFileManager
manager = cast("NASVolumeManager", self._manager)
return NASVolumeFileManager(
manager._client,
volume_key=self.key,
volume_name=self.get("name"),
)
@property
def antivirus(self) -> VolumeAntivirusManager:
"""Get antivirus manager for this volume.
Returns:
VolumeAntivirusManager scoped to this volume.
Example:
>>> # Get antivirus configuration
>>> av = volume.antivirus.get()
>>> # Enable and start scan
>>> av.enable()
>>> av.start_scan()
>>> # Check status
>>> status = av.get_status()
>>> print(f"Status: {status.status}")
"""
from typing import cast
from pyvergeos.resources.nas_antivirus import VolumeAntivirusManager
manager = cast("NASVolumeManager", self._manager)
return VolumeAntivirusManager(manager._client, volume_key=self.key)
[docs]
class NASVolumeSnapshot(ResourceObject):
"""NAS volume snapshot resource object.
Represents a point-in-time snapshot of a NAS volume.
Attributes:
key: The snapshot unique identifier ($key).
name: Snapshot name.
description: Snapshot description.
volume: Parent volume key.
snap_volume: Mounted snapshot volume key (if mounted).
created: Creation timestamp.
expires: Expiration timestamp (0 for never expires).
expires_type: Expiration type (never, date).
enabled: Whether the snapshot is enabled.
created_manually: Whether created manually.
quiesce: Whether I/O was quiesced during creation.
"""
@property
def volume_key(self) -> str | None:
"""Get the parent volume key.
Note: NAS volume keys are 40-character hex strings, not integers.
"""
vol = self.get("volume")
return str(vol) if vol is not None else None
@property
def never_expires(self) -> bool:
"""Check if the snapshot never expires."""
return self.get("expires_type") == "never" or self.get("expires", 0) == 0
[docs]
class NASVolumeManager(ResourceManager["NASVolume"]):
"""Manager for NAS volume operations.
NAS volumes are virtual filesystems that can be shared via CIFS/SMB or NFS.
Example:
>>> # List all volumes
>>> for volume in client.nas_volumes.list():
... print(f"{volume.name}: {volume.max_size_gb}GB")
>>> # Get a specific volume
>>> vol = client.nas_volumes.get(name="FileShare")
>>> # Create a volume
>>> vol = client.nas_volumes.create(
... name="DataShare",
... service=1,
... size_gb=500,
... tier=1
... )
>>> # Create a snapshot
>>> snap = vol.snapshots.create("pre-update")
"""
_endpoint = "volumes"
# Default fields for list operations
_default_fields = [
"$key",
"id",
"name",
"description",
"enabled",
"created",
"modified",
"maxsize",
"preferred_tier",
"fs_type",
"read_only",
"discard",
"owner_user",
"owner_group",
"encrypt",
"automount_snapshots",
"is_snapshot",
"note",
"creator",
"service",
"service#$display as service_display",
"service#vm#$display as nas_vm_display",
"service#vm#machine#status#status as nas_status",
"snapshot_profile",
"snapshot_profile#$display as snapshot_profile_display",
"status#status as mount_status",
"status#mounted as mounted",
"drive",
"drive#media_source#used_bytes as used_bytes",
"drive#media_source#filesize as allocated_bytes",
]
[docs]
def __init__(self, client: VergeClient) -> None:
super().__init__(client)
[docs]
def list(
self,
filter: str | None = None,
fields: builtins.list[str] | None = None,
limit: int | None = None,
offset: int | None = None,
enabled: bool | None = None,
fs_type: str | None = None,
service: int | str | None = None,
**filter_kwargs: Any,
) -> builtins.list[NASVolume]:
"""List NAS volumes with optional filtering.
Args:
filter: OData filter string.
fields: List of fields to return (uses defaults if not specified).
limit: Maximum number of results.
offset: Skip this many results.
enabled: Filter by enabled state.
fs_type: Filter by filesystem type (ext4, cifs, nfs, ybfs, verge_vm_export).
service: Filter by NAS service (key or name).
**filter_kwargs: Shorthand filter arguments (name, etc.).
Returns:
List of NASVolume objects.
Example:
>>> # List all volumes
>>> volumes = client.nas_volumes.list()
>>> # List enabled volumes only
>>> enabled = client.nas_volumes.list(enabled=True)
>>> # Filter by filesystem type
>>> ext4_vols = client.nas_volumes.list(fs_type="ext4")
>>> # Filter by NAS service
>>> nas01_vols = client.nas_volumes.list(service="NAS01")
"""
params: dict[str, Any] = {}
# Build filter
filters: builtins.list[str] = []
if filter:
filters.append(filter)
if filter_kwargs:
filters.append(build_filter(**filter_kwargs))
# Add enabled filter
if enabled is not None:
filters.append(f"enabled eq {1 if enabled else 0}")
# Add fs_type filter
if fs_type:
filters.append(f"fs_type eq '{fs_type}'")
# Add service filter
if service is not None:
if isinstance(service, int):
filters.append(f"service eq {service}")
elif isinstance(service, str):
# Look up service by name
svc_response = self._client._request(
"GET",
"vm_services",
params={"filter": f"name eq '{service}'", "fields": "$key", "limit": "1"},
)
if svc_response:
if isinstance(svc_response, list):
svc_response = svc_response[0] if svc_response else None
if svc_response:
filters.append(f"service eq {svc_response.get('$key')}")
if filters:
params["filter"] = " and ".join(filters)
# Use default fields if not specified
if fields:
params["fields"] = ",".join(fields)
else:
params["fields"] = ",".join(self._default_fields)
# Pagination
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
response = self._client._request("GET", self._endpoint, params=params)
if response is None:
return []
if not isinstance(response, list):
response = [response]
return [self._to_model(item) for item in response if item]
[docs]
def get( # type: ignore[override]
self,
key: str | None = None,
*,
name: str | None = None,
fields: builtins.list[str] | None = None,
) -> NASVolume:
"""Get a single NAS volume by key or name.
Args:
key: Volume $key (40-character hex string).
name: Volume name.
fields: List of fields to return.
Returns:
NASVolume object.
Raises:
NotFoundError: If volume not found.
ValueError: If no identifier provided.
Example:
>>> # Get by key
>>> vol = client.nas_volumes.get("8f73f8bcc9c9f1aaba32f733bfc295acaf548554")
>>> # Get by name
>>> vol = client.nas_volumes.get(name="FileShare")
"""
if key is not None:
# Fetch by key using id filter (PowerShell pattern: id eq '$Key')
params: dict[str, Any] = {
"filter": f"id eq '{key}'",
}
if fields:
params["fields"] = ",".join(fields)
else:
params["fields"] = ",".join(self._default_fields)
response = self._client._request("GET", self._endpoint, params=params)
if response is None:
raise NotFoundError(f"NAS volume with key {key} not found")
if isinstance(response, list):
if not response:
raise NotFoundError(f"NAS volume with key {key} not found")
response = response[0]
if not isinstance(response, dict):
raise NotFoundError(f"NAS volume with key {key} returned invalid response")
return self._to_model(response)
if name is not None:
# Search by name
escaped_name = name.replace("'", "''")
results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1)
if not results:
raise NotFoundError(f"NAS volume with name '{name}' not found")
return results[0]
raise ValueError("Either key or name must be provided")
[docs]
def create( # type: ignore[override]
self,
name: str,
service: int | str,
size_gb: int,
*,
tier: int | None = None,
description: str | None = None,
read_only: bool = False,
discard: bool = True,
owner_user: str | None = None,
owner_group: str | None = None,
snapshot_profile: int | None = None,
enabled: bool = True,
) -> NASVolume:
"""Create a new NAS volume.
Args:
name: Volume name (alphanumeric with underscores/hyphens only).
service: NAS service (key or name) to create the volume on.
size_gb: Maximum size in gigabytes (1-524288).
tier: Preferred storage tier (1-5).
description: Volume description.
read_only: Create as read-only volume.
discard: Enable automatic discard of deleted files (default True).
owner_user: Volume directory owner user.
owner_group: Volume directory owner group.
snapshot_profile: Snapshot profile key.
enabled: Enable the volume (default True).
Returns:
Created NASVolume object.
Raises:
ValueError: If NAS service not found.
Example:
>>> # Create a basic volume
>>> vol = client.nas_volumes.create("FileShare", "NAS01", 500)
>>> # Create with options
>>> vol = client.nas_volumes.create(
... "Archive",
... service="NAS01",
... size_gb=2000,
... tier=3,
... description="Archive storage"
... )
"""
# Resolve NAS service to key
service_key: int | None = None
if isinstance(service, int):
service_key = service
elif isinstance(service, str):
svc_response = self._client._request(
"GET",
"vm_services",
params={"filter": f"name eq '{service}'", "fields": "$key,name", "limit": "1"},
)
if not svc_response:
raise ValueError(f"NAS service '{service}' not found")
if isinstance(svc_response, list):
svc_response = svc_response[0] if svc_response else None
if not svc_response:
raise ValueError(f"NAS service '{service}' not found")
service_key = svc_response.get("$key")
if service_key is None:
raise ValueError("Could not resolve NAS service key")
# Build request body
body: dict[str, Any] = {
"name": name,
"service": service_key,
"maxsize": size_gb * 1073741824, # Convert GB to bytes
"enabled": enabled,
"discard": discard,
}
if tier is not None:
body["preferred_tier"] = str(tier)
if description is not None:
body["description"] = description
if read_only:
body["read_only"] = True
if owner_user is not None:
body["owner_user"] = owner_user
if owner_group is not None:
body["owner_group"] = owner_group
if snapshot_profile is not None:
body["snapshot_profile"] = snapshot_profile
response = self._client._request("POST", self._endpoint, json_data=body)
# Get the created volume
if response and isinstance(response, dict):
vol_key = response.get("$key") or response.get("id")
if vol_key:
return self.get(key=vol_key)
# Fallback: search by name
return self.get(name=name)
[docs]
def update( # type: ignore[override]
self,
key: str,
*,
description: str | None = None,
size_gb: int | None = None,
tier: int | None = None,
enabled: bool | None = None,
read_only: bool | None = None,
discard: bool | None = None,
owner_user: str | None = None,
owner_group: str | None = None,
snapshot_profile: int | None = None,
automount_snapshots: bool | None = None,
) -> NASVolume:
"""Update a NAS volume.
Args:
key: Volume $key (40-character hex string).
description: New description.
size_gb: New maximum size in gigabytes.
tier: New preferred storage tier (1-5).
enabled: Enable or disable the volume.
read_only: Set read-only or read-write.
discard: Enable or disable automatic discard.
owner_user: New owner user.
owner_group: New owner group.
snapshot_profile: New snapshot profile key (or None to remove).
automount_snapshots: Enable or disable auto-mount of snapshots.
Returns:
Updated NASVolume object.
Example:
>>> # Increase size
>>> client.nas_volumes.update(vol.key, size_gb=1000)
>>> # Change tier
>>> client.nas_volumes.update(vol.key, tier=3)
>>> # Disable volume
>>> client.nas_volumes.update(vol.key, enabled=False)
"""
body: dict[str, Any] = {}
if description is not None:
body["description"] = description
if size_gb is not None:
body["maxsize"] = size_gb * 1073741824
if tier is not None:
body["preferred_tier"] = str(tier)
if enabled is not None:
body["enabled"] = enabled
if read_only is not None:
body["read_only"] = read_only
if discard is not None:
body["discard"] = discard
if owner_user is not None:
body["owner_user"] = owner_user
if owner_group is not None:
body["owner_group"] = owner_group
if snapshot_profile is not None:
body["snapshot_profile"] = snapshot_profile
if automount_snapshots is not None:
body["automount_snapshots"] = automount_snapshots
if not body:
return self.get(key)
self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
return self.get(key)
[docs]
def delete(self, key: str) -> None: # type: ignore[override]
"""Delete a NAS volume.
This operation is destructive and cannot be undone. All data on
the volume will be permanently deleted.
Args:
key: Volume $key (40-character hex string).
Raises:
NotFoundError: If volume not found.
Example:
>>> client.nas_volumes.delete(vol.key)
"""
self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs]
def enable(self, key: str) -> NASVolume:
"""Enable a NAS volume.
Args:
key: Volume $key (40-character hex string).
Returns:
Updated NASVolume object.
Example:
>>> client.nas_volumes.enable(vol.key)
"""
return self.update(key, enabled=True)
[docs]
def disable(self, key: str) -> NASVolume:
"""Disable a NAS volume.
Args:
key: Volume $key (40-character hex string).
Returns:
Updated NASVolume object.
Example:
>>> client.nas_volumes.disable(vol.key)
"""
return self.update(key, enabled=False)
[docs]
def reset(self, key: str) -> dict[str, Any] | None:
"""Reset a NAS volume.
Resets the volume, which can help recover from error states.
Args:
key: Volume $key (40-character hex string).
Returns:
Task information dict or None.
Example:
>>> client.nas_volumes.reset(vol.key)
"""
result = self._client._request("PUT", f"{self._endpoint}/{key}?action=reset", json_data={})
if isinstance(result, dict):
return result
return None
[docs]
def snapshots(self, key: str) -> NASVolumeSnapshotManager:
"""Get a snapshot manager for a specific volume.
Args:
key: Volume $key (40-character hex string).
Returns:
NASVolumeSnapshotManager for the volume.
Example:
>>> # List snapshots for a volume
>>> for snap in client.nas_volumes.snapshots(vol.key).list():
... print(snap.name)
>>> # Create a snapshot
>>> snap = client.nas_volumes.snapshots(vol.key).create("pre-update")
"""
return NASVolumeSnapshotManager(self._client, volume_key=key)
[docs]
def files(self, key: str, *, name: str | None = None) -> NASVolumeFileManager:
"""Get a file manager for browsing a volume's files.
Args:
key: Volume $key (40-character hex string).
name: Optional volume name for display purposes.
Returns:
NASVolumeFileManager for browsing the volume.
Example:
>>> # Browse root directory
>>> for f in client.nas_volumes.files(vol.key).list():
... print(f"{f.name}: {f.size_display}")
>>> # Browse a subdirectory
>>> files = client.nas_volumes.files(vol.key).list("/documents")
>>> # Get a specific file
>>> file = client.nas_volumes.files(vol.key).get("/report.pdf")
"""
from pyvergeos.resources.nas_volume_browser import NASVolumeFileManager
return NASVolumeFileManager(self._client, volume_key=key, volume_name=name)
def _to_model(self, data: dict[str, Any]) -> NASVolume:
"""Convert API response to NASVolume object."""
return NASVolume(data, self)
[docs]
class NASVolumeSnapshotManager(ResourceManager["NASVolumeSnapshot"]):
"""Manager for NAS volume snapshot operations.
This manager can be used either standalone or scoped to a specific volume.
Example:
>>> # List all snapshots (standalone)
>>> for snap in client.nas_volume_snapshots.list():
... print(f"{snap.name} ({snap.volume_name})")
>>> # List snapshots for a specific volume
>>> for snap in client.nas_volumes.snapshots(vol.key).list():
... print(snap.name)
>>> # Create a snapshot
>>> snap = client.nas_volumes.snapshots(vol.key).create("pre-update")
"""
_endpoint = "volume_snapshots"
# Default fields for list operations
_default_fields = [
"$key",
"name",
"description",
"created",
"expires",
"expires_type",
"enabled",
"created_manually",
"quiesce",
"volume",
"volume#$display as volume_display",
"volume#name as volume_name",
"snap_volume",
]
[docs]
def __init__(self, client: VergeClient, *, volume_key: str | None = None) -> None:
super().__init__(client)
self._volume_key = volume_key
[docs]
def list(
self,
filter: str | None = None,
fields: builtins.list[str] | None = None,
limit: int | None = None,
offset: int | None = None,
volume: str | None = None,
**filter_kwargs: Any,
) -> builtins.list[NASVolumeSnapshot]:
"""List volume snapshots with optional filtering.
Args:
filter: OData filter string.
fields: List of fields to return (uses defaults if not specified).
limit: Maximum number of results.
offset: Skip this many results.
volume: Filter by volume (key or name). Ignored if manager is scoped.
**filter_kwargs: Shorthand filter arguments (name, etc.).
Returns:
List of NASVolumeSnapshot objects.
Example:
>>> # List all snapshots
>>> snapshots = client.nas_volume_snapshots.list()
>>> # List snapshots for a specific volume by name
>>> snapshots = client.nas_volume_snapshots.list(volume="FileShare")
>>> # List snapshots by name pattern
>>> snapshots = client.nas_volume_snapshots.list(name="Daily-*")
"""
params: dict[str, Any] = {}
# Build filter
filters: builtins.list[str] = []
if filter:
filters.append(filter)
if filter_kwargs:
filters.append(build_filter(**filter_kwargs))
# Add volume filter (from scope or parameter)
volume_key = self._volume_key
if volume_key is None and volume is not None:
# Check if it looks like a volume key (40-char hex) or a name
if len(volume) == 40 and all(c in "0123456789abcdef" for c in volume.lower()):
volume_key = volume
else:
# Look up volume by name
vol_response = self._client._request(
"GET",
"volumes",
params={"filter": f"name eq '{volume}'", "fields": "$key", "limit": "1"},
)
if vol_response:
if isinstance(vol_response, list):
vol_response = vol_response[0] if vol_response else None
if vol_response:
volume_key = vol_response.get("$key")
if volume_key is not None:
# Volume keys are strings (40-char hex)
filters.append(f"volume eq '{volume_key}'")
if filters:
params["filter"] = " and ".join(filters)
# Use default fields if not specified
if fields:
params["fields"] = ",".join(fields)
else:
params["fields"] = ",".join(self._default_fields)
# Pagination
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
response = self._client._request("GET", self._endpoint, params=params)
if response is None:
return []
if not isinstance(response, list):
response = [response]
return [self._to_model(item) for item in response if item]
[docs]
def get(
self,
key: int | None = None,
*,
name: str | None = None,
fields: builtins.list[str] | None = None,
) -> NASVolumeSnapshot:
"""Get a single volume snapshot by key or name.
Args:
key: Snapshot $key (row ID).
name: Snapshot name (requires scoped manager or unique name).
fields: List of fields to return.
Returns:
NASVolumeSnapshot object.
Raises:
NotFoundError: If snapshot not found.
ValueError: If no identifier provided.
Example:
>>> # Get by key
>>> snap = client.nas_volume_snapshots.get(1)
>>> # Get by name (scoped to volume)
>>> snap = client.nas_volumes.snapshots(1).get(name="pre-update")
"""
if key is not None:
# Fetch by key with default fields
params: dict[str, Any] = {}
if fields:
params["fields"] = ",".join(fields)
else:
params["fields"] = ",".join(self._default_fields)
response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
if response is None:
raise NotFoundError(f"Volume snapshot with key {key} not found")
if not isinstance(response, dict):
raise NotFoundError(f"Volume snapshot with key {key} returned invalid response")
return self._to_model(response)
if name is not None:
# Search by name
escaped_name = name.replace("'", "''")
results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1)
if not results:
raise NotFoundError(f"Volume snapshot with name '{name}' not found")
return results[0]
raise ValueError("Either key or name must be provided")
[docs]
def create( # type: ignore[override]
self,
name: str,
*,
volume: str | None = None,
description: str | None = None,
expires_days: int = 3,
never_expires: bool = False,
quiesce: bool = False,
) -> NASVolumeSnapshot:
"""Create a new volume snapshot.
Args:
name: Snapshot name.
volume: Volume key (40-char hex string, required if manager is not scoped).
description: Snapshot description.
expires_days: Days until expiration (default 3).
never_expires: If True, snapshot never expires.
quiesce: If True, freeze I/O during snapshot for consistency.
Returns:
Created NASVolumeSnapshot object.
Raises:
ValueError: If volume not specified and manager not scoped.
Example:
>>> # Create with scoped manager
>>> snap = client.nas_volumes.snapshots(vol.key).create("pre-update")
>>> # Create with standalone manager
>>> snap = client.nas_volume_snapshots.create("backup", volume=vol.key)
>>> # Create a permanent quiesced snapshot
>>> snap = client.nas_volumes.snapshots(vol.key).create(
... "before-migration",
... never_expires=True,
... quiesce=True
... )
"""
volume_key = volume or self._volume_key
if volume_key is None:
raise ValueError("Volume key is required (use scoped manager or provide volume)")
import time as _time
body: dict[str, Any] = {
"volume": volume_key,
"name": name,
"created_manually": True,
}
if description is not None:
body["description"] = description
if never_expires:
body["expires_type"] = "never"
body["expires"] = 0
else:
body["expires_type"] = "date"
# Calculate expiration as Unix timestamp
body["expires"] = int(_time.time()) + (expires_days * 86400)
if quiesce:
body["quiesce"] = True
response = self._client._request("POST", self._endpoint, json_data=body)
# Get the created snapshot
if response and isinstance(response, dict):
snap_key = response.get("$key")
if snap_key:
return self.get(key=snap_key)
# Fallback: search by name
return self.get(name=name)
[docs]
def delete(self, key: int) -> None:
"""Delete a volume snapshot.
This operation cannot be undone.
Args:
key: Snapshot $key (ID).
Example:
>>> client.nas_volume_snapshots.delete(1)
"""
self._client._request("DELETE", f"{self._endpoint}/{key}")
def _to_model(self, data: dict[str, Any]) -> NASVolumeSnapshot:
"""Convert API response to NASVolumeSnapshot object."""
return NASVolumeSnapshot(data, self)