Source code for pyvergeos.resources.tasks

"""Task resource manager with wait functionality and scheduling support.

The VergeOS Task Engine enables automated operations triggered either by
specific events or scheduled times. Tasks define the action to perform
(e.g., power off a VM, send a notification, take a snapshot).

Task Engine Components:
    - **Tasks**: Define the action to perform on a target object
    - **Schedules**: Specify when and how often a task should run
    - **Events**: Define conditions that trigger a task (e.g., user login)
    - **Schedule Triggers**: Link tasks to schedules
    - **Event Triggers**: Link tasks to events

Event-Based Examples:
    - Power on a VM when a designated user logs into VergeOS
    - Send email notification when a system update completes
    - Use a webhook to notify an external system when sync errors occur

Schedule-Based Examples:
    - Check for system updates every Saturday at 5:00 PM
    - Power off a tenant on a specific date
    - Run backup tasks daily at 2 AM

Example:
    >>> # List all tasks
    >>> for task in client.tasks.list():
    ...     print(f"{task.name}: {task.status}")

    >>> # Create a task to power on VMs with a specific tag
    >>> task = client.tasks.create(
    ...     name="Power On Dev VMs",
    ...     owner=vm.key,
    ...     action="poweron",
    ...     description="Power on development VMs when user logs in",
    ... )

    >>> # Link task to a schedule (e.g., Fridays at 6 PM)
    >>> trigger = client.task_schedule_triggers.create(
    ...     task=task.key,
    ...     schedule=friday_schedule.key,
    ... )

    >>> # Execute a task manually
    >>> task.execute()
    >>> task.wait(timeout=300)
"""

from __future__ import annotations

import builtins
import time
from typing import TYPE_CHECKING, Any

from pyvergeos.constants import POLL_INTERVAL, TASK_WAIT_TIMEOUT
from pyvergeos.exceptions import NotFoundError, TaskError, TaskTimeoutError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.task_events import TaskEventManager
    from pyvergeos.resources.task_schedule_triggers import TaskScheduleTriggerManager


# Default fields to request for task list operations
_DEFAULT_LIST_FIELDS = [
    "$key",
    "name",
    "description",
    "enabled",
    "status",
    "action",
    "action_display",
    "table",
    "owner",
    "owner#$display as owner_display",
    "creator",
    "creator#$display as creator_display",
    "last_run",
    "delete_after_run",
    "system_created",
    "id",
    "count(triggers) as triggers_count",
    "count(events) as events_count",
]


