Source code for pyvergeos.resources.users

"""User resource manager for VergeOS system users."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any, Literal

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient

# Type aliases
# Account types accepted by the ``user_type`` arguments of
# ``UserManager.list`` and ``UserManager.create``.
UserType = Literal["normal", "api", "vdi"]
# Two-factor methods accepted by the ``two_factor_type`` arguments of
# ``UserManager.create`` and ``UserManager.update``.
TwoFactorType = Literal["email", "authenticator"]


class User(ResourceObject):
    """User resource object.

    Represents a user account in VergeOS. Users can be normal users,
    API users, or VDI users.

    Every property reads its value through ``self.get(...)`` and coerces
    it to the documented Python type.

    Attributes:
        key: User primary key ($key).
        name: Username (login name).
        displayname: Display name.
        email: Email address.
        user_type: User type ('normal', 'api', 'vdi').
        enabled: Whether the account is enabled (see ``is_enabled``).
        created: Creation timestamp (Unix epoch).
        last_login: Last login timestamp (Unix epoch).
        change_password: Whether password change is required on next login.
        physical_access: Whether console/SSH access is enabled
            (admin privilege).
        two_factor_enabled: Whether 2FA is enabled.
        two_factor_type: Type of 2FA ('email' or 'authenticator').
        two_factor_setup_required: Whether 2FA setup is required on
            next login.
        account_locked: Timestamp when account was locked
            (``None`` if not locked).
        failed_attempts: Number of failed login attempts.
        auth_source: Authentication source key.
        auth_source_name: Authentication source name.
        remote_name: Remote username (for external auth).
        identity: Identity key.
        creator: Username of the user who created this account.
        ssh_keys: SSH public keys (newline-separated).
    """

    @property
    def user_type(self) -> str:
        """Get the user type (normal, api, vdi).

        Falls back to ``"normal"`` when the ``type`` field is missing.
        """
        return str(self.get("type", "normal"))

    @property
    def user_type_display(self) -> str:
        """Get the user type as a human-readable string.

        Unknown types are returned unchanged.
        """
        type_map = {
            "normal": "Normal",
            "api": "API",
            "vdi": "VDI",
            "site_sync": "Site Sync",
            "site_user": "Site User",
        }
        return type_map.get(self.user_type, self.user_type)

    @property
    def displayname(self) -> str | None:
        """Get the display name."""
        return self.get("displayname")

    @property
    def email(self) -> str | None:
        """Get the email address."""
        return self.get("email")

    @property
    def is_enabled(self) -> bool:
        """Check if user account is enabled.

        Defaults to ``True`` when the ``enabled`` field is missing.
        """
        return bool(self.get("enabled", True))

    @property
    def created(self) -> int | None:
        """Get the creation timestamp (Unix epoch), or None if unset."""
        val = self.get("created")
        return int(val) if val is not None else None

    @property
    def last_login(self) -> int | None:
        """Get the last login timestamp (Unix epoch).

        Returns:
            The timestamp as an int, or None when the field is missing,
            empty, zero, or negative.
        """
        val = self.get("last_login")
        if not val:
            return None
        # Coerce before comparing: comparing a numeric string with 0
        # would raise TypeError.
        timestamp = int(val)
        return timestamp if timestamp > 0 else None

    @property
    def change_password(self) -> bool:
        """Check if password change is required on next login."""
        return bool(self.get("change_password", False))

    @property
    def physical_access(self) -> bool:
        """Check if physical (console/SSH) access is enabled.

        Note:
            Users with physical access have administrator privileges.
        """
        return bool(self.get("physical_access", False))

    @property
    def two_factor_enabled(self) -> bool:
        """Check if two-factor authentication is enabled."""
        return bool(self.get("two_factor_authentication", False))

    @property
    def two_factor_type(self) -> str | None:
        """Get the 2FA type ('email' or 'authenticator')."""
        return self.get("two_factor_type")

    @property
    def two_factor_type_display(self) -> str:
        """Get the 2FA type as a human-readable string.

        Returns the raw type for unknown values and the string ``"None"``
        when no type is set.
        """
        type_map = {
            "email": "Email",
            "authenticator": "Authenticator (TOTP)",
        }
        return type_map.get(self.two_factor_type or "", self.two_factor_type or "None")

    @property
    def two_factor_setup_required(self) -> bool:
        """Check if 2FA setup is required on next login.

        Note:
            This is a write-only field in the API, so this property will
            always return False. Use it only for setting during
            create/update operations.
        """
        return bool(self.get("two_factor_setup_next_login", False))

    @property
    def account_locked(self) -> int | None:
        """Get the account locked timestamp (Unix epoch).

        Returns:
            The timestamp as an int, or None when the account is not
            locked (field missing, empty, zero, or negative).
        """
        val = self.get("account_locked")
        if not val:
            return None
        # Coerce before comparing: comparing a numeric string with 0
        # would raise TypeError.
        timestamp = int(val)
        return timestamp if timestamp > 0 else None

    @property
    def is_locked(self) -> bool:
        """Check if account is currently locked."""
        return self.account_locked is not None

    @property
    def failed_attempts(self) -> int:
        """Get the number of failed login attempts.

        A missing or null field counts as zero attempts.
        """
        # ``get(..., 0)`` only covers a missing key; ``or 0`` also covers
        # an explicit null, which ``int()`` would reject with TypeError.
        return int(self.get("failed_attempts") or 0)

    @property
    def auth_source(self) -> int | None:
        """Get the authentication source key."""
        val = self.get("auth_source")
        return int(val) if val is not None else None

    @property
    def auth_source_name(self) -> str | None:
        """Get the authentication source name.

        Falls back to ``auth_source_display`` when ``auth_source_name``
        is missing or empty.
        """
        return self.get("auth_source_name") or self.get("auth_source_display")

    @property
    def remote_name(self) -> str | None:
        """Get the remote username (for external auth)."""
        return self.get("remote_name")

    @property
    def identity(self) -> int | None:
        """Get the identity key."""
        val = self.get("identity")
        return int(val) if val is not None else None

    @property
    def creator(self) -> str | None:
        """Get the username of who created this account."""
        return self.get("creator")

    @property
    def ssh_keys(self) -> str | None:
        """Get the SSH public keys (newline-separated)."""
        return self.get("ssh_keys")

    def enable(self) -> User:
        """Enable this user account.

        Delegates to ``UserManager.enable`` with this user's key.

        Returns:
            Updated User object.
        """
        from typing import cast

        # The manager is stored untyped on the base object; narrow it so
        # the user-specific ``enable`` method type-checks.
        manager = cast("UserManager", self._manager)
        return manager.enable(self.key)

    def disable(self) -> User:
        """Disable this user account.

        Delegates to ``UserManager.disable`` with this user's key.

        Returns:
            Updated User object.
        """
        from typing import cast

        # Same narrowing as in ``enable``.
        manager = cast("UserManager", self._manager)
        return manager.disable(self.key)
class UserManager(ResourceManager[User]):
    """Manager for VergeOS user operations.

    Provides CRUD operations and management for user accounts.

    Example:
        >>> # List all users
        >>> for user in client.users.list():
        ...     print(f"{user.name}: {user.user_type_display}")

        >>> # List only API users
        >>> api_users = client.users.list(user_type="api")

        >>> # Create a user
        >>> user = client.users.create(
        ...     name="jsmith",
        ...     password="SecurePass123!",
        ...     displayname="John Smith",
        ...     email="jsmith@company.com"
        ... )

        >>> # Enable/disable users
        >>> client.users.disable(user.key)
        >>> client.users.enable(user.key)
    """

    _endpoint = "users"

    # Fields requested when the caller does not pass ``fields``
    # (matches PowerShell module).
    # Note: two_factor_setup_next_login is a write-only argument field,
    # so it is not requested here.
    _default_fields = [
        "$key",
        "name",
        "displayname",
        "email",
        "type",
        "enabled",
        "created",
        "last_login",
        "change_password",
        "physical_access",
        "two_factor_authentication",
        "two_factor_type",
        "account_locked",
        "failed_attempts",
        "auth_source",
        "auth_source#name as auth_source_name",
        "remote_name",
        "identity",
        "creator",
    ]

    def __init__(self, client: VergeClient) -> None:
        """Bind the manager to ``client``."""
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> User:
        """Wrap a raw API record in a ``User`` bound to this manager."""
        return User(data, self)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        user_type: UserType | None = None,
        enabled: bool | None = None,
        include_system: bool = False,
        **filter_kwargs: Any,
    ) -> builtins.list[User]:
        """List users with optional filtering.

        System user types (site_sync, site_user) are left out unless
        ``include_system`` is set.

        Args:
            filter: OData filter string.
            fields: Fields to return (the manager defaults are used when
                omitted or empty).
            limit: Maximum number of results.
            offset: Skip this many results.
            user_type: Filter by user type ('normal', 'api', 'vdi').
            enabled: Filter by enabled status.
            include_system: Include system user types
                (site_sync, site_user).
            **filter_kwargs: Shorthand filter arguments (name, etc.),
                passed to ``build_filter``.

        Returns:
            List of User objects.

        Example:
            >>> # List all users
            >>> users = client.users.list()

            >>> # List only API users
            >>> api_users = client.users.list(user_type="api")

            >>> # List disabled users
            >>> disabled = client.users.list(enabled=False)

            >>> # List by name (exact match)
            >>> users = client.users.list(name="admin")
        """
        # Collect the individual conditions; they are joined with " and "
        # exactly as given (a caller filter is not parenthesized).
        clauses: builtins.list[str] = []
        if not include_system:
            clauses.extend(("type ne 'site_sync'", "type ne 'site_user'"))
        if filter:
            clauses.append(filter)
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))
        if user_type is not None:
            clauses.append("type eq '" + user_type + "'")
        if enabled is not None:
            clauses.append("enabled eq " + str(enabled).lower())

        query: dict[str, Any] = {}
        if clauses:
            query["filter"] = " and ".join(clauses)
        query["fields"] = ",".join(fields if fields else self._default_fields)
        if limit is not None:
            query["limit"] = limit
        if offset is not None:
            query["offset"] = offset

        payload = self._client._request("GET", self._endpoint, params=query)
        if payload is None:
            return []

        # A single record may come back unwrapped; normalize to a list.
        rows = payload if isinstance(payload, builtins.list) else [payload]
        return [self._to_model(row) for row in rows if row]

    def list_enabled(self) -> builtins.list[User]:
        """List all enabled users.

        Returns:
            List of enabled User objects.
        """
        return self.list(enabled=True)

    def list_disabled(self) -> builtins.list[User]:
        """List all disabled users.

        Returns:
            List of disabled User objects.
        """
        return self.list(enabled=False)

    def list_api_users(self) -> builtins.list[User]:
        """List all API users.

        Returns:
            List of API User objects.
        """
        return self.list(user_type="api")

    def list_vdi_users(self) -> builtins.list[User]:
        """List all VDI users.

        Returns:
            List of VDI User objects.
        """
        return self.list(user_type="vdi")

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> User:
        """Get a single user by key or name.

        ``key`` takes precedence when both are supplied.

        Args:
            key: User $key (ID).
            name: Username.
            fields: List of fields to return.

        Returns:
            User object.

        Raises:
            NotFoundError: If user not found.
            ValueError: If neither key nor name provided.

        Example:
            >>> # Get by key
            >>> user = client.users.get(123)

            >>> # Get by name
            >>> user = client.users.get(name="admin")
        """
        if key is None:
            if name is None:
                raise ValueError("Either key or name must be provided")

            # Lookup by name goes through ``list`` and therefore applies
            # its default exclusion of system user types.
            quoted = name.replace("'", "''")
            matches = self.list(filter=f"name eq '{quoted}'", fields=fields, limit=1)
            if not matches:
                raise NotFoundError(f"User '{name}' not found")
            return matches[0]

        # Direct fetch by key.
        wanted = fields if fields else self._default_fields
        query: dict[str, Any] = {"fields": ",".join(wanted)}
        record = self._client._request("GET", f"{self._endpoint}/{key}", params=query)
        if record is None:
            raise NotFoundError(f"User with key {key} not found")
        if not isinstance(record, dict):
            raise NotFoundError(f"User with key {key} returned invalid response")
        return self._to_model(record)

    def create(  # type: ignore[override]
        self,
        name: str,
        password: str,
        *,
        displayname: str | None = None,
        email: str | None = None,
        user_type: UserType = "normal",
        enabled: bool = True,
        change_password: bool = False,
        physical_access: bool = False,
        two_factor_enabled: bool = False,
        two_factor_type: TwoFactorType = "email",
        two_factor_setup_required: bool = False,
        ssh_keys: builtins.list[str] | str | None = None,
    ) -> User:
        """Create a new user.

        Args:
            name: Username (1-128 chars, no forward slashes, will be
                lowercased).
            password: User password.
            displayname: Display name for the user.
            email: Email address (required if enabling 2FA). Sent
                lowercased.
            user_type: User type ('normal', 'api', 'vdi').
                Default 'normal'.
            enabled: Enable the account. Default True.
            change_password: Require password change on next login.
                Default False.
            physical_access: Enable console/SSH access (grants admin).
                Default False.
            two_factor_enabled: Enable two-factor authentication.
                Default False.
            two_factor_type: Type of 2FA ('email' or 'authenticator').
                Default 'email'.
            two_factor_setup_required: Require 2FA setup on next login.
                Default False.
            ssh_keys: SSH public keys (list or newline-separated string).

        Returns:
            Created User object.

        Raises:
            ValueError: If email not provided when enabling 2FA.

        Example:
            >>> # Create a basic user
            >>> user = client.users.create(
            ...     name="jsmith",
            ...     password="SecurePass123!",
            ...     displayname="John Smith",
            ...     email="jsmith@company.com"
            ... )

            >>> # Create an API user
            >>> api_user = client.users.create(
            ...     name="api_service",
            ...     password="ApiSecret!",
            ...     user_type="api"
            ... )

            >>> # Create with 2FA enabled
            >>> user = client.users.create(
            ...     name="secure_user",
            ...     password="Pass123!",
            ...     email="secure@company.com",
            ...     two_factor_enabled=True,
            ...     two_factor_type="authenticator"
            ... )
        """
        if two_factor_enabled and not email:
            raise ValueError("Email address is required when enabling two-factor authentication")

        username = name.lower()  # API requires lowercase
        payload: dict[str, Any] = {
            "name": username,
            "password": password,
            "type": user_type,
            "enabled": enabled,
        }

        # Optional fields are only sent when they carry a truthy value.
        if displayname:
            payload["displayname"] = displayname
        if email:
            payload["email"] = email.lower()
        if change_password:
            payload["change_password"] = True
        if physical_access:
            payload["physical_access"] = True

        if two_factor_enabled:
            # Authenticator (TOTP) needs the user to complete setup at
            # next login; email-based 2FA can be switched on immediately.
            if two_factor_type == "authenticator":
                switch = "two_factor_setup_next_login"
            else:
                switch = "two_factor_authentication"
            payload[switch] = True
            payload["two_factor_type"] = two_factor_type

        if two_factor_setup_required:
            payload["two_factor_setup_next_login"] = True

        if ssh_keys:
            payload["ssh_keys"] = (
                "\n".join(ssh_keys) if isinstance(ssh_keys, builtins.list) else ssh_keys
            )

        result = self._client._request("POST", self._endpoint, json_data=payload)

        # Prefer re-reading by the returned key; otherwise look the new
        # account up by its (lowercased) name.
        if result and isinstance(result, dict):
            new_key = result.get("$key")
            if new_key:
                return self.get(key=int(new_key))
        return self.get(name=username)

    def update(  # type: ignore[override]
        self,
        key: int,
        *,
        password: str | None = None,
        displayname: str | None = None,
        email: str | None = None,
        enabled: bool | None = None,
        change_password: bool | None = None,
        physical_access: bool | None = None,
        two_factor_enabled: bool | None = None,
        two_factor_type: TwoFactorType | None = None,
        two_factor_setup_required: bool | None = None,
        ssh_keys: builtins.list[str] | str | None = None,
    ) -> User:
        """Update a user.

        Only arguments that are not ``None`` are sent. When nothing is
        supplied, no update request is made and the current user is
        returned.

        Args:
            key: User $key (ID).
            password: New password.
            displayname: New display name.
            email: New email address (sent lowercased; an empty string
                is sent as-is).
            enabled: Enable or disable the account.
            change_password: Require password change on next login.
            physical_access: Enable or disable console/SSH access.
            two_factor_enabled: Enable or disable 2FA.
            two_factor_type: Type of 2FA ('email' or 'authenticator').
            two_factor_setup_required: Require 2FA setup on next login.
            ssh_keys: SSH public keys (list, string, or empty string to
                clear). An empty list is sent as an empty string.

        Returns:
            Updated User object.

        Example:
            >>> # Change password
            >>> client.users.update(user.key, password="NewPass123!")

            >>> # Update display name and email
            >>> client.users.update(
            ...     user.key,
            ...     displayname="John Q. Smith",
            ...     email="john.smith@company.com"
            ... )

            >>> # Enable 2FA
            >>> client.users.update(
            ...     user.key,
            ...     two_factor_enabled=True,
            ...     two_factor_type="authenticator"
            ... )
        """
        # API field name -> value to send; ``None`` means "leave alone".
        candidates: dict[str, Any] = {
            "password": password,
            "displayname": displayname,
            "email": email if email is None else (email.lower() if email else ""),
            "enabled": enabled,
            "change_password": change_password,
            "physical_access": physical_access,
            "two_factor_authentication": two_factor_enabled,
            "two_factor_type": two_factor_type,
            "two_factor_setup_next_login": two_factor_setup_required,
            "ssh_keys": (
                ("\n".join(ssh_keys) if ssh_keys else "")
                if isinstance(ssh_keys, builtins.list)
                else ssh_keys
            ),
        }
        changes = {field: value for field, value in candidates.items() if value is not None}

        if changes:
            self._client._request("PUT", f"{self._endpoint}/{key}", json_data=changes)
        return self.get(key)

    def enable(self, key: int) -> User:
        """Enable a user account.

        Args:
            key: User $key (ID).

        Returns:
            Updated User object.

        Example:
            >>> client.users.enable(user.key)
        """
        return self.update(key, enabled=True)

    def disable(self, key: int) -> User:
        """Disable a user account.

        The account is not deleted and can be re-enabled later.

        Args:
            key: User $key (ID).

        Returns:
            Updated User object.

        Example:
            >>> client.users.disable(user.key)
        """
        return self.update(key, enabled=False)