# Source code for pyvergeos.resources.webhooks

"""Webhook resource manager for VergeOS notification integrations."""

from __future__ import annotations

import builtins
import json
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


# Friendly authorization-type names accepted by callers, keyed to the
# lowercase value sent to the API (each API value is the lowercased name).
AUTH_TYPE_MAP = {
    label: label.lower() for label in ("None", "Basic", "Bearer", "ApiKey")
}

# API authorization value -> human-readable label used for display.
AUTH_TYPE_DISPLAY = dict(
    none="None",
    basic="Basic",
    bearer="Bearer",
    apikey="API Key",
)

# API delivery status -> human-readable label (the capitalized status).
STATUS_DISPLAY = {
    state: state.capitalize() for state in ("queued", "running", "sent", "error")
}


# Fields requested for webhook list/get calls when the caller names none.
# Written as the comma-separated form that is sent in the "fields" parameter.
_DEFAULT_WEBHOOK_FIELDS = (
    "$key,name,type,url,headers,authorization_type,allow_insecure,timeout,retries"
).split(",")

# Fields requested for webhook history calls when the caller names none.
_DEFAULT_HISTORY_FIELDS = (
    "$key,webhook_url,message,status,status_info,last_attempt,created"
).split(",")


class Webhook(ResourceObject):
    """A single webhook URL configuration.

    Wraps one webhook record and exposes its fields as typed properties,
    plus shortcuts for sending a message and reading delivery history
    through the owning manager.

    Properties:
        name: Webhook name (unique identifier).
        webhook_type: Webhook type (currently only 'custom').
        url: Destination URL for webhook payloads.
        headers: Custom HTTP headers parsed into a dict.
        headers_raw: Header text exactly as stored ("Name:Value" lines).
        authorization_type: Auth method as the API value
            (none, basic, bearer, apikey).
        authorization_type_display: Friendly auth type name.
        is_insecure: True if insecure SSL connections are allowed.
        timeout: Request timeout in seconds (3-120).
        retries: Number of retry attempts (0-100).
    """

    @property
    def name(self) -> str:
        """Webhook name; empty string when the field is absent."""
        return str(self.get("name", ""))

    @property
    def webhook_type(self) -> str:
        """Webhook type; falls back to 'custom' when the field is absent."""
        return str(self.get("type", "custom"))

    @property
    def url(self) -> str:
        """Destination URL; empty string when the field is absent."""
        return str(self.get("url", ""))

    @property
    def headers(self) -> dict[str, str]:
        """Custom headers parsed from the stored "Name:Value" lines.

        Lines without a colon are ignored. Only the first colon separates
        name from value, so values may themselves contain colons.
        """
        stored = self.get("headers", "")
        if not stored:
            return {}

        parsed: dict[str, str] = {}
        for raw_line in str(stored).split("\n"):
            # partition() yields an empty separator when no colon is present,
            # which also covers blank lines.
            header_name, colon, header_value = raw_line.strip().partition(":")
            if colon:
                parsed[header_name.strip()] = header_value.strip()
        return parsed

    @property
    def headers_raw(self) -> str:
        """Header text as stored, in 'Name:Value' format."""
        return str(self.get("headers", ""))

    @property
    def authorization_type(self) -> str:
        """Authorization type as the API value; 'none' when absent."""
        return str(self.get("authorization_type", "none"))

    @property
    def authorization_type_display(self) -> str:
        """Friendly authorization type name.

        Values without a known label are returned unchanged.
        """
        api_value = self.authorization_type
        return AUTH_TYPE_DISPLAY.get(api_value, api_value)

    @property
    def is_insecure(self) -> bool:
        """Whether insecure SSL connections are allowed."""
        return bool(self.get("allow_insecure", False))

    @property
    def timeout(self) -> int:
        """Request timeout in seconds; 5 when the field is absent."""
        return int(self.get("timeout", 5))

    @property
    def retries(self) -> int:
        """Number of retry attempts; 3 when the field is absent."""
        return int(self.get("retries", 3))

    def send(self, message: str | dict[str, Any] | None = None) -> dict[str, Any] | None:
        """Send a test/manual message to this webhook.

        Delegates to the owning manager's ``send`` with this webhook's key.

        Args:
            message: JSON message payload, as a JSON string or a dict.
                When omitted, the manager's default test message is used.

        Returns:
            Action response (may include task info).
        """
        from typing import cast

        return cast("WebhookManager", self._manager).send(self.key, message=message)

    def history(
        self,
        *,
        status: str | None = None,
        pending: bool = False,
        failed: bool = False,
        limit: int = 100,
    ) -> builtins.list[WebhookHistory]:
        """Get execution history for this webhook.

        Delegates to the owning manager's ``history`` with this webhook's
        key as the ``webhook_key`` filter.

        Args:
            status: Filter by status (queued, running, sent, error).
            pending: If True, show only pending (queued/running) messages.
            failed: If True, show only failed (error) messages.
            limit: Maximum number of entries to return.

        Returns:
            List of WebhookHistory objects.
        """
        from typing import cast

        owner = cast("WebhookManager", self._manager)
        return owner.history(
            webhook_key=self.key,
            status=status,
            pending=pending,
            failed=failed,
            limit=limit,
        )

    def __repr__(self) -> str:
        return f"<Webhook key={self.get('$key', '?')} name={self.name!r} url={self.url!r}>"