[docs] class Task(ResourceObject): """Task resource object. Represents a scheduled or running task in VergeOS. Properties: name: Task name. description: Task description. status: Task status ('idle' or 'running'). is_complete: True if task is complete (idle). is_running: True if task is currently running. is_enabled: True if task is enabled. has_error: True if task has an error status. progress: Task progress percentage (0-100). action: The action type this task performs. action_display: Human-readable action description. owner_key: Key of the owner object. owner_display: Display name of the owner. creator_key: Key of the user who created the task. creator_display: Display name of the creator. last_run: Timestamp of last run. delete_after_run: Whether task deletes itself after running. task_id: Unique task identifier (40-char hex string). """ @property def is_complete(self) -> bool: """Check if task is complete (idle).""" return self.get("status") == "idle" @property def is_running(self) -> bool: """Check if task is running.""" return self.get("status") == "running" @property def is_enabled(self) -> bool: """Check if task is enabled.""" return bool(self.get("enabled", False)) @property def has_error(self) -> bool: """Check if task has an error.""" return self.get("status") == "error" @property def progress(self) -> int: """Get task progress percentage.""" return int(self.get("progress", 0)) @property def owner_key(self) -> int | None: """Get owner object key.""" owner = self.get("owner") return int(owner) if owner is not None else None @property def owner_display(self) -> str: """Get owner display name.""" return str(self.get("owner_display", "")) @property def creator_key(self) -> int | None: """Get creator user key.""" creator = self.get("creator") return int(creator) if creator is not None else None @property def creator_display(self) -> str: """Get creator display name.""" return str(self.get("creator_display", "")) @property def task_id(self) -> str: """Get unique task ID (40-char hex string).""" return str(self.get("id", "")) @property def owner_table(self) -> str | None: """Get owner table name (resource type).""" return self.get("table") @property def action_type(self) -> str | None: """Get the action type this task performs.""" return self.get("action") @property def action_display_name(self) -> str | None: """Get human-readable action description.""" return self.get("action_display") @property def is_delete_after_run(self) -> bool: """Check if task deletes itself after running.""" return bool(self.get("delete_after_run", False)) @property def is_system_created(self) -> bool: """Check if task was created by system.""" return bool(self.get("system_created", False)) @property def trigger_count(self) -> int: """Get number of schedule triggers.""" return int(self.get("triggers_count", 0)) @property def event_count(self) -> int: """Get number of event triggers.""" return int(self.get("events_count", 0)) @property def triggers(self) -> TaskScheduleTriggerManager: """Get trigger manager scoped to this task. Returns: TaskScheduleTriggerManager for this task's triggers. Example: >>> for trigger in task.triggers.list(): ... print(f"Schedule: {trigger.schedule_display}") """ from typing import cast from pyvergeos.resources.task_schedule_triggers import TaskScheduleTriggerManager manager = cast("TaskManager", self._manager) return TaskScheduleTriggerManager(manager._client, task_key=self.key) @property def events(self) -> TaskEventManager: """Get event manager scoped to this task. Returns: TaskEventManager for this task's events. Example: >>> for event in task.events.list(): ... print(f"Event: {event.event_name_display}") """ from typing import cast from pyvergeos.resources.task_events import TaskEventManager manager = cast("TaskManager", self._manager) return TaskEventManager(manager._client, task_key=self.key)
[docs] def enable(self) -> Task: """Enable this task. Returns: Updated Task object. """ from typing import cast manager = cast("TaskManager", self._manager) return manager.enable(self.key)
[docs] def disable(self) -> Task: """Disable this task. Returns: Updated Task object. """ from typing import cast manager = cast("TaskManager", self._manager) return manager.disable(self.key)
[docs] def execute(self, **params: Any) -> Task: """Execute this task immediately. Args: **params: Optional parameters to pass to the task. Returns: Updated Task object. """ from typing import cast manager = cast("TaskManager", self._manager) return manager.execute(self.key, **params)
[docs] def wait( self, timeout: int = TASK_WAIT_TIMEOUT, poll_interval: int = POLL_INTERVAL, raise_on_error: bool = True, ) -> Task: """Wait for this task to complete. Args: timeout: Maximum wait time in seconds (0 = infinite). poll_interval: Seconds between status checks. raise_on_error: Raise TaskError if task fails. Returns: Completed Task object. Raises: TaskTimeoutError: If timeout exceeded. TaskError: If task fails and raise_on_error=True. """ from typing import cast manager = cast("TaskManager", self._manager) return manager.wait( self.key, timeout=timeout, poll_interval=poll_interval, raise_on_error=raise_on_error, )
[docs] class TaskManager(ResourceManager[Task]): """Manager for Task operations with wait functionality. Tasks in VergeOS represent scheduled automation operations. They can be scheduled to run at specific times or triggered by events. Tasks can also be run manually. Example: >>> # List all tasks >>> tasks = client.tasks.list() >>> for task in tasks: ... print(f"{task.name}: {task.status}") >>> # List running tasks >>> running = client.tasks.list_running() >>> # Wait for a task to complete >>> task = client.tasks.wait(task_key, timeout=300) >>> # Enable/disable a task >>> client.tasks.enable(task_key) >>> client.tasks.disable(task_key) >>> # Execute a task manually >>> task = client.tasks.execute(task_key) """ _endpoint = "tasks"
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
def _to_model(self, data: dict[str, Any]) -> Task: return Task(data, self)
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, *, status: str | None = None, running: bool | None = None, enabled: bool | None = None, name: str | None = None, **filter_kwargs: Any, ) -> builtins.list[Task]: """List tasks with optional filtering. Args: filter: OData filter string. fields: List of fields to return. limit: Maximum number of results. offset: Skip this many results. status: Filter by status ('running' or 'idle'). running: If True, filter for running tasks only. enabled: Filter by enabled state. name: Filter by name (supports partial match with 'ct' operator). **filter_kwargs: Additional filter arguments. Returns: List of Task objects. Example: >>> # All tasks >>> tasks = client.tasks.list() >>> # Running tasks only >>> running = client.tasks.list(running=True) >>> # Disabled tasks >>> disabled = client.tasks.list(enabled=False) >>> # Tasks by name pattern >>> backup_tasks = client.tasks.list(name="Backup") """ # Build filter conditions conditions: builtins.list[str] = [] if filter: conditions.append(f"({filter})") if running is True: conditions.append("status eq 'running'") elif running is False: conditions.append("status eq 'idle'") elif status: conditions.append(f"status eq '{status.lower()}'") if enabled is not None: conditions.append(f"enabled eq {str(enabled).lower()}") if name: # Check if name contains wildcards if "*" in name or "?" in name: # Use contains for partial match search_term = name.replace("*", "").replace("?", "") if search_term: conditions.append(f"name ct '{search_term}'") else: conditions.append(f"name eq '{name}'") # Add any additional filter kwargs if filter_kwargs: conditions.append(build_filter(**filter_kwargs)) # Combine conditions combined_filter = " and ".join(conditions) if conditions else None # Use default fields if not specified if fields is None: fields = _DEFAULT_LIST_FIELDS params: dict[str, Any] = {} if combined_filter: params["filter"] = combined_filter if fields: params["fields"] = ",".join(fields) if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): return [self._to_model(response)] return [self._to_model(item) for item in response]
[docs] def list_running( self, fields: builtins.list[str] | None = None, limit: int | None = None, ) -> builtins.list[Task]: """List running tasks. Args: fields: List of fields to return. limit: Maximum number of results. Returns: List of running Task objects. """ return self.list(running=True, fields=fields, limit=limit)
[docs] def list_idle( self, fields: builtins.list[str] | None = None, limit: int | None = None, ) -> builtins.list[Task]: """List idle tasks. Args: fields: List of fields to return. limit: Maximum number of results. Returns: List of idle Task objects. """ return self.list(running=False, fields=fields, limit=limit)
[docs] def list_enabled( self, fields: builtins.list[str] | None = None, limit: int | None = None, ) -> builtins.list[Task]: """List enabled tasks. Args: fields: List of fields to return. limit: Maximum number of results. Returns: List of enabled Task objects. """ return self.list(enabled=True, fields=fields, limit=limit)
[docs] def list_disabled( self, fields: builtins.list[str] | None = None, limit: int | None = None, ) -> builtins.list[Task]: """List disabled tasks. Args: fields: List of fields to return. limit: Maximum number of results. Returns: List of disabled Task objects. """ return self.list(enabled=False, fields=fields, limit=limit)
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> Task: """Get a task by key or name. Args: key: Task $key (ID). name: Task name. fields: List of fields to return. Returns: Task object. Raises: NotFoundError: If task not found. ValueError: If neither key nor name provided. """ if key is not None: params: dict[str, Any] = {} if fields is None: fields = _DEFAULT_LIST_FIELDS if fields: params["fields"] = ",".join(fields) response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Task {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Task {key} returned invalid response") return self._to_model(response) if name is not None: escaped_name = name.replace("'", "''") results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1) if not results: raise NotFoundError(f"Task with name '{name}' not found") return results[0] raise ValueError("Either key or name must be provided")
[docs] def wait( self, key: int, timeout: int = TASK_WAIT_TIMEOUT, poll_interval: int = POLL_INTERVAL, raise_on_error: bool = True, ) -> Task: """Wait for a task to complete. Polls the task status until it becomes idle or an error occurs. Args: key: Task $key. timeout: Maximum wait time in seconds (0 = infinite). poll_interval: Seconds between status checks. raise_on_error: Raise TaskError if task fails. Returns: Completed Task object. Raises: TaskTimeoutError: If timeout exceeded. TaskError: If task fails and raise_on_error=True. Example: >>> # Wait for task with 5 minute timeout >>> task = client.tasks.wait(task_key, timeout=300) >>> # Wait indefinitely >>> task = client.tasks.wait(task_key, timeout=0) >>> # Don't raise on error, handle manually >>> task = client.tasks.wait(task_key, raise_on_error=False) >>> if task.has_error: ... print(f"Task failed: {task.get('error')}") """ start_time = time.time() while True: task = self.get(key) if task.is_complete: return task if task.has_error: if raise_on_error: error_msg = task.get("error", "Task failed") raise TaskError(str(error_msg), task_id=key) return task # Check timeout if timeout > 0 and (time.time() - start_time) > timeout: raise TaskTimeoutError( f"Task {key} did not complete within {timeout} seconds", task_id=key, ) time.sleep(poll_interval)
[docs] def enable(self, key: int) -> Task: """Enable a task. Enables a previously disabled task so it can run according to its schedule or event triggers. Args: key: Task $key. Returns: Updated Task object. Example: >>> task = client.tasks.enable(task_key) >>> print(task.is_enabled) # True """ self._client._request("PUT", f"{self._endpoint}/{key}", json_data={"enabled": True}) return self.get(key)
[docs] def disable(self, key: int) -> Task: """Disable a task. Disables a task to prevent it from running. If the task is currently running, it will complete but won't run again until re-enabled. Args: key: Task $key. Returns: Updated Task object. Example: >>> task = client.tasks.disable(task_key) >>> print(task.is_enabled) # False """ self._client._request("PUT", f"{self._endpoint}/{key}", json_data={"enabled": False}) return self.get(key)
[docs] def execute(self, key: int, **params: Any) -> Task: """Execute a task immediately. Triggers a task to run now, regardless of its schedule. Args: key: Task $key. **params: Optional parameters to pass to the task. Returns: Updated Task object. Example: >>> # Run a backup task immediately >>> task = client.tasks.execute(backup_task_key) >>> task.wait() # Wait for completion """ self.action(key, "execute", params=params if params else {}) return self.get(key)
[docs] def cancel(self, key: int) -> Task: """Cancel a running task. Attempts to cancel a task that is currently running. Args: key: Task $key. Returns: Updated Task object. Note: Not all tasks support cancellation. Some tasks may complete their current operation before stopping. """ self.action(key, "cancel") return self.get(key)
[docs] def create( # type: ignore[override] self, name: str, owner: int, action: str, *, table: str, description: str | None = None, enabled: bool = True, delete_after_run: bool = False, settings_args: dict[str, Any] | None = None, ) -> Task: """Create a new task. Args: name: Task name (required). owner: Owner resource $key (required). action: Action type to perform (required). table: Owner table name (required, e.g. ``"vms"``). Combined with owner to form the ``"table/key"`` reference. description: Task description. enabled: Whether task is enabled (default True). delete_after_run: Delete task after execution (default False). settings_args: Task settings arguments (JSON). Returns: Created Task object. Example: >>> # Create a snapshot task for a VM >>> task = client.tasks.create( ... name="Daily Snapshot", ... owner=vm.key, ... action="snapshot", ... table="vms", ... description="Daily snapshot of production VM", ... ) >>> # Create a one-time task that deletes itself >>> task = client.tasks.create( ... name="One-time Backup", ... owner=vm.key, ... action="snapshot", ... table="vms", ... delete_after_run=True, ... ) """ body: dict[str, Any] = { "name": name, "owner": f"{table}/{owner}", "action": action, "enabled": enabled, "delete_after_run": delete_after_run, } if description is not None: body["description"] = description if settings_args is not None: body["settings_args"] = settings_args response = self._client._request("POST", self._endpoint, json_data=body) if response is None: raise ValueError("No response from create operation") if not isinstance(response, dict): raise ValueError("Create operation returned invalid response") # Get the full object key = response.get("$key") if key is not None: return self.get(int(key)) return self._to_model(response)
[docs] def update( # type: ignore[override] self, key: int, *, name: str | None = None, description: str | None = None, enabled: bool | None = None, delete_after_run: bool | None = None, settings_args: dict[str, Any] | None = None, ) -> Task: """Update an existing task. Args: key: Task $key (ID). name: New name. description: New description. enabled: Enable/disable task. delete_after_run: Update delete after run setting. settings_args: New task settings. Returns: Updated Task object. Example: >>> task = client.tasks.update( ... task.key, ... description="Updated description", ... enabled=False, ... ) """ body: dict[str, Any] = {} if name is not None: body["name"] = name if description is not None: body["description"] = description if enabled is not None: body["enabled"] = enabled if delete_after_run is not None: body["delete_after_run"] = delete_after_run if settings_args is not None: body["settings_args"] = settings_args if not body: return self.get(key) self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body) return self.get(key)
[docs] def delete(self, key: int) -> None: """Delete a task. Removes the task and its associated triggers and events. Args: key: Task $key (ID). Example: >>> client.tasks.delete(task.key) """ self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def triggers(self, key: int) -> TaskScheduleTriggerManager: """Get trigger manager scoped to a specific task. Args: key: Task $key (ID). Returns: TaskScheduleTriggerManager for the task. Example: >>> for trigger in client.tasks.triggers(task.key).list(): ... print(f"Schedule: {trigger.schedule_display}") """ from pyvergeos.resources.task_schedule_triggers import TaskScheduleTriggerManager return TaskScheduleTriggerManager(self._client, task_key=key)
[docs] def events(self, key: int) -> TaskEventManager: """Get event manager scoped to a specific task. Args: key: Task $key (ID). Returns: TaskEventManager for the task. Example: >>> for event in client.tasks.events(task.key).list(): ... print(f"Event: {event.event_name_display}") """ from pyvergeos.resources.task_events import TaskEventManager return TaskEventManager(self._client, task_key=key)
[docs] def list_by_owner( self, owner_key: int, fields: builtins.list[str] | None = None, limit: int | None = None, ) -> builtins.list[Task]: """List tasks for a specific owner resource. Args: owner_key: Owner resource $key. fields: List of fields to return. limit: Maximum number of results. Returns: List of Task objects. """ return self.list(filter=f"owner eq {owner_key}", fields=fields, limit=limit)
[docs] def list_by_action( self, action: str, fields: builtins.list[str] | None = None, limit: int | None = None, ) -> builtins.list[Task]: """List tasks by action type. Args: action: Action type (e.g., "snapshot", "poweron"). fields: List of fields to return. limit: Maximum number of results. Returns: List of Task objects. """ return self.list(filter=f"action eq '{action}'", fields=fields, limit=limit)