Source code for pyvergeos.resources.task_events

"""Task event resource manager for VergeOS event-driven automation.

Task events (event triggers) link tasks to system events, enabling event-driven
automation. When a specific event occurs on a resource, tasks linked via task
events are automatically executed.

Event triggers allow tasks to run in response to specific system occurrences.
A single task can be triggered by multiple distinct events, enabling you to
consolidate logic and reuse task definitions across scenarios.

Event-Based Task Examples:
    - Power on a VM when a designated user logs into VergeOS
    - Power off VMs when the user logs out
    - Send an email notification when a system update completes
    - Use a webhook to notify Slack when a sync error is detected

Event Configuration:
    - **Type**: The object class (e.g., users, virtual machines, tenants, alarms)
    - **Event**: The event type (e.g., login, logout, poweron, error)
    - **Object Instance**: A specific resource, or use tags to match multiple
    - **Filter Settings**: For logs/alarms, filter by severity or log type

Example:
    >>> # List all task events
    >>> for event in client.task_events.list():
    ...     print(f"{event.event_name_display} -> {event.task_display}")

    >>> # Get events linked to a specific task
    >>> task = client.tasks.get(name="Power On Dev VMs")
    >>> for event in task.events.list():
    ...     print(f"Triggered by: {event.event_name_display}")

    >>> # Create event trigger for user login
    >>> event = client.task_events.create(
    ...     task=task.key,
    ...     owner=user.key,
    ...     event="login",
    ...     table="users",
    ... )

    >>> # Manually trigger an event for testing
    >>> event.trigger_now(context={"custom": "data"})
"""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


