Source code for pyvergeos.resources.nas_services

"""NAS service management resources."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.nas_antivirus import NasServiceAntivirusManager


[docs] class NASService(ResourceObject): """NAS service resource object. Represents a NAS service VM that manages volumes and file shares. Attributes: key: The NAS service unique identifier ($key). name: NAS service name. vm: The underlying VM key. max_imports: Maximum simultaneous import jobs. max_syncs: Maximum simultaneous sync jobs. disable_swap: Whether swap is disabled. read_ahead_kb_default: Read-ahead buffer size in KB. cifs: CIFS settings key. nfs: NFS settings key. """ @property def is_running(self) -> bool: """Check if the NAS service VM is running.""" return self.get("vm_running", False) or self.get("vm_status") == "running" @property def vm_key(self) -> int | None: """Get the underlying VM key.""" vm = self.get("vm") return int(vm) if vm is not None else None @property def volume_count(self) -> int: """Get the number of volumes managed by this service.""" count = self.get("volume_count", 0) return int(count) if count is not None else 0 @property def antivirus(self) -> NasServiceAntivirusManager: """Get antivirus configuration manager for this NAS service. Returns: NasServiceAntivirusManager scoped to this service. Note: NAS service requires 8GB+ RAM for antivirus support. Example: >>> # Get service antivirus config >>> svc_av = nas.antivirus.get() >>> # Update settings >>> svc_av = nas.antivirus.update( ... key=svc_av.key, ... max_recursion=20 ... ) """ from typing import cast from pyvergeos.resources.nas_antivirus import NasServiceAntivirusManager manager = cast("NASServiceManager", self._manager) return NasServiceAntivirusManager(manager._client, service_key=self.key)
[docs] class CIFSSettings(ResourceObject): """CIFS/SMB settings resource object. Represents CIFS/SMB configuration for a NAS service. Attributes: key: The CIFS settings unique identifier ($key). service: The parent NAS service key. workgroup: NetBIOS workgroup name. realm: Kerberos realm for AD. server_type: Server role (default, MEMBER, BDC, PDC). map_to_guest: How invalid users/passwords are handled. server_min_protocol: Minimum SMB protocol version. extended_acl_support: Whether extended ACLs are enabled. ad_status: Active Directory join status. """ pass
[docs] class NFSSettings(ResourceObject): """NFS settings resource object. Represents NFS configuration for a NAS service. Attributes: key: The NFS settings unique identifier ($key). service: The parent NAS service key. enable_nfsv4: Whether NFSv4 is enabled. allowed_hosts: List of allowed hosts/networks. allow_all: Whether all hosts are allowed. squash: User/group squashing mode. data_access: Read-only or read-write. anonuid: Anonymous user ID. anongid: Anonymous group ID. """ pass
[docs] class NASServiceManager(ResourceManager[NASService]): """Manager for NAS service operations. NAS services are specialized VMs that manage NAS volumes and file shares (CIFS/SMB and NFS). Example: >>> # List all NAS services >>> for service in client.nas_services.list(): ... print(f"{service.name}: {service.volume_count} volumes") >>> # Get a specific NAS service >>> nas = client.nas_services.get(name="NAS01") >>> # Get CIFS settings >>> cifs = client.nas_services.get_cifs_settings(nas.key) >>> print(f"Workgroup: {cifs.workgroup}") >>> # Update NFS settings >>> client.nas_services.set_nfs_settings(nas.key, enable_nfsv4=True) """ _endpoint = "vm_services" # Default fields for list operations _default_fields = [ "$key", "name", "vm", "vm#name as vm_name", "vm#$display as vm_display", "vm#description as vm_description", "vm#machine#status#status as vm_status", "vm#machine#status#running as vm_running", "vm#machine#cores as vm_cores", "vm#machine#ram as vm_ram", "vm#created as created", "vm#modified as modified", "max_imports", "max_syncs", "disable_swap", "read_ahead_kb_default", "cifs", "nfs", "count(volumes) as volume_count", ]
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, status: str | None = None, **filter_kwargs: Any, ) -> builtins.list[NASService]: """List NAS services with optional filtering. Args: filter: OData filter string. fields: List of fields to return (uses defaults if not specified). limit: Maximum number of results. offset: Skip this many results. status: Filter by VM status (running, stopped, etc.). **filter_kwargs: Shorthand filter arguments (name, etc.). Returns: List of NASService objects. Example: >>> # List all NAS services >>> services = client.nas_services.list() >>> # List running services only >>> running = client.nas_services.list(status="running") >>> # Filter by name >>> nas01 = client.nas_services.list(name="NAS01") """ params: dict[str, Any] = {} # Build filter filters: builtins.list[str] = [] if filter: filters.append(filter) if filter_kwargs: filters.append(build_filter(**filter_kwargs)) if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): response = [response] results = [self._to_model(item) for item in response if item] # Post-filter by status if specified if status: status_lower = status.lower() results = [s for s in results if s.get("vm_status") == status_lower] return results
[docs] def list_running(self) -> builtins.list[NASService]: """List all running NAS services. Returns: List of running NASService objects. """ return self.list(status="running")
[docs] def list_stopped(self) -> builtins.list[NASService]: """List all stopped NAS services. Returns: List of stopped NASService objects. """ return self.list(status="stopped")
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> NASService: """Get a single NAS service by key or name. Args: key: NAS service $key (ID). name: NAS service name. fields: List of fields to return. Returns: NASService object. Raises: NotFoundError: If NAS service not found. ValueError: If neither key nor name provided. Example: >>> # Get by key >>> service = client.nas_services.get(1) >>> # Get by name >>> service = client.nas_services.get(name="NAS01") """ if key is not None: # Fetch by key with default fields params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"NAS service with key {key} not found") if not isinstance(response, dict): raise NotFoundError(f"NAS service with key {key} returned invalid response") return self._to_model(response) if name is not None: # Search by name escaped_name = name.replace("'", "''") results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1) if not results: raise NotFoundError(f"NAS service with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def create( # type: ignore[override] self, name: str, *, hostname: str | None = None, network: int | str | None = None, cores: int = 4, memory_gb: int = 8, auto_update: bool = False, ) -> NASService: """Create a new NAS service by deploying the Services recipe. Args: name: Name for the new NAS service. hostname: Hostname for the NAS VM (defaults to name with invalid chars removed). network: Network to connect to (key or name). Defaults to 'Internal'. cores: Number of CPU cores (default: 4). memory_gb: Amount of RAM in GB (default: 8). auto_update: Enable auto-update on power off. Returns: Created NASService object. Raises: ValueError: If Services recipe not found or NAS service already exists. Example: >>> # Create with defaults >>> nas = client.nas_services.create("NAS01") >>> # Create with custom settings >>> nas = client.nas_services.create( ... "FileServer", ... network="Internal", ... cores=8, ... memory_gb=16 ... ) """ # Find the Services recipe recipe_response = self._client._request( "GET", "vm_recipes", params={"filter": "name eq 'Services'", "fields": "id,name,description,version"}, ) if not recipe_response: raise ValueError("Services recipe not found. Ensure the Services recipe is available.") if isinstance(recipe_response, list): recipe = recipe_response[0] if recipe_response else None else: recipe = recipe_response if not recipe: raise ValueError("Services recipe not found") recipe_id = recipe.get("id") # Check if NAS service with this name already exists try: existing = self.get(name=name) if existing: raise ValueError(f"A NAS service with name '{name}' already exists") except NotFoundError: pass # Good, doesn't exist # Determine hostname if not hostname: import re hostname = re.sub(r"[^a-zA-Z0-9\-]", "", name) hostname = hostname.strip("-") if len(hostname) > 63: hostname = hostname[:63] if not hostname: hostname = "nas" # Resolve network network_key: int | None = None if network is not None: if isinstance(network, int): network_key = network elif isinstance(network, str): # Look up network by name net_response = self._client._request( "GET", "vnets", params={"filter": f"name eq '{network}'", "fields": "$key,name", "limit": "1"}, ) if net_response: if isinstance(net_response, list): net_response = net_response[0] if net_response else None if net_response: network_key = net_response.get("$key") if network_key is None: raise ValueError(f"Network '{network}' not found") else: # Try to find 'Internal' network net_response = self._client._request( "GET", "vnets", params={"filter": "name eq 'Internal'", "fields": "$key,name", "limit": "1"}, ) if net_response: if isinstance(net_response, list): net_response = net_response[0] if net_response else None if net_response: network_key = net_response.get("$key") if network_key is None: raise ValueError("No network specified and no 'Internal' network found") # Build request body for vm_recipe_instances body: dict[str, Any] = { "recipe": recipe_id, "name": name, "answers": { "HOSTNAME": hostname, "YB_HOSTNAME": hostname, "YB_CPU_CORES": cores, "YB_RAM": memory_gb * 1024, # Convert GB to MB "YB_NIC_1": str(network_key), "YB_NIC_1_IP_TYPE": "dhcp", "YB_TIMEZONE": "America/New_York", "YB_NTP": "time.nist.gov 0.pool.ntp.org 1.pool.ntp.org", "YB_DOMAINNAME": "", }, } if auto_update: body["auto_update"] = True # Deploy the recipe self._client._request("POST", "vm_recipe_instances", json_data=body) # Wait for the service to be created import time max_attempts = 15 for _ in range(max_attempts): time.sleep(2) try: service = self.get(name=name) if service: return service except NotFoundError: pass raise ValueError( f"NAS service deployment initiated but service '{name}' not found after waiting. " "It may still be creating." )
[docs] def update( # type: ignore[override] self, key: int, *, description: str | None = None, cpu_cores: int | None = None, memory_gb: int | None = None, max_imports: int | None = None, max_syncs: int | None = None, disable_swap: bool | None = None, read_ahead_kb: int | None = None, ) -> NASService: """Update NAS service settings. Updates both the underlying VM settings (CPU, RAM, description) and NAS-specific settings (max_imports, max_syncs, etc.). Args: key: NAS service $key (ID). description: New description. cpu_cores: Number of CPU cores (requires restart). memory_gb: Amount of RAM in GB (requires restart). max_imports: Maximum simultaneous import jobs (1-10). max_syncs: Maximum simultaneous sync jobs (1-10). disable_swap: Disable swap on the NAS service. read_ahead_kb: Read-ahead buffer size (0=auto, 64, 128, 256, 512, 1024, 2048, 4096). Returns: Updated NASService object. Example: >>> # Update NAS settings >>> client.nas_services.update(1, max_imports=5, max_syncs=3) >>> # Update VM resources (requires restart) >>> client.nas_services.update(1, cpu_cores=8, memory_gb=16) """ # Get current service info service = self.get(key) # Build VM update body vm_body: dict[str, Any] = {} if description is not None: vm_body["description"] = description if cpu_cores is not None: vm_body["cpu_cores"] = cpu_cores if memory_gb is not None: vm_body["ram"] = memory_gb * 1024 # Convert GB to MB # Build service update body service_body: dict[str, Any] = {} if max_imports is not None: service_body["max_imports"] = max_imports if max_syncs is not None: service_body["max_syncs"] = max_syncs if disable_swap is not None: service_body["disable_swap"] = disable_swap if read_ahead_kb is not None: service_body["read_ahead_kb_default"] = read_ahead_kb # Update VM settings if any if vm_body and service.vm_key: self._client._request("PUT", f"vms/{service.vm_key}", json_data=vm_body) # Update service settings if any if service_body: self._client._request("PUT", f"{self._endpoint}/{key}", json_data=service_body) return self.get(key)
[docs] def delete(self, key: int, *, force: bool = False) -> None: """Delete a NAS service. The NAS service VM must be stopped before deletion. If the service has volumes, they must be removed first unless force=True. Args: key: NAS service $key (ID). force: Remove even if service has volumes (will delete all data). Raises: ValueError: If service is running or has volumes without force. Example: >>> # Delete a stopped NAS service >>> client.nas_services.delete(1) >>> # Force delete (removes all volumes and data) >>> client.nas_services.delete(1, force=True) """ service = self.get(key) if service.is_running: raise ValueError( f"Cannot delete NAS service '{service.name}': Service is running. " "Power off the service first." ) if service.volume_count > 0 and not force: raise ValueError( f"Cannot delete NAS service '{service.name}': " f"Service has {service.volume_count} volume(s). " "Remove volumes first or use force=True." ) # Delete the underlying VM (cascades to recipe instance and vm_services) if service.vm_key: self._client._request("DELETE", f"vms/{service.vm_key}") else: # Fallback: delete service directly self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def power_on(self, key: int) -> dict[str, Any] | None: """Power on a NAS service. Args: key: NAS service $key (ID). Returns: Task information dict or None. Example: >>> client.nas_services.power_on(1) """ service = self.get(key) if service.vm_key: result = self._client._request( "POST", "vm_actions", json_data={"vm": service.vm_key, "action": "poweron"} ) if isinstance(result, dict): return result return None
[docs] def power_off(self, key: int, *, force: bool = False) -> dict[str, Any] | None: """Power off a NAS service. Args: key: NAS service $key (ID). force: Force power off (like pulling the plug). Returns: Task information dict or None. Example: >>> # Graceful shutdown >>> client.nas_services.power_off(1) >>> # Force power off >>> client.nas_services.power_off(1, force=True) """ service = self.get(key) if service.vm_key: action = "kill" if force else "poweroff" result = self._client._request( "POST", "vm_actions", json_data={"vm": service.vm_key, "action": action} ) if isinstance(result, dict): return result return None
[docs] def restart(self, key: int) -> dict[str, Any] | None: """Restart a NAS service. Args: key: NAS service $key (ID). Returns: Task information dict or None. Example: >>> client.nas_services.restart(1) """ service = self.get(key) if service.vm_key: result = self._client._request( "POST", "vm_actions", json_data={"vm": service.vm_key, "action": "reset"} ) if isinstance(result, dict): return result return None
# ------------------------------------------------------------------------- # CIFS Settings # -------------------------------------------------------------------------
[docs] def get_cifs_settings(self, key: int) -> CIFSSettings: """Get CIFS/SMB settings for a NAS service. Args: key: NAS service $key (ID). Returns: CIFSSettings object. Raises: NotFoundError: If CIFS settings not found. Example: >>> cifs = client.nas_services.get_cifs_settings(1) >>> print(f"Workgroup: {cifs.workgroup}") >>> print(f"Min Protocol: {cifs.server_min_protocol}") """ fields = [ "$key", "service", "service#$display as service_name", "map_to_guest", "realm", "workgroup", "server_type", "extended_acl_support", "server_min_protocol", "ad_status", "ad_status_info", "ad_upn", "ad_ou", "ad_osname", "ad_osver", "advanced", ] response = self._client._request( "GET", "vm_service_cifs", params={"filter": f"service eq {key}", "fields": ",".join(fields)}, ) if not response: raise NotFoundError(f"CIFS settings not found for NAS service {key}") if isinstance(response, list): response = response[0] if response else None if not response: raise NotFoundError(f"CIFS settings not found for NAS service {key}") return CIFSSettings(response, self)
[docs] def set_cifs_settings( self, key: int, *, workgroup: str | None = None, min_protocol: str | None = None, guest_mapping: str | None = None, extended_acl_support: bool | None = None, ) -> CIFSSettings: """Update CIFS/SMB settings for a NAS service. Args: key: NAS service $key (ID). workgroup: NetBIOS workgroup name. min_protocol: Minimum SMB protocol version. Valid values: none, SMB2, SMB2_02, SMB2_10, SMB3, SMB3_00, SMB3_02, SMB3_11 guest_mapping: How to handle invalid users/passwords. Valid values: never, bad user, bad password, bad uid extended_acl_support: Enable extended ACL support. Returns: Updated CIFSSettings object. Example: >>> # Set minimum protocol to SMB3 >>> client.nas_services.set_cifs_settings(1, min_protocol="SMB3") >>> # Update workgroup >>> client.nas_services.set_cifs_settings(1, workgroup="MYWORKGROUP") """ # Get current settings to find the CIFS settings key current = self.get_cifs_settings(key) cifs_key = current.key body: dict[str, Any] = {} if workgroup is not None: body["workgroup"] = workgroup.lower() if min_protocol is not None: # Map user-friendly values to API values protocol_map = { "none": "none", "smb2": "SMB2", "smb2_02": "SMB2_02", "smb2_10": "SMB2_10", "smb3": "SMB3", "smb3_00": "SMB3_00", "smb3_02": "SMB3_02", "smb3_11": "SMB3_11", } body["server_min_protocol"] = protocol_map.get(min_protocol.lower(), min_protocol) if guest_mapping is not None: # Map user-friendly values to API values guest_map = { "never": "never", "baduser": "bad user", "bad_user": "bad user", "badpassword": "bad password", "bad_password": "bad password", "baduid": "bad uid", "bad_uid": "bad uid", } body["map_to_guest"] = guest_map.get(guest_mapping.lower(), guest_mapping) if extended_acl_support is not None: body["extended_acl_support"] = extended_acl_support if not body: return current self._client._request("PUT", f"vm_service_cifs/{cifs_key}", json_data=body) return self.get_cifs_settings(key)
# ------------------------------------------------------------------------- # NFS Settings # -------------------------------------------------------------------------
[docs] def get_nfs_settings(self, key: int) -> NFSSettings: """Get NFS settings for a NAS service. Args: key: NAS service $key (ID). Returns: NFSSettings object. Raises: NotFoundError: If NFS settings not found. Example: >>> nfs = client.nas_services.get_nfs_settings(1) >>> print(f"NFSv4 Enabled: {nfs.enable_nfsv4}") >>> print(f"Allowed Hosts: {nfs.allowed_hosts}") """ fields = [ "$key", "service", "service#$display as service_name", "enable_nfsv4", "allowed_hosts", "fsid", "anonuid", "anongid", "no_acl", "insecure", "async", "squash", "data_access", "allow_all", ] response = self._client._request( "GET", "vm_service_nfs", params={"filter": f"service eq {key}", "fields": ",".join(fields)}, ) if not response: raise NotFoundError(f"NFS settings not found for NAS service {key}") if isinstance(response, list): response = response[0] if response else None if not response: raise NotFoundError(f"NFS settings not found for NAS service {key}") return NFSSettings(response, self)
[docs] def set_nfs_settings( self, key: int, *, enable_nfsv4: bool | None = None, allowed_hosts: str | None = None, allow_all: bool | None = None, squash: str | None = None, data_access: str | None = None, anon_uid: int | None = None, anon_gid: int | None = None, no_acl: bool | None = None, insecure: bool | None = None, async_mode: bool | None = None, ) -> NFSSettings: """Update NFS settings for a NAS service. Args: key: NAS service $key (ID). enable_nfsv4: Enable NFSv4 protocol support. allowed_hosts: Comma-separated list of allowed hosts/networks. allow_all: Allow all hosts to access NFS exports. squash: User/group squashing mode. Valid values: root_squash, all_squash, no_root_squash data_access: Read-only or read-write access. Valid values: ro, rw anon_uid: Anonymous user ID for squashed users. anon_gid: Anonymous group ID for squashed users. no_acl: Disable ACL support for NFS exports. insecure: Allow connections from non-privileged ports. async_mode: Enable async mode for better performance. Returns: Updated NFSSettings object. Example: >>> # Enable NFSv4 >>> client.nas_services.set_nfs_settings(1, enable_nfsv4=True) >>> # Set allowed hosts >>> client.nas_services.set_nfs_settings( ... 1, allowed_hosts="192.168.1.0/24,10.0.0.0/8" ... ) """ # Get current settings to find the NFS settings key current = self.get_nfs_settings(key) nfs_key = current.key body: dict[str, Any] = {} if enable_nfsv4 is not None: body["enable_nfsv4"] = enable_nfsv4 if allowed_hosts is not None: body["allowed_hosts"] = allowed_hosts if allow_all is not None: body["allow_all"] = allow_all if squash is not None: # Map user-friendly values to API values squash_map = { "root_squash": "root_squash", "rootsquash": "root_squash", "all_squash": "all_squash", "allsquash": "all_squash", "no_root_squash": "no_root_squash", "norootsquash": "no_root_squash", "no_squash": "no_root_squash", "nosquash": "no_root_squash", } body["squash"] = squash_map.get(squash.lower(), squash) if data_access is not None: # Map user-friendly values to API values access_map = { "ro": "ro", "readonly": "ro", "read_only": "ro", "rw": "rw", "readwrite": "rw", "read_write": "rw", } body["data_access"] = access_map.get(data_access.lower(), data_access) if anon_uid is not None: body["anonuid"] = anon_uid if anon_gid is not None: body["anongid"] = anon_gid if no_acl is not None: body["no_acl"] = no_acl if insecure is not None: body["insecure"] = insecure if async_mode is not None: body["async"] = async_mode if not body: return current self._client._request("PUT", f"vm_service_nfs/{nfs_key}", json_data=body) return self.get_nfs_settings(key)
def _to_model(self, data: dict[str, Any]) -> NASService: """Convert API response to NASService object.""" return NASService(data, self)