Source code for pyvergeos.resources.nas_cifs

"""NAS CIFS/SMB share resources."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


[docs] class NASCIFSShare(ResourceObject): """NAS CIFS/SMB share resource object. Represents a CIFS (SMB) file share on a NAS volume. Note: Share keys are 40-character hex strings, not integers like most other VergeOS resources. Attributes: key: The share unique identifier ($key) - 40-char hex string. id: The share ID (same as $key). name: Share name. description: Share description. volume_key: Parent volume key. volume_name: Parent volume name. share_path: Path within the volume being shared. comment: Short comment (visible to clients). enabled: Whether the share is enabled. browseable: Whether the share is visible in network browsing. read_only: Whether the share is read-only. guest_ok: Whether guest access is allowed. guest_only: Only guest connections are permitted. force_user: All file operations performed as this user. force_group: Default primary group for connecting users. valid_users: List of users allowed to connect. valid_groups: List of groups allowed to connect. admin_users: List of users with admin privileges. admin_groups: List of groups with admin privileges. allowed_hosts: List of allowed hosts. denied_hosts: List of denied hosts. shadow_copy_enabled: Whether shadow copy (Previous Versions) is enabled. created: Creation timestamp. modified: Last modified timestamp. """ @property def key(self) -> str: # type: ignore[override] """Resource primary key ($key) - 40-character hex string. Raises: ValueError: If resource has no $key (not yet persisted). """ k = self.get("$key") or self.get("id") if k is None: raise ValueError("Resource has no $key - may not be persisted") return str(k)
[docs] def refresh(self) -> NASCIFSShare: """Refresh resource data from API. Returns: Updated NASCIFSShare object. """ from typing import cast manager = cast("NASCIFSShareManager", self._manager) return manager.get(self.key)
[docs] def save(self, **kwargs: Any) -> NASCIFSShare: """Save changes to resource. Args: **kwargs: Fields to update. Returns: Updated NASCIFSShare object. """ from typing import cast manager = cast("NASCIFSShareManager", self._manager) return manager.update(self.key, **kwargs)
[docs] def delete(self) -> None: """Delete this share.""" from typing import cast manager = cast("NASCIFSShareManager", self._manager) manager.delete(self.key)
@property def volume_key(self) -> str | None: """Get the parent volume key (40-char hex string).""" vol = self.get("volume") return str(vol) if vol is not None else None @property def volume_name(self) -> str | None: """Get the parent volume name.""" return self.get("volume_name") or self.get("volume_display") @property def is_enabled(self) -> bool: """Check if the share is enabled.""" return bool(self.get("enabled", False)) @property def is_read_only(self) -> bool: """Check if the share is read-only.""" return bool(self.get("read_only", False)) @property def allows_guests(self) -> bool: """Check if guest access is allowed.""" return bool(self.get("guest_ok", False)) @property def shadow_copy_enabled(self) -> bool: """Check if shadow copy is enabled.""" return bool(self.get("vfs_shadow_copy2", False))
[docs] class NASCIFSShareManager(ResourceManager["NASCIFSShare"]): """Manager for NAS CIFS/SMB share operations. CIFS shares provide Windows-compatible file sharing (SMB protocol). Example: >>> # List all CIFS shares >>> for share in client.cifs_shares.list(): ... print(f"{share.name} on {share.volume_name}") >>> # Get shares for a specific volume >>> shares = client.cifs_shares.list(volume="FileShare") >>> # Create a share >>> share = client.cifs_shares.create( ... name="shared", ... volume="FileShare", ... guest_ok=True ... ) """ _endpoint = "volume_cifs_shares" # Default fields for list operations _default_fields = [ "$key", "id", "name", "description", "enabled", "created", "modified", "share_path", "comment", "browseable", "read_only", "guest_ok", "guest_only", "force_user", "force_group", "valid_users", "valid_groups", "admin_users", "admin_groups", "host_allow", "host_deny", "vfs_shadow_copy2", "volume", "volume#$display as volume_display", "volume#name as volume_name", "status#status as status", "status#state as state", ]
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, volume: str | int | None = None, enabled: bool | None = None, **filter_kwargs: Any, ) -> builtins.list[NASCIFSShare]: """List CIFS shares with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. volume: Filter by volume (key or name). enabled: Filter by enabled state. **filter_kwargs: Shorthand filter arguments (name, etc.). Returns: List of NASCIFSShare objects. Example: >>> # List all CIFS shares >>> shares = client.cifs_shares.list() >>> # List shares on a specific volume >>> shares = client.cifs_shares.list(volume="FileShare") >>> # List enabled shares only >>> shares = client.cifs_shares.list(enabled=True) """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) # Add volume filter if volume is not None: volume_key = self._resolve_volume_key(volume) if volume_key: # Volume keys are integers in the API for shares filters.append(f"volume eq {volume_key}") # Add enabled filter if enabled is not None: filters.append(f"enabled eq {1 if enabled else 0}") if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] return [self._to_model(item) for item in response if item]
[docs] def get( # type: ignore[override] self, key: str | None = None, *, name: str | None = None, volume: str | int | None = None, fields: builtins.list[str] | None = None, ) -> NASCIFSShare: """Get a single CIFS share by key or name. Args: key: Share $key (40-character hex string). name: Share name (requires volume if not unique). volume: Volume key or name (helps disambiguate by name). fields: List of fields to return. Returns: NASCIFSShare object. Raises: NotFoundError: If share not found. ValueError: If no identifier provided. Example: >>> # Get by key >>> share = client.cifs_shares.get("abc123...") >>> # Get by name on a volume >>> share = client.cifs_shares.get(name="shared", volume="FileShare") """ if key is not None: # Fetch by key using id filter params: dict[str, Any] = { "filter": f"id eq '{key}'", } if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", self._endpoint, params=params) if response is None: raise NotFoundError(f"CIFS share with key {key} not found") if isinstance(response, list): if not response: raise NotFoundError(f"CIFS share with key {key} not found") response = response[0] if not isinstance(response, dict): raise NotFoundError(f"CIFS share with key {key} returned invalid response") return self._to_model(response) if name is not None: # Search by name escaped_name = name.replace("'", "''") filter_str = f"name eq '{escaped_name}'" # Add volume filter if specified if volume is not None: volume_key = self._resolve_volume_key(volume) if volume_key: filter_str += f" and volume eq {volume_key}" results = self.list(filter=filter_str, fields=fields, limit=1) if not results: raise NotFoundError(f"CIFS share with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def create( # type: ignore[override] self, name: str, volume: str | int, *, share_path: str | None = None, description: str | None = None, comment: str | None = None, browseable: bool = True, read_only: bool = False, guest_ok: bool = False, guest_only: bool = False, force_user: str | None = None, force_group: str | None = None, valid_users: builtins.list[str] | None = None, valid_groups: builtins.list[str] | None = None, admin_users: builtins.list[str] | None = None, admin_groups: builtins.list[str] | None = None, allowed_hosts: builtins.list[str] | None = None, denied_hosts: builtins.list[str] | None = None, shadow_copy: bool = False, enabled: bool = True, ) -> NASCIFSShare: """Create a new CIFS share. Args: name: Share name (alphanumeric with underscores/hyphens). volume: Volume key or name to create the share on. share_path: Path within the volume to share (empty = entire volume). description: Share description. comment: Short comment visible to clients. browseable: Make visible in network browsing (default True). read_only: Create as read-only share. guest_ok: Allow guest access. guest_only: Only allow guest connections. force_user: All operations performed as this user. force_group: Default primary group for connecting users. valid_users: List of usernames allowed to connect. valid_groups: List of group names allowed to connect. admin_users: List of users with admin privileges. admin_groups: List of groups with admin privileges. allowed_hosts: List of allowed hosts (IPs, hostnames, subnets). denied_hosts: List of denied hosts. shadow_copy: Enable Previous Versions support. enabled: Enable the share (default True). Returns: Created NASCIFSShare object. Raises: ValueError: If volume not found. Example: >>> # Create a basic share >>> share = client.cifs_shares.create("shared", "FileShare") >>> # Create with guest access >>> share = client.cifs_shares.create( ... "public", "FileShare", ... share_path="/public", ... guest_ok=True ... ) >>> # Create with restricted access >>> share = client.cifs_shares.create( ... "secure", "FileShare", ... valid_users=["admin", "manager"] ... ) """ # Resolve volume to key volume_key = self._resolve_volume_key(volume) if volume_key is None: raise ValueError(f"Volume '{volume}' not found") # Build request body body: dict[str, Any] = { "volume": volume_key, "name": name, "enabled": enabled, "browseable": browseable, } if share_path: body["share_path"] = share_path if description: body["description"] = description if comment: body["comment"] = comment if read_only: body["read_only"] = True if guest_ok: body["guest_ok"] = True if guest_only: body["guest_only"] = True if force_user: body["force_user"] = force_user if force_group: body["force_group"] = force_group if valid_users: body["valid_users"] = "\n".join(valid_users) if valid_groups: body["valid_groups"] = "\n".join(valid_groups) if admin_users: body["admin_users"] = "\n".join(admin_users) if admin_groups: body["admin_groups"] = "\n".join(admin_groups) if allowed_hosts: body["host_allow"] = "\n".join(allowed_hosts) if denied_hosts: body["host_deny"] = "\n".join(denied_hosts) if shadow_copy: body["vfs_shadow_copy2"] = True response = self._client._request("POST", self._endpoint, json_data=body) # Get the created share if response and isinstance(response, dict): share_key = response.get("$key") or response.get("id") if share_key: return self.get(key=share_key) # Fallback: search by name and volume return self.get(name=name, volume=volume_key)
[docs] def update( # type: ignore[override] self, key: str, *, description: str | None = None, comment: str | None = None, enabled: bool | None = None, browseable: bool | None = None, read_only: bool | None = None, guest_ok: bool | None = None, guest_only: bool | None = None, force_user: str | None = None, force_group: str | None = None, valid_users: builtins.list[str] | None = None, valid_groups: builtins.list[str] | None = None, admin_users: builtins.list[str] | None = None, admin_groups: builtins.list[str] | None = None, allowed_hosts: builtins.list[str] | None = None, denied_hosts: builtins.list[str] | None = None, shadow_copy: bool | None = None, ) -> NASCIFSShare: """Update a CIFS share. Args: key: Share $key (40-character hex string). description: New description. comment: New comment. enabled: Enable or disable the share. browseable: Show or hide in network browsing. read_only: Set read-only or read-write. guest_ok: Allow or disallow guest access. guest_only: Restrict to guest-only access. force_user: Set force user (empty string to clear). force_group: Set force group (empty string to clear). valid_users: Set valid users (empty list to clear). valid_groups: Set valid groups (empty list to clear). admin_users: Set admin users (empty list to clear). admin_groups: Set admin groups (empty list to clear). allowed_hosts: Set allowed hosts (empty list to clear). denied_hosts: Set denied hosts (empty list to clear). shadow_copy: Enable or disable shadow copy. Returns: Updated NASCIFSShare object. Example: >>> # Make share read-only >>> client.cifs_shares.update(share.key, read_only=True) >>> # Update access controls >>> client.cifs_shares.update( ... share.key, ... valid_users=["admin", "manager", "backup"] ... ) """ body: dict[str, Any] = {} if description is not None: body["description"] = description if comment is not None: body["comment"] = comment if enabled is not None: body["enabled"] = enabled if browseable is not None: body["browseable"] = browseable if read_only is not None: body["read_only"] = read_only if guest_ok is not None: body["guest_ok"] = guest_ok if guest_only is not None: body["guest_only"] = guest_only if force_user is not None: body["force_user"] = force_user if force_group is not None: body["force_group"] = force_group if valid_users is not None: body["valid_users"] = "\n".join(valid_users) if valid_users else "" if valid_groups is not None: body["valid_groups"] = "\n".join(valid_groups) if valid_groups else "" if admin_users is not None: body["admin_users"] = "\n".join(admin_users) if admin_users else "" if admin_groups is not None: body["admin_groups"] = "\n".join(admin_groups) if admin_groups else "" if allowed_hosts is not None: body["host_allow"] = "\n".join(allowed_hosts) if allowed_hosts else "" if denied_hosts is not None: body["host_deny"] = "\n".join(denied_hosts) if denied_hosts else "" if shadow_copy is not None: body["vfs_shadow_copy2"] = shadow_copy if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: str) -> None: # type: ignore[override] """Delete a CIFS share. This removes the share but does not delete the underlying data on the volume. Args: key: Share $key (40-character hex string). Example: >>> client.cifs_shares.delete(share.key) """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def enable(self, key: str) -> NASCIFSShare: """Enable a CIFS share. Args: key: Share $key (40-character hex string). Returns: Updated NASCIFSShare object. """ return self.update(key, enabled=True)
[docs] def disable(self, key: str) -> NASCIFSShare: """Disable a CIFS share. Args: key: Share $key (40-character hex string). Returns: Updated NASCIFSShare object. """ return self.update(key, enabled=False)
def _resolve_volume_key(self, volume: str | int) -> int | None: """Resolve a volume identifier to its integer key. Args: volume: Volume key (hex string or int) or name. Returns: Volume key as integer, or None if not found. """ if isinstance(volume, int): return volume # Check if it looks like a volume key (40-char hex) if len(volume) == 40 and all(c in "0123456789abcdef" for c in volume.lower()): # Look up to get the row key for CIFS shares vol_response = self._client._request( "GET", "volumes", params={"filter": f"id eq '{volume}'", "fields": "$key,id,name", "limit": "1"}, ) if vol_response: if isinstance(vol_response, list): vol_response = vol_response[0] if vol_response else None if vol_response: return vol_response.get("$key") return None # Look up by name vol_response = self._client._request( "GET", "volumes", params={"filter": f"name eq '{volume}'", "fields": "$key,id,name", "limit": "1"}, ) if vol_response: if isinstance(vol_response, list): vol_response = vol_response[0] if vol_response else None if vol_response: return vol_response.get("$key") return None def _to_model(self, data: dict[str, Any]) -> NASCIFSShare: """Convert API response to NASCIFSShare object.""" return NASCIFSShare(data, self)