Source code for pyvergeos.resources.nas_nfs

"""NAS NFS share resources."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


class NASNFSShare(ResourceObject):
    """An NFS export defined on a NAS volume.

    Note:
        Unlike most VergeOS resources, whose keys are integers, an NFS
        share is identified by a 40-character hexadecimal string.

    Attributes:
        key: Unique identifier of the share ($key) - 40-char hex string.
        id: Share ID (identical to $key).
        name: Name of the share.
        description: Free-form description of the share.
        volume_key: Key of the volume that hosts the share.
        volume_name: Name of the volume that hosts the share.
        share_path: Path inside the volume that is exported.
        allowed_hosts: Comma-delimited list of hosts permitted to connect.
        allow_all: Whether every host may connect.
        data_access: Access mode, read-only (ro) or read-write (rw).
        squash: User/group squashing mode.
        filesystem_id: Filesystem ID used for the export.
        anonymous_uid: User ID given to anonymous/squashed users.
        anonymous_gid: Group ID given to anonymous/squashed users.
        no_acl: Whether ACLs are disabled.
        insecure: Whether non-privileged source ports are accepted.
        async_mode: Whether async mode is enabled.
        enabled: Whether the share is enabled.
        created: Creation timestamp.
        modified: Last-modified timestamp.
    """

    @property
    def key(self) -> str:  # type: ignore[override]
        """Primary key of the share ($key) as a 40-character hex string.

        The ``$key`` field is preferred; ``id`` is consulted only when
        ``$key`` is missing or falsy.

        Raises:
            ValueError: If resource has no $key (not yet persisted).
        """
        primary = self.get("$key")
        identifier = primary if primary else self.get("id")
        if identifier is None:
            raise ValueError("Resource has no $key - may not be persisted")
        return str(identifier)

    def refresh(self) -> NASNFSShare:
        """Re-read this share from the API.

        Returns:
            A freshly fetched NASNFSShare object.
        """
        from typing import cast

        return cast("NASNFSShareManager", self._manager).get(self.key)

    def save(self, **kwargs: Any) -> NASNFSShare:
        """Persist field changes for this share.

        Args:
            **kwargs: Fields to update, forwarded to the manager's update().

        Returns:
            The updated NASNFSShare object.
        """
        from typing import cast

        return cast("NASNFSShareManager", self._manager).update(self.key, **kwargs)

    def delete(self) -> None:
        """Remove this share through its manager."""
        from typing import cast

        cast("NASNFSShareManager", self._manager).delete(self.key)

    @property
    def volume_key(self) -> str | None:
        """Key of the parent volume as a string, or None when unset."""
        parent = self.get("volume")
        if parent is None:
            return None
        return str(parent)

    @property
    def volume_name(self) -> str | None:
        """Name of the parent volume, falling back to its display value."""
        explicit_name = self.get("volume_name")
        return explicit_name or self.get("volume_display")

    @property
    def is_enabled(self) -> bool:
        """Whether the share is enabled (False when the field is absent)."""
        flag = self.get("enabled", False)
        return bool(flag)

    @property
    def is_read_only(self) -> bool:
        """Whether the share's data access mode is read-only ("ro")."""
        return "ro" == self.get("data_access")

    @property
    def allows_all_hosts(self) -> bool:
        """Whether connections are accepted from every host."""
        flag = self.get("allow_all", False)
        return bool(flag)

    @property
    def squash_display(self) -> str:
        """Human-readable label for the squash mode.

        Unrecognised modes are returned unchanged.
        """
        labels = {
            "root_squash": "Squash Root",
            "all_squash": "Squash All",
            "no_root_squash": "No Squashing",
        }
        mode = str(self.get("squash", "root_squash"))
        return labels[mode] if mode in labels else mode

    @property
    def data_access_display(self) -> str:
        """Human-readable label for the data access mode.

        Unrecognised modes are returned unchanged.
        """
        labels = {
            "ro": "Read Only",
            "rw": "Read and Write",
        }
        mode = str(self.get("data_access", "ro"))
        return labels[mode] if mode in labels else mode