[docs] class TaskEvent(ResourceObject): """Task event resource object. Represents a link between a task and a system event. Properties: owner_key: The owner resource $key. owner_table: The owner resource table name. event: Event identifier. event_name: Human-readable event name. task_key: Linked task $key. task_display: Linked task display name. filters: Event filter conditions (JSON). context: Event context data (JSON). trigger: Timer value for the event trigger. """ @property def owner_key(self) -> int | None: """Get the owner resource key. Note: Some events have non-integer owner values (e.g., 'update_settings/1'). In those cases, this property returns None. """ owner = self.get("owner") if owner is None or owner == "": return None # Owner can be an integer or a path like 'update_settings/1' try: return int(owner) except (ValueError, TypeError): return None @property def owner_table(self) -> str | None: """Get the owner resource table name.""" return self.get("table") @property def event_type(self) -> str | None: """Get the event identifier.""" return self.get("event") @property def event_name_display(self) -> str | None: """Get the human-readable event name.""" return self.get("event_name") @property def task_key(self) -> int | None: """Get the linked task key.""" task = self.get("task") return int(task) if task is not None else None @property def task_display(self) -> str: """Get the linked task display name.""" return str(self.get("task_display", "")) @property def event_filters(self) -> dict[str, Any] | None: """Get the event filter conditions.""" filters = self.get("table_event_filters") if isinstance(filters, dict): return filters return None @property def event_context(self) -> dict[str, Any] | None: """Get the event context data.""" context = self.get("context") if isinstance(context, dict): return context return None
[docs] def trigger_now(self, context: dict[str, Any] | None = None) -> dict[str, Any] | None: """Manually trigger this event. Args: context: Optional context data to pass to the task. Returns: Action response dict or None. """ from typing import cast manager = cast("TaskEventManager", self._manager) return manager.trigger(self.key, context=context)
[docs] class TaskEventManager(ResourceManager[TaskEvent]): """Manager for task event operations. Task events enable event-driven automation by linking tasks to system events. Example: >>> # List all task events >>> for event in client.task_events.list(): ... print(f"{event.event_name_display}: {event.task_display}") >>> # List events for a specific task >>> events = client.task_events.list(task=task_key) >>> # List events for a specific owner >>> events = client.task_events.list(owner=vm_key) >>> # Manually trigger an event >>> client.task_events.trigger(event_key, context={"key": "value"}) """ _endpoint = "task_events" _default_fields = [ "$key", "owner", "owner#$display as owner_display", "table", "event", "event_name", "task", "task#$display as task_display", "task#name as task_name", "table_event_filters", "trigger", "context", ]
[docs] def __init__( self, client: VergeClient, *, task_key: int | None = None, owner_key: int | None = None, ) -> None: """Initialize the event manager. Args: client: VergeClient instance. task_key: Optional task key to scope the manager. owner_key: Optional owner resource key to scope the manager. """ super().__init__(client) self._task_key = task_key self._owner_key = owner_key
def _to_model(self, data: dict[str, Any]) -> TaskEvent: return TaskEvent(data, self)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, *, task: int | None = None, owner: int | None = None, table: str | None = None, event: str | None = None, **filter_kwargs: Any, ) -> builtins.list[TaskEvent]: """List task events with optional filtering. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. task: Filter by task key. Ignored if manager is scoped to a task. owner: Filter by owner resource key. Ignored if manager is scoped. table: Filter by owner table name. event: Filter by event identifier. **filter_kwargs: Additional filter arguments. Returns: List of TaskEvent objects. Example: >>> # All events >>> events = client.task_events.list() >>> # Events for a specific task >>> events = client.task_events.list(task=task_key) >>> # Events for VMs only >>> events = client.task_events.list(table="vms") >>> # Events for a specific event type >>> events = client.task_events.list(event="poweron") """ params: dict[str, Any] = {} # Build filter conditions filters: builtins.list[str] = [] if filter: filters.append(f"({filter})") # Use scoped key or parameter task_filter = self._task_key if self._task_key is not None else task if task_filter is not None: filters.append(f"task eq {task_filter}") owner_filter = self._owner_key if self._owner_key is not None else owner if owner_filter is not None: filters.append(f"owner eq {owner_filter}") if table is not None: filters.append(f"table eq '{table}'") if event is not None: filters.append(f"event eq '{event}'") if filter_kwargs: filters.append(build_filter(**filter_kwargs)) if filters: params["filter"] = " and ".join(filters) # Use default fields if not specified if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) # Pagination if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): return [self._to_model(response)] return [self._to_model(item) for item in response]
[docs] def get( # type: ignore[override] self, key: int | None = None, *, fields: builtins.list[str] | None = None, ) -> TaskEvent: """Get a task event by key. Args: key: Event $key (ID). fields: List of fields to return. Returns: TaskEvent object. Raises: NotFoundError: If event not found. ValueError: If key not provided. """ if key is None: raise ValueError("Key must be provided") params: dict[str, Any] = {} if fields: params["fields"] = ",".join(fields) else: params["fields"] = ",".join(self._default_fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Task event {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Task event {key} returned invalid response") return self._to_model(response)
[docs] def create( # type: ignore[override] self, task: int, event: str, *, table: str, event_name: str | None = None, table_event_filters: dict[str, Any] | None = None, context: dict[str, Any] | None = None, ) -> TaskEvent: """Create a task event. Links a task to a system event for event-driven execution. The owner is auto-populated from the parent task. Args: task: Task $key to link (create the task first). event: Event identifier (e.g. ``"lowered"``). table: Event source table (required, e.g. ``"alarms"``). The table whose events will trigger the task. event_name: Human-readable event name. table_event_filters: JSON filter conditions for the event. context: JSON context data to pass to the task. Returns: Created TaskEvent object. Example: >>> # Create a task first, then link an event >>> task = client.tasks.create( ... name="Alert on alarm", ... owner=1, ... action="send", ... table="smtp_settings", ... ) >>> # Trigger task when an alarm is lowered >>> event = client.task_events.create( ... task=task.key, ... event="lowered", ... table="alarms", ... table_event_filters={"level": "summary"}, ... ) """ body: dict[str, Any] = { "task": task, "table": table, "event": event, } if event_name is not None: body["event_name"] = event_name if table_event_filters is not None: body["table_event_filters"] = table_event_filters if context is not None: body["context"] = context response = self._client._request("POST", self._endpoint, json_data=body) if response is None: raise ValueError("No response from create operation") if not isinstance(response, dict): raise ValueError("Create operation returned invalid response") # Get the full object key = response.get("$key") if key is not None: return self.get(int(key)) return self._to_model(response)
[docs] def update( # type: ignore[override] self, key: int, *, table_event_filters: dict[str, Any] | None = None, context: dict[str, Any] | None = None, ) -> TaskEvent: """Update a task event. Args: key: Event $key (ID). table_event_filters: New filter conditions. context: New context data. Returns: Updated TaskEvent object. """ body: dict[str, Any] = {} if table_event_filters is not None: body["table_event_filters"] = table_event_filters if context is not None: body["context"] = context if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: int) -> None: """Delete a task event. Removes the link between a task and event. Args: key: Event $key (ID). """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def trigger( self, key: int, context: dict[str, Any] | None = None, ) -> dict[str, Any] | None: """Manually trigger a task event. Fires the event, causing the linked task to execute. Args: key: Event $key (ID). context: Optional context data to pass to the task. Returns: Action response dict or None. Example: >>> # Trigger an event >>> client.task_events.trigger(event_key) >>> # Trigger with context >>> client.task_events.trigger(event_key, context={"key": "value"}) """ params: dict[str, Any] = {} if context is not None: params["context"] = context return self.action(key, "trigger", **params)
[docs] def list_for_task( self, task_key: int, limit: int | None = None, ) -> builtins.list[TaskEvent]: """List all events for a specific task. Args: task_key: Task $key. limit: Maximum number of results. Returns: List of TaskEvent objects. """ return self.list(task=task_key, limit=limit)
[docs] def list_for_owner( self, owner_key: int, limit: int | None = None, ) -> builtins.list[TaskEvent]: """List all events for a specific owner resource. Args: owner_key: Owner resource $key. limit: Maximum number of results. Returns: List of TaskEvent objects. """ return self.list(owner=owner_key, limit=limit)
[docs] def list_by_table( self, table: str, limit: int | None = None, ) -> builtins.list[TaskEvent]: """List all events for a specific table/resource type. Args: table: Table name (e.g., "vms", "vnets", "tenants"). limit: Maximum number of results. Returns: List of TaskEvent objects. """ return self.list(table=table, limit=limit)