Source code for pyvergeos.resources.logs

"""Log resource manager for VergeOS system logs."""

from __future__ import annotations

import builtins
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


# Log levels
LOG_LEVELS = ["critical", "error", "warning", "message", "audit", "summary", "debug"]

# Object type mappings (friendly name -> API value)
OBJECT_TYPE_MAP = {
    "VM": "vm",
    "Network": "vnet",
    "Tenant": "tenant",
    "User": "user",
    "System": "system",
    "Node": "node",
    "Cluster": "cluster",
    "File": "file",
    "Group": "group",
    "Permission": "permission",
    "SMTP": "smtp",
    "Task": "task",
    "Site": "site",
    "SystemSnapshot": "cloud_snapshots",
    "CatalogRepository": "catalog_repository",
    "OIDCApplication": "oidc_application",
    "ServiceContainer": "service_container",
    "NASService": "vm_service",
    "VMImport": "vm_import",
    "VMwareBackup": "vmware_container",
    "SnapshotProfile": "snapshot_profile",
    "ImportExport": "import_export",
    "Update": "updates",
    "Other": "other",
}

# Reverse mapping (API value -> friendly name)
OBJECT_TYPE_DISPLAY = {v: k for k, v in OBJECT_TYPE_MAP.items()}
# Add extra display mappings for full names from API schema
OBJECT_TYPE_DISPLAY.update(
    {
        "vm": "VM",
        "vnet": "Network",
        "tenant": "Tenant",
        "user": "User",
        "system": "System",
        "node": "Node",
        "cluster": "Cluster",
        "file": "File",
        "group": "Group",
        "permission": "Permission",
        "smtp": "SMTP",
        "task": "Task",
        "site": "Site",
        "cloud_snapshots": "SystemSnapshot",
        "catalog_repository": "CatalogRepository",
        "oidc_application": "OIDCApplication",
        "service_container": "ServiceContainer",
        "vm_service": "NASService",
        "vm_import": "VMImport",
        "vmware_container": "VMwareBackup",
        "snapshot_profile": "SnapshotProfile",
        "import_export": "ImportExport",
        "updates": "Update",
        "other": "Other",
    }
)


# Default fields for log list operations
_DEFAULT_LOG_FIELDS = [
    "$key",
    "level",
    "text",
    "timestamp",
    "user",
    "object_type",
    "object_name",
]


