Source code for pyvergeos.resources.alarms

"""Alarm resource manager for VergeOS system monitoring."""

from __future__ import annotations

import builtins
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


# Alarm severity levels
ALARM_LEVELS = ["critical", "error", "warning", "message", "audit", "summary", "debug"]

# Owner type mappings (friendly name -> API value)
OWNER_TYPE_MAP = {
    "VM": "vms",
    "Network": "vnets",
    "Node": "nodes",
    "Tenant": "tenant_nodes",
    "User": "users",
    "System": "system",
    "CloudSnapshot": "cloud_snapshots",
}

# Reverse mapping (API value -> friendly name)
OWNER_TYPE_DISPLAY = {v: k for k, v in OWNER_TYPE_MAP.items()}


# Default fields for alarm list operations
_DEFAULT_ALARM_FIELDS = [
    "$key",
    "owner",
    "owner#name as owner_name",
    "owner_type",
    "sub_owner",
    "alarm_type",
    "alarm_type#name as alarm_type_name",
    "alarm_type#description as alarm_type_description",
    "level",
    "status",
    "alarm_id",
    "resolvable",
    "resolve_text",
    "created",
    "modified",
    "snooze",
    "snoozed_by",
]

# Default fields for alarm history list operations
_DEFAULT_HISTORY_FIELDS = [
    "$key",
    "alarm_raised",
    "alarm_lowered",
    "archived_by",
    "owner",
    "alarm_type",
    "level",
    "status",
    "alarm_id",
]