class WebhookHistory(ResourceObject):
    """Webhook execution history resource object.

    Represents one webhook message delivery attempt and its status.

    Properties:
        webhook_key: Key of the parent webhook URL.
        status: Delivery status (API value).
        status_display: Friendly status name.
        status_info: Additional status/error information.
        message: Parsed message payload (if it is a JSON object).
        message_raw: Raw message string.
        is_pending: True if message is queued or running.
        is_sent: True if message was successfully sent.
        has_error: True if delivery failed.
        last_attempt_at: Datetime of last delivery attempt.
        created_at: Datetime when message was queued.
    """

    @property
    def webhook_key(self) -> int | None:
        """Get parent webhook URL key, or None when the field is absent."""
        val = self.get("webhook_url")
        return int(val) if val is not None else None

    @property
    def status(self) -> str:
        """Get delivery status (API value); empty string when absent."""
        return str(self.get("status", ""))

    @property
    def status_display(self) -> str:
        """Get friendly status name; unknown statuses are returned unchanged."""
        return STATUS_DISPLAY.get(self.status, self.status)

    @property
    def status_info(self) -> str:
        """Get additional status/error information."""
        return str(self.get("status_info", ""))

    @property
    def message(self) -> dict[str, Any] | str | None:
        """Get the message payload, decoded when it is a JSON object.

        Returns:
            None when the field is missing or empty; the decoded dict when
            the payload is (or parses to) a JSON object; otherwise the
            payload as a string.
        """
        raw = self.get("message")
        if not raw:
            return None
        # A payload that is already a dict is returned unchanged; str() would
        # turn it into a Python repr that json.loads cannot parse back.
        if isinstance(raw, dict):
            return raw

        text = str(raw)
        try:
            decoded = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            return text
        # Only JSON objects are returned parsed; arrays and scalars stay text.
        return decoded if isinstance(decoded, dict) else text

    @property
    def message_raw(self) -> str:
        """Get raw message string."""
        return str(self.get("message", ""))

    @property
    def is_pending(self) -> bool:
        """Check if message is pending (queued or running)."""
        return self.status in ("queued", "running")

    @property
    def is_sent(self) -> bool:
        """Check if message was successfully sent."""
        return self.status == "sent"

    @property
    def has_error(self) -> bool:
        """Check if delivery failed."""
        return self.status == "error"

    @property
    def last_attempt_at(self) -> datetime | None:
        """Get datetime (UTC) of last delivery attempt.

        Returns None when the field is missing or 0.
        """
        ts = self.get("last_attempt")
        if ts is None or ts == 0:
            return None
        return datetime.fromtimestamp(int(ts), tz=timezone.utc)

    @property
    def created_at(self) -> datetime | None:
        """Get datetime (UTC) when message was queued.

        Returns None only when the field is missing; a value of 0 is
        converted like any other timestamp.
        """
        ts = self.get("created")
        if ts is None:
            return None
        return datetime.fromtimestamp(int(ts), tz=timezone.utc)

    def __repr__(self) -> str:
        return f"<WebhookHistory key={self.get('$key', '?')} status={self.status_display!r}>"