class NASNFSShareManager(ResourceManager["NASNFSShare"]):
    """Manager for NAS NFS share operations.

    NFS shares provide Unix/Linux-compatible file sharing.

    Example:
        >>> # List all NFS shares
        >>> for share in client.nfs_shares.list():
        ...     print(f"{share.name} on {share.volume_name}")

        >>> # Get shares for a specific volume
        >>> shares = client.nfs_shares.list(volume="FileShare")

        >>> # Create a share
        >>> share = client.nfs_shares.create(
        ...     name="exports",
        ...     volume="FileShare",
        ...     allowed_hosts="192.168.1.0/24"
        ... )
    """

    _endpoint = "volume_nfs_shares"

    # Default fields for list operations
    _default_fields = [
        "$key",
        "id",
        "name",
        "description",
        "enabled",
        "created",
        "modified",
        "share_path",
        "allowed_hosts",
        "fsid",
        "anonuid",
        "anongid",
        "no_acl",
        "insecure",
        "async",
        "squash",
        "data_access",
        "allow_all",
        "volume",
        "volume#$display as volume_display",
        "volume#name as volume_name",
        "status#status as status",
        "status#state as state",
    ]

    # Friendly spellings accepted by create()/update(), mapped to the API
    # value. Values not listed here are passed through unchanged.
    _DATA_ACCESS_ALIASES = {
        "readonly": "ro",
        "read_only": "ro",
        "ro": "ro",
        "readwrite": "rw",
        "read_write": "rw",
        "rw": "rw",
    }
    _SQUASH_ALIASES = {
        "squashroot": "root_squash",
        "squash_root": "root_squash",
        "root_squash": "root_squash",
        "squashall": "all_squash",
        "squash_all": "all_squash",
        "all_squash": "all_squash",
        "nosquash": "no_root_squash",
        "no_squash": "no_root_squash",
        "no_root_squash": "no_root_squash",
    }

    def __init__(self, client: VergeClient) -> None:
        """Bind the manager to an API client.

        Args:
            client: Client used to issue requests.
        """
        super().__init__(client)

    @staticmethod
    def _escape(value: Any) -> str:
        """Return ``value`` as text safe to place inside a quoted filter literal.

        Single quotes are doubled so that a value containing ``'`` cannot
        terminate the literal early.
        """
        return str(value).replace("'", "''")

    @staticmethod
    def _canonical(value: str, aliases: dict[str, str]) -> str:
        """Map a friendly spelling to its API value; unknown values pass through."""
        return aliases.get(value.lower(), value)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        volume: str | int | None = None,
        enabled: bool | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[NASNFSShare]:
        """List NFS shares with optional filtering.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            volume: Filter by volume (key or name).
            enabled: Filter by enabled state.
            **filter_kwargs: Shorthand filter arguments (name, etc.).

        Returns:
            List of NASNFSShare objects. Empty when ``volume`` is given but
            cannot be resolved to an existing volume.

        Example:
            >>> # List all NFS shares
            >>> shares = client.nfs_shares.list()

            >>> # List shares on a specific volume
            >>> shares = client.nfs_shares.list(volume="FileShare")

            >>> # List enabled shares only
            >>> shares = client.nfs_shares.list(enabled=True)
        """
        conditions: builtins.list[str] = []
        if filter:
            conditions.append(filter)
        if filter_kwargs:
            conditions.append(build_filter(**filter_kwargs))

        if volume is not None:
            volume_key = self._resolve_volume_key(volume)
            if volume_key is None:
                # A volume that does not exist hosts no shares. Dropping the
                # filter here would return shares from every volume instead.
                return []
            conditions.append(f"volume eq {volume_key}")

        if enabled is not None:
            conditions.append(f"enabled eq {1 if enabled else 0}")

        params: dict[str, Any] = {}
        if conditions:
            params["filter"] = " and ".join(conditions)
        params["fields"] = ",".join(fields if fields else self._default_fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A single-object response is wrapped so it is handled like a list.
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: str | None = None,
        *,
        name: str | None = None,
        volume: str | int | None = None,
        fields: builtins.list[str] | None = None,
    ) -> NASNFSShare:
        """Get a single NFS share by key or name.

        Args:
            key: Share $key (40-character hex string). Takes precedence over
                ``name`` when both are given.
            name: Share name (requires volume if not unique).
            volume: Volume key or name (helps disambiguate by name).
            fields: List of fields to return.

        Returns:
            NASNFSShare object.

        Raises:
            NotFoundError: If share not found, including when ``volume`` is
                given but does not resolve to an existing volume.
            ValueError: If no identifier provided.

        Example:
            >>> # Get by key
            >>> share = client.nfs_shares.get("abc123...")

            >>> # Get by name on a volume
            >>> share = client.nfs_shares.get(name="exports", volume="FileShare")
        """
        if key is not None:
            # Shares are fetched with an id filter rather than by URL path.
            params: dict[str, Any] = {
                "filter": f"id eq '{self._escape(key)}'",
                "fields": ",".join(fields if fields else self._default_fields),
            }
            response = self._client._request("GET", self._endpoint, params=params)
            if response is None:
                raise NotFoundError(f"NFS share with key {key} not found")
            if isinstance(response, list):
                if not response:
                    raise NotFoundError(f"NFS share with key {key} not found")
                response = response[0]
            if not isinstance(response, dict):
                raise NotFoundError(f"NFS share with key {key} returned invalid response")
            return self._to_model(response)

        if name is not None:
            # list() applies the volume filter and returns nothing for an
            # unknown volume, so a same-named share on another volume can
            # never be returned by mistake.
            matches = self.list(
                filter=f"name eq '{self._escape(name)}'",
                fields=fields,
                limit=1,
                volume=volume,
            )
            if not matches:
                raise NotFoundError(f"NFS share with name '{name}' not found")
            return matches[0]

        raise ValueError("Either key or name must be provided")

    def create(  # type: ignore[override]
        self,
        name: str,
        volume: str | int,
        *,
        share_path: str | None = None,
        description: str | None = None,
        allowed_hosts: str | None = None,
        allow_all: bool = False,
        data_access: str = "ro",
        squash: str = "root_squash",
        anonymous_uid: str | None = None,
        anonymous_gid: str | None = None,
        async_mode: bool = False,
        insecure: bool = False,
        no_acl: bool = False,
        filesystem_id: str | None = None,
        enabled: bool = True,
    ) -> NASNFSShare:
        """Create a new NFS share.

        Args:
            name: Share name (alphanumeric with underscores/hyphens).
            volume: Volume key or name to create the share on.
            share_path: Path within the volume to share (empty = entire volume).
            description: Share description.
            allowed_hosts: Comma-delimited list of allowed hosts (FQDNs, IPs,
                networks, NIS netgroups). Required unless allow_all is True.
            allow_all: Allow connections from any host.
            data_access: Data access mode - "ro" (read-only) or "rw" (read-write).
            squash: User/group squashing - "root_squash", "all_squash",
                "no_root_squash".
            anonymous_uid: User ID for anonymous/squashed users (default 65534).
            anonymous_gid: Group ID for anonymous/squashed users (default 65534).
            async_mode: Enable async mode for better performance (risk of data
                loss).
            insecure: Allow connections from non-privileged ports.
            no_acl: Disable access control lists.
            filesystem_id: Filesystem ID for the export (must be unique per
                volume).
            enabled: Enable the share (default True).

        Returns:
            Created NASNFSShare object.

        Raises:
            ValueError: If volume not found or if neither allowed_hosts nor
                allow_all is specified.

        Example:
            >>> # Create a share for a subnet
            >>> share = client.nfs_shares.create(
            ...     "exports", "FileShare",
            ...     allowed_hosts="192.168.1.0/24"
            ... )

            >>> # Create a read-write share for specific hosts
            >>> share = client.nfs_shares.create(
            ...     "data", "FileShare",
            ...     allowed_hosts="10.0.0.5,10.0.0.6",
            ...     data_access="rw",
            ...     squash="no_root_squash"
            ... )

            >>> # Create a share accessible from anywhere
            >>> share = client.nfs_shares.create(
            ...     "public", "FileShare",
            ...     allow_all=True,
            ...     data_access="ro"
            ... )
        """
        if not allow_all and not allowed_hosts:
            raise ValueError("Either allowed_hosts or allow_all must be specified")

        volume_key = self._resolve_volume_key(volume)
        if volume_key is None:
            raise ValueError(f"Volume '{volume}' not found")

        body: dict[str, Any] = {
            "volume": volume_key,
            "name": name,
            "enabled": enabled,
            "data_access": self._canonical(data_access, self._DATA_ACCESS_ALIASES),
            "squash": self._canonical(squash, self._SQUASH_ALIASES),
        }

        if share_path:
            body["share_path"] = share_path
        if description:
            body["description"] = description
        if allow_all:
            body["allow_all"] = True
        if allowed_hosts:
            body["allowed_hosts"] = allowed_hosts

        # Only None and "" mean "not provided" for the ID fields; a numeric 0
        # is a real value and must be sent rather than dropped as falsy.
        unset = (None, "")
        if anonymous_uid not in unset:
            body["anonuid"] = anonymous_uid
        if anonymous_gid not in unset:
            body["anongid"] = anonymous_gid
        if filesystem_id not in unset:
            body["fsid"] = filesystem_id

        if async_mode:
            body["async"] = True
        if insecure:
            body["insecure"] = True
        if no_acl:
            body["no_acl"] = True

        response = self._client._request("POST", self._endpoint, json_data=body)

        # Prefer the key reported by the API for re-fetching the new share.
        if response and isinstance(response, dict):
            share_key = response.get("$key") or response.get("id")
            if share_key:
                return self.get(key=share_key)

        # Fallback: search by name and volume
        return self.get(name=name, volume=volume_key)

    def update(  # type: ignore[override]
        self,
        key: str,
        *,
        description: str | None = None,
        allowed_hosts: str | None = None,
        allow_all: bool | None = None,
        data_access: str | None = None,
        squash: str | None = None,
        anonymous_uid: str | None = None,
        anonymous_gid: str | None = None,
        async_mode: bool | None = None,
        insecure: bool | None = None,
        no_acl: bool | None = None,
        filesystem_id: str | None = None,
        enabled: bool | None = None,
    ) -> NASNFSShare:
        """Update an NFS share.

        Only arguments that are not None are sent. When nothing is supplied
        no write request is issued and the current share is returned.

        Args:
            key: Share $key (40-character hex string).
            description: New description.
            allowed_hosts: New allowed hosts list.
            allow_all: Allow all hosts.
            data_access: Data access mode - "ro" or "rw".
            squash: Squash mode - "root_squash", "all_squash", "no_root_squash".
            anonymous_uid: Anonymous user ID.
            anonymous_gid: Anonymous group ID.
            async_mode: Enable or disable async mode.
            insecure: Allow or disallow non-privileged ports.
            no_acl: Enable or disable ACL support.
            filesystem_id: New filesystem ID.
            enabled: Enable or disable the share.

        Returns:
            Updated NASNFSShare object.

        Example:
            >>> # Change to read-write
            >>> client.nfs_shares.update(share.key, data_access="rw")

            >>> # Update allowed hosts
            >>> client.nfs_shares.update(
            ...     share.key,
            ...     allowed_hosts="192.168.1.0/24,10.0.0.0/8"
            ... )
        """
        if data_access is not None:
            data_access = self._canonical(data_access, self._DATA_ACCESS_ALIASES)
        if squash is not None:
            squash = self._canonical(squash, self._SQUASH_ALIASES)

        # API field name -> requested value; None means "leave unchanged".
        requested: dict[str, Any] = {
            "description": description,
            "allowed_hosts": allowed_hosts,
            "allow_all": allow_all,
            "data_access": data_access,
            "squash": squash,
            "anonuid": anonymous_uid,
            "anongid": anonymous_gid,
            "async": async_mode,
            "insecure": insecure,
            "no_acl": no_acl,
            "fsid": filesystem_id,
            "enabled": enabled,
        }
        body = {field: value for field, value in requested.items() if value is not None}

        if body:
            self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        return self.get(key)

    def delete(self, key: str) -> None:  # type: ignore[override]
        """Delete an NFS share.

        This removes the share but does not delete the underlying data
        on the volume.

        Args:
            key: Share $key (40-character hex string).

        Example:
            >>> client.nfs_shares.delete(share.key)
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def enable(self, key: str) -> NASNFSShare:
        """Enable an NFS share.

        Args:
            key: Share $key (40-character hex string).

        Returns:
            Updated NASNFSShare object.
        """
        return self.update(key, enabled=True)

    def disable(self, key: str) -> NASNFSShare:
        """Disable an NFS share.

        Args:
            key: Share $key (40-character hex string).

        Returns:
            Updated NASNFSShare object.
        """
        return self.update(key, enabled=False)

    def _resolve_volume_key(self, volume: str | int) -> int | None:
        """Resolve a volume identifier to the ``$key`` used in share filters.

        Args:
            volume: Volume key (hex string or int) or name. An int is returned
                as-is without a lookup.

        Returns:
            The ``$key`` of the matching volume, or None if not found.
        """
        if isinstance(volume, int):
            return volume

        # A 40-character hex string is looked up by id; anything else is
        # treated as a volume name.
        looks_like_id = len(volume) == 40 and all(
            c in "0123456789abcdef" for c in volume.lower()
        )
        lookup_field = "id" if looks_like_id else "name"

        response = self._client._request(
            "GET",
            "volumes",
            params={
                # Escaped so a name containing a single quote stays one literal.
                "filter": f"{lookup_field} eq '{self._escape(volume)}'",
                "fields": "$key,id,name",
                "limit": "1",
            },
        )
        if isinstance(response, list):
            response = response[0] if response else None
        if isinstance(response, dict):
            return response.get("$key")
        return None

    def _to_model(self, data: dict[str, Any]) -> NASNFSShare:
        """Convert API response to NASNFSShare object."""
        return NASNFSShare(data, self)