[docs] class Alarm(ResourceObject): """Alarm resource object. Represents an active alarm in VergeOS indicating a condition requiring attention such as errors, warnings, or critical issues. Properties: level: Alarm severity (critical, error, warning, message, audit). level_display: Capitalized display name for level. status: Alarm status message. alarm_type: Alarm type name. alarm_type_key: Alarm type $key. description: Alarm type description. alarm_id: Unique alarm identifier (8-char string). owner_name: Name of the owner object. owner_key: Key of the owner object. owner_type: API owner type value (vms, vnets, etc.). owner_type_display: Friendly owner type name (VM, Network, etc.). sub_owner: Sub-owner key if applicable. is_resolvable: True if alarm can be resolved. resolve_text: Text describing how to resolve. is_snoozed: True if alarm is currently snoozed. snoozed_by: User who snoozed the alarm. snooze_until: Datetime when snooze expires. created_at: Datetime when alarm was raised. modified_at: Datetime when alarm was last modified. """ @property def level(self) -> str: """Get alarm severity level.""" return str(self.get("level", "")) @property def level_display(self) -> str: """Get capitalized level display name.""" level = self.level return level.capitalize() if level else "" @property def status(self) -> str: """Get alarm status message.""" return str(self.get("status", "")) @property def alarm_type(self) -> str: """Get alarm type name.""" return str(self.get("alarm_type_name", "")) @property def alarm_type_key(self) -> int | None: """Get alarm type $key.""" val = self.get("alarm_type") return int(val) if val is not None else None @property def description(self) -> str: """Get alarm type description.""" return str(self.get("alarm_type_description", "")) @property def alarm_id(self) -> str: """Get unique alarm identifier (8-char string).""" return str(self.get("alarm_id", "")) @property def owner_name(self) -> str: """Get owner object name.""" return str(self.get("owner_name", "")) @property def owner_key(self) -> int | None: """Get owner object key.""" val = self.get("owner") return int(val) if val is not None else None @property def owner_type(self) -> str: """Get API owner type value.""" return str(self.get("owner_type", "")) @property def owner_type_display(self) -> str: """Get friendly owner type name.""" return OWNER_TYPE_DISPLAY.get(self.owner_type, self.owner_type) @property def sub_owner(self) -> int | None: """Get sub-owner key if applicable.""" val = self.get("sub_owner") return int(val) if val is not None else None @property def is_resolvable(self) -> bool: """Check if alarm can be resolved.""" return bool(self.get("resolvable", False)) @property def resolve_text(self) -> str: """Get text describing how to resolve.""" return str(self.get("resolve_text", "")) @property def is_snoozed(self) -> bool: """Check if alarm is currently snoozed.""" snooze_ts = self.get("snooze", 0) if not snooze_ts or snooze_ts == 0: return False now_ts = int(datetime.now(timezone.utc).timestamp()) return int(snooze_ts) > now_ts @property def snoozed_by(self) -> str: """Get user who snoozed the alarm.""" return str(self.get("snoozed_by", "")) @property def snooze_until(self) -> datetime | None: """Get datetime when snooze expires.""" snooze_ts = self.get("snooze", 0) if not snooze_ts or snooze_ts == 0: return None return datetime.fromtimestamp(int(snooze_ts), tz=timezone.utc) @property def created_at(self) -> datetime | None: """Get datetime when alarm was raised.""" ts = self.get("created") if ts is None: return None return datetime.fromtimestamp(int(ts), tz=timezone.utc) @property def modified_at(self) -> datetime | None: """Get datetime when alarm was last modified.""" ts = self.get("modified") if ts is None: return None return datetime.fromtimestamp(int(ts), tz=timezone.utc)
[docs] def snooze(self, hours: int = 24) -> Alarm: """Snooze this alarm for a specified duration. Args: hours: Number of hours to snooze (default: 24, max: 8760). Returns: Updated Alarm object. """ from typing import cast manager = cast("AlarmManager", self._manager) return manager.snooze(self.key, hours=hours)
[docs] def snooze_to(self, until: datetime) -> Alarm: """Snooze this alarm until a specific time. Args: until: Datetime when snooze should expire. Returns: Updated Alarm object. """ from typing import cast manager = cast("AlarmManager", self._manager) return manager.snooze_to(self.key, until=until)
[docs] def unsnooze(self) -> Alarm: """Remove snooze from this alarm, making it active again. Returns: Updated Alarm object. """ from typing import cast manager = cast("AlarmManager", self._manager) return manager.unsnooze(self.key)
[docs] def resolve(self) -> None: """Resolve this alarm. Raises: ValueError: If alarm is not resolvable. APIError: If resolve action fails. """ from typing import cast manager = cast("AlarmManager", self._manager) manager.resolve(self.key)
def __repr__(self) -> str: key = self.get("$key", "?") level = self.level_display status = self.status[:30] + "..." if len(self.status) > 30 else self.status return f"<Alarm key={key} level={level!r} status={status!r}>"
[docs] class AlarmHistory(ResourceObject): """Alarm history resource object. Represents a resolved/lowered alarm in the history. Properties: level: Alarm severity when raised. level_display: Capitalized display name for level. status: Alarm status message. alarm_type: Alarm type name. alarm_id: Unique alarm identifier. owner: Owner object description (string in history). archived_by: How the alarm was archived (auto, user, etc.). raised_at: Datetime when alarm was raised. lowered_at: Datetime when alarm was lowered/resolved. """ @property def level(self) -> str: """Get alarm severity level.""" return str(self.get("level", "")) @property def level_display(self) -> str: """Get capitalized level display name.""" level = self.level return level.capitalize() if level else "" @property def status(self) -> str: """Get alarm status message.""" return str(self.get("status", "")) @property def alarm_type(self) -> str: """Get alarm type name.""" return str(self.get("alarm_type", "")) @property def alarm_id(self) -> str: """Get unique alarm identifier.""" return str(self.get("alarm_id", "")) @property def owner(self) -> str: """Get owner description.""" return str(self.get("owner", "")) @property def archived_by(self) -> str: """Get how alarm was archived.""" return str(self.get("archived_by", "")) @property def raised_at(self) -> datetime | None: """Get datetime when alarm was raised.""" ts = self.get("alarm_raised") if ts is None: return None return datetime.fromtimestamp(int(ts), tz=timezone.utc) @property def lowered_at(self) -> datetime | None: """Get datetime when alarm was lowered/resolved.""" ts = self.get("alarm_lowered") if ts is None: return None return datetime.fromtimestamp(int(ts), tz=timezone.utc) def __repr__(self) -> str: key = self.get("$key", "?") level = self.level_display status = self.status[:30] + "..." if len(self.status) > 30 else self.status return f"<AlarmHistory key={key} level={level!r} status={status!r}>"
[docs] class AlarmManager(ResourceManager[Alarm]): """Manager for Alarm operations. Alarms indicate conditions requiring attention such as errors, warnings, or critical issues with VMs, networks, nodes, or the system. Example: >>> # List all active alarms >>> alarms = client.alarms.list() >>> for alarm in alarms: ... print(f"{alarm.level_display}: {alarm.status}") >>> # List critical and error alarms >>> critical = client.alarms.list(level=["critical", "error"]) >>> # List alarms by owner type >>> vm_alarms = client.alarms.list(owner_type="VM") >>> # Snooze an alarm >>> client.alarms.snooze(alarm_key, hours=24) >>> # Resolve a resolvable alarm >>> client.alarms.resolve(alarm_key) >>> # Get alarm history >>> history = client.alarms.list_history() """ _endpoint = "alarms"
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
def _to_model(self, data: dict[str, Any]) -> Alarm: return Alarm(data, self) def _to_history_model(self, data: dict[str, Any]) -> AlarmHistory: return AlarmHistory(data, self)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, *, level: str | builtins.list[str] | None = None, owner_type: str | None = None, include_snoozed: bool = False, **filter_kwargs: Any, ) -> builtins.list[Alarm]: """List alarms with optional filtering. By default, only returns active (non-snoozed) alarms. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. level: Filter by severity level (or list of levels). Values: critical, error, warning, message, audit, summary, debug. owner_type: Filter by owner type. Values: VM, Network, Node, Tenant, User, System, CloudSnapshot. include_snoozed: If True, include snoozed alarms (default: False). **filter_kwargs: Additional filter arguments. Returns: List of Alarm objects sorted by created date (newest first). Example: >>> # All active alarms >>> alarms = client.alarms.list() >>> # Critical alarms only >>> critical = client.alarms.list(level="critical") >>> # Critical and error alarms >>> severe = client.alarms.list(level=["critical", "error"]) >>> # VM alarms >>> vm_alarms = client.alarms.list(owner_type="VM") >>> # Include snoozed alarms >>> all_alarms = client.alarms.list(include_snoozed=True) """ conditions: builtins.list[str] = [] if filter: conditions.append(f"({filter})") # Level filter if level: if isinstance(level, str): conditions.append(f"level eq '{level.lower()}'") else: level_filters = [f"level eq '{lv.lower()}'" for lv in level] if len(level_filters) == 1: conditions.append(level_filters[0]) else: conditions.append(f"({' or '.join(level_filters)})") # Owner type filter if owner_type: api_owner_type = OWNER_TYPE_MAP.get(owner_type, owner_type) conditions.append(f"owner_type eq '{api_owner_type}'") # Active (non-snoozed) filter - default behavior if not include_snoozed: # Show alarms where snooze is 0 or snooze time has passed conditions.append("(snooze eq 0 or snooze le {$now})") # Add any additional filter kwargs if filter_kwargs: conditions.append(build_filter(**filter_kwargs)) # Combine conditions combined_filter = " and ".join(conditions) if conditions else None # Use default fields if not specified if fields is None: fields = _DEFAULT_ALARM_FIELDS params: dict[str, Any] = {} if combined_filter: params["filter"] = combined_filter if fields: params["fields"] = ",".join(fields) if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset # Sort by created date descending (newest first) params["sort"] = "-created" response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): return [self._to_model(response)] return [self._to_model(item) for item in response]
[docs] def list_critical( self, include_snoozed: bool = False, limit: int | None = None, ) -> builtins.list[Alarm]: """List critical alarms. Args: include_snoozed: If True, include snoozed alarms. limit: Maximum number of results. Returns: List of critical Alarm objects. """ return self.list(level="critical", include_snoozed=include_snoozed, limit=limit)
[docs] def list_errors( self, include_snoozed: bool = False, limit: int | None = None, ) -> builtins.list[Alarm]: """List error alarms. Args: include_snoozed: If True, include snoozed alarms. limit: Maximum number of results. Returns: List of error Alarm objects. """ return self.list(level="error", include_snoozed=include_snoozed, limit=limit)
[docs] def list_warnings( self, include_snoozed: bool = False, limit: int | None = None, ) -> builtins.list[Alarm]: """List warning alarms. Args: include_snoozed: If True, include snoozed alarms. limit: Maximum number of results. Returns: List of warning Alarm objects. """ return self.list(level="warning", include_snoozed=include_snoozed, limit=limit)
[docs] def list_by_owner_type( self, owner_type: str, include_snoozed: bool = False, limit: int | None = None, ) -> builtins.list[Alarm]: """List alarms by owner type. Args: owner_type: Owner type (VM, Network, Node, Tenant, User, System, CloudSnapshot). include_snoozed: If True, include snoozed alarms. limit: Maximum number of results. Returns: List of Alarm objects for the specified owner type. """ return self.list(owner_type=owner_type, include_snoozed=include_snoozed, limit=limit)
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> Alarm: """Get an alarm by key. Note: Alarms do not have a name field, so name parameter is not supported. Args: key: Alarm $key (ID). name: Not supported for alarms (will raise ValueError). fields: List of fields to return. Returns: Alarm object. Raises: NotFoundError: If alarm not found. ValueError: If key not provided or if name is used. """ if name is not None: raise ValueError("Alarms do not have a name field. Use key instead.") if key is None: raise ValueError("key must be provided") params: dict[str, Any] = {} if fields is None: fields = _DEFAULT_ALARM_FIELDS if fields: params["fields"] = ",".join(fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Alarm {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Alarm {key} returned invalid response") return self._to_model(response)
[docs] def snooze(self, key: int, hours: int = 24) -> Alarm: """Snooze an alarm for a specified duration. Snoozing temporarily hides the alarm from the active alarm list. Args: key: Alarm $key. hours: Number of hours to snooze (default: 24, max: 8760 / 1 year). Returns: Updated Alarm object. Example: >>> # Snooze for 24 hours (default) >>> alarm = client.alarms.snooze(alarm_key) >>> # Snooze for 48 hours >>> alarm = client.alarms.snooze(alarm_key, hours=48) """ if hours < 1: raise ValueError("hours must be at least 1") if hours > 8760: raise ValueError("hours cannot exceed 8760 (1 year)") snooze_timestamp = int(time.time()) + (hours * 3600) self._client._request( "PUT", f"{self._endpoint}/{key}", json_data={"snooze": snooze_timestamp} ) return self.get(key)
[docs] def snooze_to(self, key: int, until: datetime) -> Alarm: """Snooze an alarm until a specific time. Args: key: Alarm $key. until: Datetime when snooze should expire. Returns: Updated Alarm object. Example: >>> from datetime import datetime, timedelta >>> next_week = datetime.now() + timedelta(days=7) >>> alarm = client.alarms.snooze_to(alarm_key, until=next_week) """ # Convert to UTC timestamp if until.tzinfo is None: # Assume local time if no timezone snooze_timestamp = int(until.timestamp()) else: snooze_timestamp = int(until.timestamp()) if snooze_timestamp <= int(time.time()): raise ValueError("snooze time must be in the future") self._client._request( "PUT", f"{self._endpoint}/{key}", json_data={"snooze": snooze_timestamp} ) return self.get(key)
[docs] def unsnooze(self, key: int) -> Alarm: """Remove snooze from an alarm, making it active again. Args: key: Alarm $key. Returns: Updated Alarm object. Example: >>> alarm = client.alarms.unsnooze(alarm_key) >>> print(alarm.is_snoozed) # False """ self._client._request("PUT", f"{self._endpoint}/{key}", json_data={"snooze": 0}) return self.get(key)
[docs] def resolve(self, key: int) -> None: """Resolve a resolvable alarm. Only alarms with resolvable=True can be resolved. After resolution, the alarm may be removed or moved to history. Args: key: Alarm $key. Raises: ValueError: If alarm is not resolvable. NotFoundError: If alarm not found. APIError: If resolve action fails. Example: >>> alarm = client.alarms.get(alarm_key) >>> if alarm.is_resolvable: ... client.alarms.resolve(alarm_key) """ # Verify alarm exists and is resolvable alarm = self.get(key) if not alarm.is_resolvable: raise ValueError(f"Alarm {key} is not resolvable") # Use row action endpoint: POST /alarms/{key}/resolve self._client._request("POST", f"{self._endpoint}/{key}/resolve")
[docs] def list_history( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, *, level: str | builtins.list[str] | None = None, **filter_kwargs: Any, ) -> builtins.list[AlarmHistory]: """List alarm history (resolved/lowered alarms). Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. level: Filter by severity level (or list of levels). **filter_kwargs: Additional filter arguments. Returns: List of AlarmHistory objects sorted by lowered date (newest first). Example: >>> # Recent alarm history >>> history = client.alarms.list_history(limit=50) >>> # Critical alarm history >>> critical_history = client.alarms.list_history(level="critical") """ conditions: builtins.list[str] = [] if filter: conditions.append(f"({filter})") # Level filter if level: if isinstance(level, str): conditions.append(f"level eq '{level.lower()}'") else: level_filters = [f"level eq '{lv.lower()}'" for lv in level] if len(level_filters) == 1: conditions.append(level_filters[0]) else: conditions.append(f"({' or '.join(level_filters)})") # Add any additional filter kwargs if filter_kwargs: conditions.append(build_filter(**filter_kwargs)) # Combine conditions combined_filter = " and ".join(conditions) if conditions else None # Use default fields if not specified if fields is None: fields = _DEFAULT_HISTORY_FIELDS params: dict[str, Any] = {} if combined_filter: params["filter"] = combined_filter if fields: params["fields"] = ",".join(fields) if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset # Sort by lowered date descending (newest first) params["sort"] = "-alarm_lowered" response = self._client._request("GET", "alarm_history", params=params) if response is None: return [] if not isinstance(response, list): return [self._to_history_model(response)] return [self._to_history_model(item) for item in response]
[docs] def get_history( self, key: int, *, fields: builtins.list[str] | None = None, ) -> AlarmHistory: """Get an alarm history entry by key. Args: key: Alarm history $key (ID). fields: List of fields to return. Returns: AlarmHistory object. Raises: NotFoundError: If alarm history not found. """ params: dict[str, Any] = {} if fields is None: fields = _DEFAULT_HISTORY_FIELDS if fields: params["fields"] = ",".join(fields) response = self._client._request("GET", f"alarm_history/{key}", params=params) if response is None: raise NotFoundError(f"Alarm history {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Alarm history {key} returned invalid response") return self._to_history_model(response)
[docs] def get_summary(self) -> dict[str, Any]: """Get a summary of current alarm status. Returns: Dictionary with alarm counts by level and status. Example: >>> summary = client.alarms.get_summary() >>> print(f"Critical: {summary['critical']}") >>> print(f"Error: {summary['error']}") >>> print(f"Warning: {summary['warning']}") >>> print(f"Total: {summary['total']}") >>> print(f"Snoozed: {summary['snoozed']}") """ # Get all alarms including snoozed all_alarms = self.list(include_snoozed=True) active_alarms = [a for a in all_alarms if not a.is_snoozed] snoozed_alarms = [a for a in all_alarms if a.is_snoozed] # Count by level critical = len([a for a in active_alarms if a.level == "critical"]) error = len([a for a in active_alarms if a.level == "error"]) warning = len([a for a in active_alarms if a.level == "warning"]) message = len([a for a in active_alarms if a.level == "message"]) resolvable = len([a for a in active_alarms if a.is_resolvable]) return { "total": len(all_alarms), "active": len(active_alarms), "snoozed": len(snoozed_alarms), "critical": critical, "error": error, "warning": warning, "message": message, "resolvable": resolvable, }