[docs] class Log(ResourceObject): """Log resource object. Represents a log entry in VergeOS system logs including audit events, messages, warnings, errors, and critical events. Properties: level: Log severity (critical, error, warning, message, audit, summary, debug). level_display: Capitalized display name for level. text: Log message text. user: User who performed the action. object_type: API object type value (vm, vnet, etc.). object_type_display: Friendly object type name (VM, Network, etc.). object_name: Name of the related object. created_at: Datetime when log was created (from microsecond timestamp). timestamp_us: Raw timestamp in microseconds. """ @property def level(self) -> str: """Get log severity level.""" return str(self.get("level", "")) @property def level_display(self) -> str: """Get capitalized level display name.""" level = self.level return level.capitalize() if level else "" @property def text(self) -> str: """Get log message text.""" return str(self.get("text", "")) @property def user(self) -> str: """Get user who performed the action.""" return str(self.get("user", "")) @property def object_type(self) -> str: """Get API object type value.""" return str(self.get("object_type", "")) @property def object_type_display(self) -> str: """Get friendly object type name.""" return OBJECT_TYPE_DISPLAY.get(self.object_type, self.object_type) @property def object_name(self) -> str: """Get name of the related object.""" return str(self.get("object_name", "")) @property def timestamp_us(self) -> int: """Get raw timestamp in microseconds.""" ts = self.get("timestamp") return int(ts) if ts is not None else 0 @property def created_at(self) -> datetime | None: """Get datetime when log was created. Converts the microsecond timestamp to a datetime object. """ ts = self.get("timestamp") if ts is None or ts == 0: return None # Timestamp is in microseconds, convert to seconds for datetime ts_seconds = int(ts) / 1_000_000 return datetime.fromtimestamp(ts_seconds, tz=timezone.utc) def __repr__(self) -> str: key = self.get("$key", "?") level = self.level_display text = self.text[:30] + "..." if len(self.text) > 30 else self.text return f"<Log key={key} level={level!r} text={text!r}>"
[docs] class LogManager(ResourceManager[Log]): """Manager for Log operations. Logs provide an audit trail and history of events in the VergeOS system, including user actions, system events, errors, and warnings. Example: >>> # List recent logs >>> logs = client.logs.list(limit=100) >>> for log in logs: ... print(f"{log.level_display}: {log.text}") >>> # List errors and critical logs >>> errors = client.logs.list(level=["error", "critical"]) >>> # Filter by object type >>> vm_logs = client.logs.list(object_type="VM") >>> # Filter by user >>> admin_logs = client.logs.list(user="admin") >>> # Filter by time range (last hour) >>> from datetime import datetime, timedelta, timezone >>> since = datetime.now(timezone.utc) - timedelta(hours=1) >>> recent = client.logs.list(since=since) >>> # Search log text >>> power_logs = client.logs.list(text="power") """ _endpoint = "logs"
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
def _to_model(self, data: dict[str, Any]) -> Log: return Log(data, self)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = 100, offset: int | None = None, *, level: str | builtins.list[str] | None = None, object_type: str | None = None, user: str | None = None, text: str | None = None, since: datetime | None = None, before: datetime | None = None, errors_only: bool = False, **filter_kwargs: Any, ) -> builtins.list[Log]: """List logs with optional filtering. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results (default: 100, max: 10000). offset: Skip this many results. level: Filter by severity level (or list of levels). Values: critical, error, warning, message, audit, summary, debug. object_type: Filter by object type. Values: VM, Network, Tenant, User, System, Node, Cluster, File, Group, Permission, SMTP, Task, Site, etc. user: Filter logs by user (contains search). text: Filter logs containing this text (contains search). since: Return logs since this datetime. before: Return logs before this datetime. errors_only: Shortcut to filter for error and critical logs only. **filter_kwargs: Additional filter arguments. Returns: List of Log objects sorted by timestamp (newest first). Example: >>> # Recent logs >>> logs = client.logs.list() >>> # Error and critical logs only >>> errors = client.logs.list(errors_only=True) >>> # VM logs from the last hour >>> from datetime import datetime, timedelta, timezone >>> since = datetime.now(timezone.utc) - timedelta(hours=1) >>> vm_logs = client.logs.list(object_type="VM", since=since) >>> # Search for specific text >>> logs = client.logs.list(text="snapshot") """ conditions: builtins.list[str] = [] if filter: conditions.append(f"({filter})") # Handle errors_only shortcut if errors_only: level = ["error", "critical"] # Level filter if level: if isinstance(level, str): conditions.append(f"level eq '{level.lower()}'") else: level_filters = [f"level eq '{lv.lower()}'" for lv in level] if len(level_filters) == 1: conditions.append(level_filters[0]) else: conditions.append(f"({' or '.join(level_filters)})") # Object type filter if object_type: api_object_type = OBJECT_TYPE_MAP.get(object_type, object_type) conditions.append(f"object_type eq '{api_object_type}'") # User filter (contains search) if user: escaped_user = user.replace("'", "''") conditions.append(f"user ct '{escaped_user}'") # Text filter (contains search) if text: escaped_text = text.replace("'", "''") conditions.append(f"text ct '{escaped_text}'") # Time filters (timestamp is in microseconds) if since: if since.tzinfo is None: # Assume local time if no timezone, convert to UTC timestamp since_us = int(since.timestamp() * 1_000_000) else: since_us = int(since.timestamp() * 1_000_000) conditions.append(f"timestamp ge {since_us}") if before: if before.tzinfo is None: before_us = int(before.timestamp() * 1_000_000) else: before_us = int(before.timestamp() * 1_000_000) conditions.append(f"timestamp lt {before_us}") # Add any additional filter kwargs if filter_kwargs: conditions.append(build_filter(**filter_kwargs)) # Combine conditions combined_filter = " and ".join(conditions) if conditions else None # Use default fields if not specified if fields is None: fields = _DEFAULT_LOG_FIELDS params: dict[str, Any] = {} if combined_filter: params["filter"] = combined_filter if fields: params["fields"] = ",".join(fields) if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset # Sort by timestamp descending (newest first) params["sort"] = "-timestamp" response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): return [self._to_model(response)] return [self._to_model(item) for item in response]
[docs] def list_errors( self, limit: int | None = 100, since: datetime | None = None, ) -> builtins.list[Log]: """List error and critical logs. Args: limit: Maximum number of results. since: Return logs since this datetime. Returns: List of error and critical Log objects. Example: >>> errors = client.logs.list_errors() >>> for log in errors: ... print(f"[{log.level_display}] {log.text}") """ return self.list(errors_only=True, limit=limit, since=since)
[docs] def list_by_level( self, level: str, limit: int | None = 100, since: datetime | None = None, ) -> builtins.list[Log]: """List logs by severity level. Args: level: Log level (critical, error, warning, message, audit, summary, debug). limit: Maximum number of results. since: Return logs since this datetime. Returns: List of Log objects at the specified level. Example: >>> warnings = client.logs.list_by_level("warning") """ return self.list(level=level, limit=limit, since=since)
[docs] def list_by_object_type( self, object_type: str, limit: int | None = 100, since: datetime | None = None, ) -> builtins.list[Log]: """List logs by object type. Args: object_type: Object type (VM, Network, Tenant, User, System, Node, etc.). limit: Maximum number of results. since: Return logs since this datetime. Returns: List of Log objects for the specified object type. Example: >>> vm_logs = client.logs.list_by_object_type("VM") >>> network_logs = client.logs.list_by_object_type("Network") """ return self.list(object_type=object_type, limit=limit, since=since)
[docs] def list_by_user( self, user: str, limit: int | None = 100, since: datetime | None = None, ) -> builtins.list[Log]: """List logs by user. Args: user: Username to filter by (contains search). limit: Maximum number of results. since: Return logs since this datetime. Returns: List of Log objects for the specified user. Example: >>> admin_logs = client.logs.list_by_user("admin") """ return self.list(user=user, limit=limit, since=since)
[docs] def search( self, text: str, limit: int | None = 100, since: datetime | None = None, level: str | builtins.list[str] | None = None, object_type: str | None = None, ) -> builtins.list[Log]: """Search logs by text content. Args: text: Text to search for (case-insensitive contains search). limit: Maximum number of results. since: Return logs since this datetime. level: Filter by severity level(s). object_type: Filter by object type. Returns: List of Log objects containing the search text. Example: >>> power_logs = client.logs.search("power") >>> snapshot_errors = client.logs.search( ... "snapshot", level=["error", "critical"] ... ) """ return self.list(text=text, limit=limit, since=since, level=level, object_type=object_type)
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> Log: """Get a log entry by key. Note: Logs do not have a name field, so name parameter is not supported. Args: key: Log $key (ID). name: Not supported for logs (will raise ValueError). fields: List of fields to return. Returns: Log object. Raises: NotFoundError: If log not found. ValueError: If key not provided or if name is used. Example: >>> log = client.logs.get(12345) >>> print(f"{log.level_display}: {log.text}") """ if name is not None: raise ValueError("Logs do not have a name field. Use key instead.") if key is None: raise ValueError("key must be provided") params: dict[str, Any] = {} if fields is None: fields = _DEFAULT_LOG_FIELDS if fields: params["fields"] = ",".join(fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Log {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Log {key} returned invalid response") return self._to_model(response)