def _escape_filter_literal(value: str) -> str:
    """Double embedded single quotes so *value* is safe inside a quoted filter literal."""
    return value.replace("'", "''")


def _serialize_headers(headers: dict[str, str] | str) -> str:
    """Render headers as the newline-terminated "Name:Value" text sent to the API.

    Args:
        headers: Mapping of header name to value, or text already in
            "Name:Value" line form.

    Returns:
        One "Name:Value" line per header, each ending in a newline. An empty
        dict or empty string yields "" so both clear the stored headers.
    """
    if isinstance(headers, dict):
        return "".join(f"{k}:{v}\n" for k, v in headers.items())
    if not headers:
        return ""
    return headers if headers.endswith("\n") else f"{headers}\n"


class WebhookManager(ResourceManager[Webhook]):
    """Manager for webhook URL configurations.

    Provides CRUD operations for webhook URL configurations and methods
    to send test messages and view delivery history.

    Example:
        >>> # List all webhooks
        >>> webhooks = client.webhooks.list()
        >>>
        >>> # Create a webhook
        >>> webhook = client.webhooks.create(
        ...     name="slack-alerts",
        ...     url="https://hooks.slack.com/services/xxx",
        ...     authorization_type="none"
        ... )
        >>>
        >>> # Send a test message
        >>> client.webhooks.send(webhook.key, message='{"text": "Hello!"}')
        >>>
        >>> # View delivery history
        >>> history = client.webhooks.history(webhook_key=webhook.key)
    """

    _endpoint = "webhook_urls"

    def __init__(self, client: VergeClient) -> None:
        """Bind the manager to *client*."""
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> Webhook:
        """Convert API response to Webhook object."""
        return Webhook(data, self)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        authorization_type: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[Webhook]:
        """List webhooks with optional filtering.

        Args:
            filter: OData filter string. It is joined to the other clauses
                with ``and`` exactly as given (it is not parenthesized).
            fields: List of fields to return. Defaults to the module's
                standard webhook field set.
            limit: Maximum number of results.
            offset: Skip this many results.
            authorization_type: Filter by auth type (None, Basic, Bearer,
                ApiKey). Unrecognized names are lowercased and used as-is.
            **filter_kwargs: Additional filter arguments passed to
                ``build_filter``.

        Returns:
            List of Webhook objects (empty when the API returns nothing).
        """
        params: dict[str, Any] = {}
        filters: builtins.list[str] = []

        if filter:
            filters.append(filter)

        if authorization_type:
            api_auth_type = AUTH_TYPE_MAP.get(authorization_type, authorization_type.lower())
            # Unrecognized names reach the filter verbatim, so quote-escape
            # them the same way get() escapes names.
            filters.append(f"authorization_type eq '{_escape_filter_literal(api_auth_type)}'")

        if filter_kwargs:
            filters.append(build_filter(**filter_kwargs))

        if filters:
            params["filter"] = " and ".join(filters)

        params["fields"] = ",".join(fields or _DEFAULT_WEBHOOK_FIELDS)

        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)

        if response is None:
            return []
        if not isinstance(response, list):
            # A single object instead of a list: wrap it.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> Webhook:
        """Get a webhook by key or name.

        When both are given, ``key`` takes precedence.

        Args:
            key: Webhook $key (ID).
            name: Webhook name (exact match).
            fields: List of fields to return.

        Returns:
            Webhook object.

        Raises:
            NotFoundError: If webhook not found, or the by-key response is
                not a dict.
            ValueError: If neither key nor name provided.
        """
        field_list = fields or _DEFAULT_WEBHOOK_FIELDS

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(field_list)}
            response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
            if response is None:
                raise NotFoundError(f"Webhook with key {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"Webhook with key {key} returned invalid response")
            return self._to_model(response)

        if name is not None:
            escaped_name = _escape_filter_literal(name)
            results = self.list(filter=f"name eq '{escaped_name}'", fields=field_list, limit=1)
            if not results:
                raise NotFoundError(f"Webhook with name '{name}' not found")
            return results[0]

        raise ValueError("Either key or name must be provided")

    def create(  # type: ignore[override]
        self,
        name: str,
        url: str,
        *,
        headers: dict[str, str] | str | None = None,
        authorization_type: str = "None",
        authorization_value: str | None = None,
        allow_insecure: bool = False,
        timeout: int | None = None,
        retries: int | None = None,
    ) -> Webhook:
        """Create a new webhook URL configuration.

        Args:
            name: Webhook name (unique).
            url: Destination URL (must start with http:// or https://).
            headers: Custom HTTP headers as dict or "Name:Value" string.
            authorization_type: Auth method (None, Basic, Bearer, ApiKey).
            authorization_value: Auth credential value.
                - Basic: "username:password" (will be base64 encoded)
                - Bearer: token value
                - ApiKey: key value
            allow_insecure: Allow insecure SSL connections.
            timeout: Request timeout in seconds (3-120, default 5).
            retries: Number of retry attempts (0-100, default 3).

        Returns:
            Created Webhook object.

        Raises:
            ValidationError: If parameters invalid.
            ConflictError: If webhook with name already exists.
            ValueError: If the API returns no response or a non-dict response.
        """
        body: dict[str, Any] = {
            "name": name,
            "url": url,
        }

        # Empty headers are simply omitted on create.
        if headers:
            body["headers"] = _serialize_headers(headers)

        api_auth_type = AUTH_TYPE_MAP.get(authorization_type, authorization_type.lower())
        body["authorization_type"] = api_auth_type

        if authorization_value:
            body["authorization_value"] = authorization_value

        # Optional settings are sent only when they differ from "unset".
        if allow_insecure:
            body["allow_insecure"] = True
        if timeout is not None:
            body["timeout"] = timeout
        if retries is not None:
            body["retries"] = retries

        response = self._client._request("POST", self._endpoint, json_data=body)

        if response is None:
            raise ValueError("No response from create operation")
        if not isinstance(response, dict):
            raise ValueError("Create operation returned invalid response")

        # Fetch full object since POST response may not include all fields
        key = response.get("$key")
        if key is not None:
            return self.get(int(key))
        return self._to_model(response)

    def update(  # type: ignore[override]
        self,
        key: int,
        *,
        name: str | None = None,
        url: str | None = None,
        headers: dict[str, str] | str | None = None,
        authorization_type: str | None = None,
        authorization_value: str | None = None,
        allow_insecure: bool | None = None,
        timeout: int | None = None,
        retries: int | None = None,
    ) -> Webhook:
        """Update a webhook configuration.

        Only arguments that are not None are sent. When nothing is given,
        no update request is made and the current webhook is returned.

        Args:
            key: Webhook $key (ID).
            name: New webhook name.
            url: New destination URL.
            headers: New custom HTTP headers. Pass an empty dict (or empty
                string) to clear.
            authorization_type: New auth method.
            authorization_value: New auth credential value.
            allow_insecure: New allow insecure setting.
            timeout: New request timeout.
            retries: New retry count.

        Returns:
            Updated Webhook object.

        Raises:
            NotFoundError: If webhook not found.
            ValidationError: If parameters invalid.
        """
        body: dict[str, Any] = {}

        if name is not None:
            body["name"] = name
        if url is not None:
            body["url"] = url
        if headers is not None:
            body["headers"] = _serialize_headers(headers)
        if authorization_type is not None:
            body["authorization_type"] = AUTH_TYPE_MAP.get(
                authorization_type, authorization_type.lower()
            )
        if authorization_value is not None:
            body["authorization_value"] = authorization_value
        if allow_insecure is not None:
            body["allow_insecure"] = allow_insecure
        if timeout is not None:
            body["timeout"] = timeout
        if retries is not None:
            body["retries"] = retries

        if not body:
            # No changes, just fetch current
            return self.get(key)

        response = self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)

        # Without a usable dict in the response, re-read the webhook.
        if response is None or not isinstance(response, dict):
            return self.get(key)
        return self._to_model(response)

    def delete(self, key: int) -> None:
        """Delete a webhook configuration.

        This also deletes any pending messages in the queue for this webhook.

        Args:
            key: Webhook $key (ID).

        Raises:
            NotFoundError: If webhook not found.
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def send(
        self,
        key: int,
        *,
        message: str | dict[str, Any] | None = None,
    ) -> dict[str, Any] | None:
        """Send a message to a webhook.

        The message is queued for delivery. Use history() to check
        delivery status.

        Args:
            key: Webhook $key (ID).
            message: JSON message payload. A dict is serialized with
                ``json.dumps``; a string is sent unchanged. Defaults to a
                simple test message.

        Returns:
            Action response (may include task info), or None when the
            response is not a dict.

        Raises:
            NotFoundError: If webhook not found.
        """
        if message is None:
            message_json = '{"text": "Webhook test from pyvergeos"}'
        elif isinstance(message, dict):
            message_json = json.dumps(message)
        else:
            message_json = message

        body = {"message": message_json}

        # Use action endpoint: webhook_urls/{id}/send
        endpoint = f"{self._endpoint}/{key}/send"
        response = self._client._request("POST", endpoint, json_data=body)

        return response if isinstance(response, dict) else None

    def history(
        self,
        key: int | None = None,
        *,
        webhook_key: int | None = None,
        webhook_name: str | None = None,
        status: str | None = None,
        pending: bool = False,
        failed: bool = False,
        limit: int = 100,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[WebhookHistory]:
        """Get webhook execution history.

        When ``key`` is given, every other filter argument is ignored.
        Otherwise at most one status filter applies, in this order of
        precedence: ``status``, then ``pending``, then ``failed``.

        Args:
            key: History entry $key (ID) to get specific entry.
            webhook_key: Filter by webhook $key.
            webhook_name: Filter by webhook name. Resolved to a key via
                ``get(name=...)`` and overrides ``webhook_key``.
            status: Filter by status (queued, running, sent, error).
            pending: If True, show only pending (queued/running) messages.
            failed: If True, show only failed (error) messages.
            limit: Maximum number of entries to return.
            fields: List of fields to return.

        Returns:
            List of WebhookHistory objects, newest first.

        Raises:
            NotFoundError: If ``webhook_name`` does not match a webhook.
        """
        params: dict[str, Any] = {}
        filters: builtins.list[str] = []

        if key is not None:
            filters.append(f"$key eq {key}")
        else:
            resolved_webhook_key = webhook_key
            if webhook_name:
                resolved_webhook_key = self.get(name=webhook_name).key

            if resolved_webhook_key is not None:
                filters.append(f"webhook_url eq {resolved_webhook_key}")

            if status:
                # Caller-supplied text goes into a quoted literal: escape it.
                api_status = _escape_filter_literal(status.lower())
                filters.append(f"status eq '{api_status}'")
            elif pending:
                filters.append("(status eq 'queued' or status eq 'running')")
            elif failed:
                filters.append("status eq 'error'")

        params["limit"] = limit

        if filters:
            params["filter"] = " and ".join(filters)

        params["fields"] = ",".join(fields or _DEFAULT_HISTORY_FIELDS)

        # Sort by created descending (newest first)
        params["sort"] = "-created"

        # History is read from "webhooks", not from self._endpoint.
        response = self._client._request("GET", "webhooks", params=params)

        if response is None:
            return []
        if not isinstance(response, list):
            return [WebhookHistory(response, self)]
        return [WebhookHistory(item, self) for item in response]

    def get_history(self, key: int) -> WebhookHistory:
        """Get a specific webhook history entry by key.

        Args:
            key: History entry $key (ID).

        Returns:
            WebhookHistory object.

        Raises:
            NotFoundError: If history entry not found.
        """
        results = self.history(key=key, limit=1)
        if not results:
            raise NotFoundError(f"Webhook history entry with key {key} not found")
        return results[0]

    def list_pending(
        self, webhook_key: int | None = None, limit: int = 100
    ) -> builtins.list[WebhookHistory]:
        """List pending webhook messages (queued or running).

        Args:
            webhook_key: Filter by webhook $key.
            limit: Maximum number of entries to return.

        Returns:
            List of WebhookHistory objects.
        """
        return self.history(webhook_key=webhook_key, pending=True, limit=limit)

    def list_failed(
        self, webhook_key: int | None = None, limit: int = 100
    ) -> builtins.list[WebhookHistory]:
        """List failed webhook messages.

        Args:
            webhook_key: Filter by webhook $key.
            limit: Maximum number of entries to return.

        Returns:
            List of WebhookHistory objects.
        """
        return self.history(webhook_key=webhook_key, failed=True, limit=limit)