"""Site sync resource managers for VergeOS backup/DR operations."""
from __future__ import annotations
import builtins
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any
from pyvergeos.exceptions import NotFoundError, ValidationError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject
if TYPE_CHECKING:
from pyvergeos.client import VergeClient
# Default fields for outgoing sync list operations
_DEFAULT_OUTGOING_SYNC_FIELDS = [
    # Identity
    "$key", "site", "name", "description",
    # Enablement and health
    "enabled", "status", "status_info", "state",
    # Transport settings
    "url", "encryption", "compression", "netinteg",
    "threads", "file_threads", "sendthrottle", "destination_tier",
    # Queue retry policy
    "queue_retry_count",
    "queue_retry_interval_seconds",
    "queue_retry_interval_multiplier",
    # Bookkeeping
    "last_run", "remote_min_snapshots", "note",
]
# Default fields for incoming sync list operations
_DEFAULT_INCOMING_SYNC_FIELDS = [
    # Identity
    "$key", "site", "name", "description",
    # Enablement and health
    "enabled", "status", "status_info", "state",
    # Registration and connection details
    "sync_id", "registration_code", "public_ip",
    # Storage and retention
    "force_tier", "min_snapshots", "last_sync",
    # vSAN connection and callback
    "vsan_host", "vsan_port", "request_url",
    # Provenance
    "system_created",
]
# Default fields for sync schedule list operations
_DEFAULT_SCHEDULE_FIELDS = [
    # Identity and the two records a schedule links together
    "$key", "site_syncs_outgoing", "profile_period",
    # Per-schedule sync options
    "retention", "priority", "do_not_expire", "destination_prefix",
]
[docs]
class SiteSyncOutgoing(ResourceObject):
    """Outgoing site sync resource object.

    Outgoing syncs send cloud snapshots from this system to a remote site
    for disaster recovery purposes.

    Each property reads one field of the record through ``self.get``
    (passing a fallback value where one is shown) and coerces the result
    to a plain Python type. Action methods delegate to the owning
    ``SiteSyncOutgoingManager``.

    Properties:
        site_key: Key of the associated site.
        name: Sync name.
        description: Sync description.
        is_enabled: Whether the sync is enabled.
        status: Sync status (offline, initializing, syncing, error).
        status_info: Additional status information.
        state: Sync state (offline, online, warning, error).
        url: Remote URL.
        has_encryption: Whether encryption is enabled.
        has_compression: Whether compression is enabled.
        has_network_integrity: Whether network integrity checking is enabled.
        data_threads: Number of data threads.
        file_threads: Number of file threads.
        send_throttle: Send throttle in bytes/sec (0 = disabled).
        destination_tier: Override destination storage tier.
        queue_retry_count: Number of retry attempts for queued items.
        queue_retry_interval: Retry interval in seconds.
        has_retry_multiplier: Whether retry interval multiplier is enabled.
        last_run_at: When the sync last ran.
        remote_min_snapshots: Minimum snapshots to retain on remote.
        note: Optional note.
    """

    def _outgoing_manager(self) -> SiteSyncOutgoingManager:
        """Return ``self._manager`` typed as the outgoing-sync manager."""
        # Imported locally, as the original methods did, so the module's
        # top-level imports stay untouched.
        from typing import cast

        return cast("SiteSyncOutgoingManager", self._manager)

    # ------------------------------------------------------------------
    # Field accessors
    # ------------------------------------------------------------------

    @property
    def site_key(self) -> int | None:
        """Key of the associated site, or None when the field is absent."""
        raw = self.get("site")
        if raw is None:
            return None
        return int(raw)

    @property
    def name(self) -> str:
        """Sync name."""
        return str(self.get("name", ""))

    @property
    def description(self) -> str:
        """Sync description."""
        return str(self.get("description", ""))

    @property
    def is_enabled(self) -> bool:
        """True when the ``enabled`` field is truthy."""
        return bool(self.get("enabled", False))

    @property
    def status(self) -> str:
        """Sync status (offline, initializing, syncing, error)."""
        return str(self.get("status", "offline"))

    @property
    def status_info(self) -> str:
        """Additional status information."""
        return str(self.get("status_info", ""))

    @property
    def state(self) -> str:
        """Sync state (offline, online, warning, error)."""
        return str(self.get("state", "offline"))

    @property
    def is_syncing(self) -> bool:
        """True when ``status`` equals ``"syncing"``."""
        return self.status == "syncing"

    @property
    def is_online(self) -> bool:
        """True when ``state`` equals ``"online"``."""
        return self.state == "online"

    @property
    def has_error(self) -> bool:
        """True when either ``state`` or ``status`` equals ``"error"``."""
        return self.state == "error" or self.status == "error"

    @property
    def url(self) -> str:
        """Remote URL."""
        return str(self.get("url", ""))

    @property
    def has_encryption(self) -> bool:
        """Whether encryption is enabled (fallback: True)."""
        return bool(self.get("encryption", True))

    @property
    def has_compression(self) -> bool:
        """Whether compression is enabled (fallback: True)."""
        return bool(self.get("compression", True))

    @property
    def has_network_integrity(self) -> bool:
        """Whether network integrity checking is enabled (fallback: True)."""
        return bool(self.get("netinteg", True))

    @property
    def data_threads(self) -> int:
        """Number of data threads (fallback: 8)."""
        return int(self.get("threads", 8))

    @property
    def file_threads(self) -> int:
        """Number of file threads (fallback: 4)."""
        return int(self.get("file_threads", 4))

    @property
    def send_throttle(self) -> int:
        """Send throttle in bytes/sec (0 = disabled)."""
        return int(self.get("sendthrottle", 0))

    @property
    def destination_tier(self) -> str:
        """Override destination storage tier (fallback: "unspecified")."""
        return str(self.get("destination_tier", "unspecified"))

    @property
    def queue_retry_count(self) -> int:
        """Number of retry attempts for queued items (fallback: 10)."""
        return int(self.get("queue_retry_count", 10))

    @property
    def queue_retry_interval(self) -> int:
        """Retry interval in seconds (fallback: 60)."""
        return int(self.get("queue_retry_interval_seconds", 60))

    @property
    def has_retry_multiplier(self) -> bool:
        """Whether the retry interval multiplier is enabled (fallback: True)."""
        return bool(self.get("queue_retry_interval_multiplier", True))

    @property
    def last_run_at(self) -> datetime | None:
        """When the sync last ran, as a UTC datetime.

        Returns None when ``last_run`` is missing, falsy, or not a
        positive number of seconds.
        """
        raw = self.get("last_run")
        if not raw:
            return None
        seconds = int(raw)
        if seconds <= 0:
            return None
        return datetime.fromtimestamp(seconds, tz=timezone.utc)

    @property
    def remote_min_snapshots(self) -> int:
        """Minimum snapshots to retain on remote (fallback: 0)."""
        return int(self.get("remote_min_snapshots", 0))

    @property
    def note(self) -> str:
        """Optional note."""
        return str(self.get("note", ""))

    # ------------------------------------------------------------------
    # Actions (delegated to the manager)
    # ------------------------------------------------------------------

    def enable(self) -> SiteSyncOutgoing:
        """Enable this sync.

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self._outgoing_manager().enable(self.key)

    def disable(self) -> SiteSyncOutgoing:
        """Disable this sync.

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self._outgoing_manager().disable(self.key)

    def start(self) -> SiteSyncOutgoing:
        """Start (enable) this sync.

        Alias for enable().

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self.enable()

    def stop(self) -> SiteSyncOutgoing:
        """Stop (disable) this sync.

        Alias for disable().

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self.disable()

    def refresh(self) -> SiteSyncOutgoing:
        """Fetch this sync again from the server.

        Returns:
            The SiteSyncOutgoing object returned by the manager's ``get``;
            this instance itself is not modified here.
        """
        return self._outgoing_manager().get(self.key)

    def add_to_queue(
        self,
        snapshot_key: int,
        retention: int | timedelta = 259200,
        priority: int = 0,
        do_not_expire: bool = False,
        destination_prefix: str | None = None,
    ) -> None:
        """Add a cloud snapshot to this sync's transfer queue.

        Args:
            snapshot_key: Key of the cloud snapshot to queue.
            retention: Retention period in seconds or timedelta. Default 3 days.
            priority: Priority for syncing (lower = first). Default 0.
            do_not_expire: If True, snapshot won't expire until synced.
            destination_prefix: Prefix for snapshot name on destination.
        """
        self._outgoing_manager().add_to_queue(
            self.key,
            snapshot_key,
            retention=retention,
            priority=priority,
            do_not_expire=do_not_expire,
            destination_prefix=destination_prefix,
        )

    # ------------------------------------------------------------------
    # Scoped sub-managers (a new manager is built on every access)
    # ------------------------------------------------------------------

    @property
    def queue(self) -> SiteSyncQueueManager:
        """Access the sync queue for this outgoing sync.

        Returns:
            SiteSyncQueueManager scoped to this sync.

        Example:
            >>> sync = client.site_syncs.get(1)
            >>> items = sync.queue.list()
            >>> syncing = sync.queue.list_syncing()
        """
        return SiteSyncQueueManager(self._outgoing_manager()._client, self.key)

    @property
    def remote_snapshots(self) -> SiteSyncRemoteSnapManager:
        """Access remote snapshots for this outgoing sync.

        Returns:
            SiteSyncRemoteSnapManager scoped to this sync.

        Example:
            >>> sync = client.site_syncs.get(1)
            >>> snaps = sync.remote_snapshots.list()
            >>> snap = snaps[0]
            >>> snap.request_sync_back()
        """
        return SiteSyncRemoteSnapManager(self._outgoing_manager()._client, self.key)

    @property
    def stats(self) -> SiteSyncStatsManager:
        """Access stats for this outgoing sync.

        Returns:
            SiteSyncStatsManager scoped to this sync.

        Example:
            >>> sync = client.site_syncs.get(1)
            >>> stats = sync.stats.get()
            >>> print(f"Sent: {stats.sent_display}")
        """
        return SiteSyncStatsManager(self._outgoing_manager()._client, self.key)

    @property
    def logs(self) -> SiteSyncOutgoingLogManager:
        """Access logs for this outgoing sync.

        Returns:
            SiteSyncOutgoingLogManager scoped to this sync.

        Example:
            >>> sync = client.site_syncs.get(1)
            >>> logs = sync.logs.list(limit=20)
            >>> errors = sync.logs.list_errors()
        """
        return SiteSyncOutgoingLogManager(self._outgoing_manager()._client, self.key)

    def __repr__(self) -> str:
        """Return a short debug representation with key, name and status."""
        key = self.get("$key", "?")
        return (
            f"<SiteSyncOutgoing key={key} "
            f"name={self.name!r} status={self.status!r}>"
        )
[docs]
class SiteSyncIncoming(ResourceObject):
    """Incoming site sync resource object.

    Incoming syncs receive cloud snapshots from a remote site
    for disaster recovery purposes.

    Each property reads one field of the record through ``self.get``
    (passing a fallback value where one is shown) and coerces the result
    to a plain Python type. Action methods delegate to the owning
    ``SiteSyncIncomingManager``.

    Properties:
        site_key: Key of the associated site.
        name: Sync name.
        description: Sync description.
        is_enabled: Whether the sync is enabled.
        status: Sync status (offline, syncing, error, etc.).
        status_info: Additional status information.
        state: Sync state (offline, online, warning, error).
        sync_id: Unique sync identifier.
        registration_code: Code used by remote site to establish connection.
        public_ip: Public IP/domain of connecting system.
        force_tier: Force all synced data to this tier.
        min_snapshots: Minimum snapshots to retain.
        last_sync_at: When last sync occurred.
        vsan_host: vSAN connection host.
        vsan_port: vSAN connection port.
        request_url: URL remote system uses to connect back.
        is_system_created: Whether sync was created by system.
    """

    def _incoming_manager(self) -> SiteSyncIncomingManager:
        """Return ``self._manager`` typed as the incoming-sync manager."""
        # Imported locally, as the original methods did, so the module's
        # top-level imports stay untouched.
        from typing import cast

        return cast("SiteSyncIncomingManager", self._manager)

    # ------------------------------------------------------------------
    # Field accessors
    # ------------------------------------------------------------------

    @property
    def site_key(self) -> int | None:
        """Key of the associated site, or None when the field is absent."""
        raw = self.get("site")
        if raw is None:
            return None
        return int(raw)

    @property
    def name(self) -> str:
        """Sync name."""
        return str(self.get("name", ""))

    @property
    def description(self) -> str:
        """Sync description."""
        return str(self.get("description", ""))

    @property
    def is_enabled(self) -> bool:
        """True when the ``enabled`` field is truthy."""
        return bool(self.get("enabled", False))

    @property
    def status(self) -> str:
        """Sync status (fallback: "offline")."""
        return str(self.get("status", "offline"))

    @property
    def status_info(self) -> str:
        """Additional status information."""
        return str(self.get("status_info", ""))

    @property
    def state(self) -> str:
        """Sync state (offline, online, warning, error)."""
        return str(self.get("state", "offline"))

    @property
    def is_syncing(self) -> bool:
        """True when ``status`` equals ``"syncing"``."""
        return self.status == "syncing"

    @property
    def is_online(self) -> bool:
        """True when ``state`` equals ``"online"``."""
        return self.state == "online"

    @property
    def has_error(self) -> bool:
        """True when either ``state`` or ``status`` equals ``"error"``."""
        return self.state == "error" or self.status == "error"

    @property
    def sync_id(self) -> str:
        """Unique sync identifier."""
        return str(self.get("sync_id", ""))

    @property
    def registration_code(self) -> str:
        """Registration code for remote site connection."""
        return str(self.get("registration_code", ""))

    @property
    def public_ip(self) -> str:
        """Public IP/domain of connecting system."""
        return str(self.get("public_ip", ""))

    @property
    def force_tier(self) -> str:
        """Forced storage tier (unspecified, 1-5)."""
        return str(self.get("force_tier", "unspecified"))

    @property
    def min_snapshots(self) -> int:
        """Minimum snapshots to retain (fallback: 1)."""
        return int(self.get("min_snapshots", 1))

    @property
    def last_sync_at(self) -> datetime | None:
        """When the last sync occurred, as a UTC datetime.

        Returns None when ``last_sync`` is missing, falsy, or not a
        positive number of seconds.
        """
        raw = self.get("last_sync")
        if not raw:
            return None
        seconds = int(raw)
        if seconds <= 0:
            return None
        return datetime.fromtimestamp(seconds, tz=timezone.utc)

    @property
    def vsan_host(self) -> str:
        """vSAN connection host."""
        return str(self.get("vsan_host", ""))

    @property
    def vsan_port(self) -> int:
        """vSAN connection port (fallback: 14201)."""
        return int(self.get("vsan_port", 14201))

    @property
    def request_url(self) -> str:
        """URL remote system uses to connect back."""
        return str(self.get("request_url", ""))

    @property
    def is_system_created(self) -> bool:
        """True when the ``system_created`` field is truthy."""
        return bool(self.get("system_created", False))

    # ------------------------------------------------------------------
    # Actions (delegated to the manager)
    # ------------------------------------------------------------------

    def enable(self) -> SiteSyncIncoming:
        """Enable this sync.

        Returns:
            Updated SiteSyncIncoming object.
        """
        return self._incoming_manager().enable(self.key)

    def disable(self) -> SiteSyncIncoming:
        """Disable this sync.

        Returns:
            Updated SiteSyncIncoming object.
        """
        return self._incoming_manager().disable(self.key)

    def refresh(self) -> SiteSyncIncoming:
        """Fetch this sync again from the server.

        Returns:
            The SiteSyncIncoming object returned by the manager's ``get``;
            this instance itself is not modified here.
        """
        return self._incoming_manager().get(self.key)

    # ------------------------------------------------------------------
    # Scoped sub-managers (a new manager is built on every access)
    # ------------------------------------------------------------------

    @property
    def verified(self) -> SiteSyncIncomingVerifiedManager:
        """Access verified sync info for this incoming sync.

        Returns:
            SiteSyncIncomingVerifiedManager scoped to this sync.

        Example:
            >>> incoming = client.site_syncs_incoming.get(1)
            >>> verified = incoming.verified.get()
            >>> snaps = verified.list_remote_snapshots()
        """
        return SiteSyncIncomingVerifiedManager(
            self._incoming_manager()._client, self.key
        )

    @property
    def logs(self) -> SiteSyncIncomingLogManager:
        """Access logs for this incoming sync.

        Returns:
            SiteSyncIncomingLogManager scoped to this sync.

        Example:
            >>> incoming = client.site_syncs_incoming.get(1)
            >>> logs = incoming.logs.list(limit=20)
            >>> errors = incoming.logs.list_errors()
        """
        return SiteSyncIncomingLogManager(self._incoming_manager()._client, self.key)

    def __repr__(self) -> str:
        """Return a short debug representation with key, name and status."""
        key = self.get("$key", "?")
        return (
            f"<SiteSyncIncoming key={key} "
            f"name={self.name!r} status={self.status!r}>"
        )
[docs]
class SiteSyncSchedule(ResourceObject):
    """Site sync schedule resource object.

    Schedules link snapshot profile periods to outgoing site syncs,
    enabling automatic syncing of snapshots taken by those periods.

    Properties:
        sync_key: Key of the associated outgoing sync.
        profile_period_key: Key of the snapshot profile period.
        retention: Retention period in seconds.
        retention_timedelta: Retention period as timedelta.
        priority: Sync priority (lower = first).
        do_not_expire: Whether source snapshot should not expire until synced.
        destination_prefix: Prefix for snapshot name on destination.
    """

    @property
    def sync_key(self) -> int | None:
        """Key of the associated outgoing sync, or None when absent."""
        raw = self.get("site_syncs_outgoing")
        if raw is None:
            return None
        return int(raw)

    @property
    def profile_period_key(self) -> int | None:
        """Key of the associated snapshot profile period, or None when absent."""
        raw = self.get("profile_period")
        if raw is None:
            return None
        return int(raw)

    @property
    def retention(self) -> int:
        """Retention period in seconds (fallback: 0)."""
        return int(self.get("retention", 0))

    @property
    def retention_timedelta(self) -> timedelta:
        """Retention period as a timedelta built from ``retention`` seconds."""
        return timedelta(seconds=self.retention)

    @property
    def priority(self) -> int:
        """Sync priority (lower = first)."""
        return int(self.get("priority", 0))

    @property
    def do_not_expire(self) -> bool:
        """Whether the source snapshot should not expire until synced."""
        return bool(self.get("do_not_expire", False))

    @property
    def destination_prefix(self) -> str:
        """Prefix for snapshot name on destination (fallback: "remote")."""
        return str(self.get("destination_prefix", "remote"))

    def delete(self) -> None:
        """Delete this schedule through its owning manager."""
        # Imported locally, as in the original, to leave module imports alone.
        from typing import cast

        owner = cast("SiteSyncScheduleManager", self._manager)
        owner.delete(self.key)

    def __repr__(self) -> str:
        """Return a short debug representation with key, sync and period."""
        key = self.get("$key", "?")
        return (
            f"<SiteSyncSchedule key={key} "
            f"sync={self.sync_key} period={self.profile_period_key}>"
        )
[docs]
class SiteSyncOutgoingManager(ResourceManager[SiteSyncOutgoing]):
    """Manager for outgoing site sync operations.

    Outgoing syncs send cloud snapshots to remote sites for disaster recovery.
    Reads are issued against the ``site_syncs_outgoing`` endpoint; every
    state change is posted as an action to ``site_syncs_outgoing_actions``.

    Example:
        >>> # List all outgoing syncs
        >>> syncs = client.site_syncs.list()
        >>> # Get syncs for a specific site
        >>> syncs = client.site_syncs.list(site_key=1)
        >>> # Get a sync by name
        >>> sync = client.site_syncs.get(name="DR-Sync")
        >>> # Enable/disable a sync
        >>> sync = client.site_syncs.enable(sync.key)
        >>> sync = client.site_syncs.disable(sync.key)
        >>> # Add snapshot to queue
        >>> client.site_syncs.add_to_queue(
        ...     sync_key=1,
        ...     snapshot_key=5,
        ...     retention=604800,  # 7 days
        ... )
    """

    _endpoint = "site_syncs_outgoing"
    # Endpoint that receives every action POST issued by this manager.
    _actions_endpoint = "site_syncs_outgoing_actions"

    def __init__(self, client: VergeClient) -> None:
        """Bind the manager to *client*."""
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> SiteSyncOutgoing:
        """Wrap a raw record in a SiteSyncOutgoing bound to this manager."""
        return SiteSyncOutgoing(data, self)

    def _post_action(
        self,
        key: int,
        action: str,
        params: dict[str, Any] | None = None,
    ) -> None:
        """POST one action for the sync identified by *key*.

        Args:
            key: Sync $key (ID).
            action: Action name placed in the request body.
            params: Optional action parameters; the ``params`` entry is
                omitted from the body when this is None.
        """
        body: dict[str, Any] = {
            "site_syncs_outgoing": key,
            "action": action,
        }
        if params is not None:
            body["params"] = params
        self._client._request("POST", self._actions_endpoint, json_data=body)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        site_key: int | None = None,
        site_name: str | None = None,
        enabled: bool | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncOutgoing]:
        """List outgoing site syncs.

        Args:
            filter: OData filter string.
            fields: List of fields to return. None selects the module's
                default outgoing-sync field list.
            limit: Maximum number of results.
            offset: Skip this many results.
            site_key: Filter by site key.
            site_name: Filter by site name (looks up site key). Ignored
                when ``site_key`` is given.
            enabled: Filter by enabled status.
            **filter_kwargs: Additional filter arguments, passed to
                ``build_filter``.

        Returns:
            List of SiteSyncOutgoing objects sorted by name.

        Example:
            >>> # All outgoing syncs
            >>> syncs = client.site_syncs.list()
            >>> # Syncs for a specific site
            >>> syncs = client.site_syncs.list(site_key=1)
            >>> # Enabled syncs only
            >>> syncs = client.site_syncs.list(enabled=True)
        """
        # Only fall back to the name lookup when no key was supplied at all;
        # an explicit key (including 0) always wins.
        if site_name and site_key is None:
            site_key = self._client.sites.get(name=site_name).key

        clauses: builtins.list[str] = []
        if site_key is not None:
            clauses.append(f"site eq {site_key}")
        if enabled is not None:
            clauses.append(f"enabled eq {str(enabled).lower()}")
        if filter:
            # Parenthesised so it combines safely with the other clauses.
            clauses.append(f"({filter})")
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        if fields is None:
            fields = _DEFAULT_OUTGOING_SYNC_FIELDS

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        if fields:
            params["fields"] = ",".join(fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset
        params["sort"] = "+name"

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, list):
            # A single record is wrapped so callers always get a list.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def list_enabled(
        self,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncOutgoing]:
        """List enabled outgoing syncs.

        Args:
            fields: List of fields to return.

        Returns:
            List of enabled SiteSyncOutgoing objects.
        """
        return self.list(enabled=True, fields=fields)

    def list_disabled(
        self,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncOutgoing]:
        """List disabled outgoing syncs.

        Args:
            fields: List of fields to return.

        Returns:
            List of disabled SiteSyncOutgoing objects.
        """
        return self.list(enabled=False, fields=fields)

    def list_for_site(
        self,
        site_key: int | None = None,
        site_name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncOutgoing]:
        """List outgoing syncs for a specific site.

        Args:
            site_key: Site key.
            site_name: Site name.
            fields: List of fields to return.

        Returns:
            List of SiteSyncOutgoing objects for the site.
        """
        return self.list(site_key=site_key, site_name=site_name, fields=fields)

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        site_key: int | None = None,
        site_name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncOutgoing:
        """Get an outgoing sync by key or name.

        When both are given, ``key`` takes precedence and ``name`` is ignored.

        Args:
            key: Sync $key (ID).
            name: Sync name (requires site_key or site_name for uniqueness;
                when several syncs match, the first result is returned).
            site_key: Site key (for name lookup).
            site_name: Site name (for name lookup).
            fields: List of fields to return.

        Returns:
            SiteSyncOutgoing object.

        Raises:
            NotFoundError: If sync not found, or the by-key response is
                not a dict.
            ValueError: If neither key nor name provided.
        """
        if key is None and name is None:
            raise ValueError("Either key or name must be provided")

        if fields is None:
            fields = _DEFAULT_OUTGOING_SYNC_FIELDS

        if key is not None:
            params: dict[str, Any] = {}
            if fields:
                params["fields"] = ",".join(fields)
            response = self._client._request(
                "GET", f"{self._endpoint}/{key}", params=params
            )
            if response is None:
                raise NotFoundError(f"Outgoing sync {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"Outgoing sync {key} returned invalid response")
            return self._to_model(response)

        # Name lookup. An explicit site key (including 0) wins over site_name.
        if site_name and site_key is None:
            site_key = self._client.sites.get(name=site_name).key

        # Double embedded single quotes so the name stays one string literal.
        escaped = name.replace("'", "''")
        clauses = [f"name eq '{escaped}'"]
        if site_key is not None:
            clauses.append(f"site eq {site_key}")

        matches = self.list(filter=" and ".join(clauses), fields=fields)
        if not matches:
            raise NotFoundError(f"Outgoing sync '{name}' not found")
        return matches[0]

    def enable(self, key: int) -> SiteSyncOutgoing:
        """Enable an outgoing sync.

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        self._post_action(key, "enable")
        return self.get(key)

    def disable(self, key: int) -> SiteSyncOutgoing:
        """Disable an outgoing sync.

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        self._post_action(key, "disable")
        return self.get(key)

    def start(self, key: int) -> SiteSyncOutgoing:
        """Start (enable) an outgoing sync.

        Alias for enable().

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self.enable(key)

    def stop(self, key: int) -> SiteSyncOutgoing:
        """Stop (disable) an outgoing sync.

        Alias for disable().

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self.disable(key)

    def add_to_queue(
        self,
        sync_key: int,
        snapshot_key: int,
        retention: int | timedelta = 259200,
        priority: int = 0,
        do_not_expire: bool = False,
        destination_prefix: str | None = None,
    ) -> None:
        """Add a cloud snapshot to the transfer queue.

        Args:
            sync_key: Key of the outgoing sync.
            snapshot_key: Key of the cloud snapshot to queue.
            retention: Retention period in seconds or timedelta. Default 3 days.
            priority: Priority for syncing (lower = first). Default 0.
            do_not_expire: If True, snapshot won't expire until synced.
            destination_prefix: Prefix for snapshot name on destination;
                omitted from the request when None.

        Raises:
            ValidationError: If the retention is negative.
        """
        # Normalise the retention to whole seconds.
        if isinstance(retention, timedelta):
            retention_seconds = int(retention.total_seconds())
        else:
            retention_seconds = int(retention)

        # NOTE(review): zero is accepted even though the message says
        # "positive" -- confirm whether a retention of 0 is meaningful.
        if retention_seconds < 0:
            raise ValidationError("Retention must be positive")

        params: dict[str, Any] = {
            "cloud_snapshot": snapshot_key,
            "retention": retention_seconds,
            "priority": priority,
            "do_not_expire": do_not_expire,
        }
        if destination_prefix is not None:
            params["destination_prefix"] = destination_prefix

        self._post_action(sync_key, "add_to_queue", params)

    def invoke(
        self,
        sync_key: int,
        snapshot_key: int,
        retention: int | timedelta = 259200,
        priority: int = 0,
        do_not_expire: bool = False,
        destination_prefix: str | None = None,
    ) -> None:
        """Manually trigger a sync for a cloud snapshot.

        Alias for add_to_queue().

        Args:
            sync_key: Key of the outgoing sync.
            snapshot_key: Key of the cloud snapshot to queue.
            retention: Retention period in seconds or timedelta. Default 3 days.
            priority: Priority for syncing (lower = first). Default 0.
            do_not_expire: If True, snapshot won't expire until synced.
            destination_prefix: Prefix for snapshot name on destination.
        """
        self.add_to_queue(
            sync_key,
            snapshot_key,
            retention=retention,
            priority=priority,
            do_not_expire=do_not_expire,
            destination_prefix=destination_prefix,
        )

    def set_throttle(self, key: int, throttle: int) -> SiteSyncOutgoing:
        """Set send throttle for an outgoing sync.

        Args:
            key: Sync $key (ID).
            throttle: Throttle in bytes/sec (0 = disable).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        self._post_action(key, "throttle_sync", {"throttle": throttle})
        return self.get(key)

    def disable_throttle(self, key: int) -> SiteSyncOutgoing:
        """Disable send throttle for an outgoing sync.

        Equivalent to ``set_throttle(key, 0)``.

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        return self.set_throttle(key, 0)

    def refresh_remote_snapshots(self, key: int) -> SiteSyncOutgoing:
        """Refresh the list of snapshots on the destination.

        Posts the ``refresh`` action and then re-reads the sync.

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncOutgoing object.
        """
        self._post_action(key, "refresh")
        return self.get(key)
[docs]
class SiteSyncIncomingManager(ResourceManager[SiteSyncIncoming]):
    """Manager for incoming site sync operations.

    Incoming syncs receive cloud snapshots from remote sites for disaster recovery.
    Reads are issued against the ``site_syncs_incoming`` endpoint; enable
    and disable are posted as actions to ``site_syncs_incoming_actions``.

    Example:
        >>> # List all incoming syncs
        >>> syncs = client.site_syncs_incoming.list()
        >>> # Get syncs for a specific site
        >>> syncs = client.site_syncs_incoming.list(site_key=1)
        >>> # Get a sync by name
        >>> sync = client.site_syncs_incoming.get(name="DR-Sync")
        >>> # Enable/disable a sync
        >>> sync = client.site_syncs_incoming.enable(sync.key)
        >>> sync = client.site_syncs_incoming.disable(sync.key)
    """

    _endpoint = "site_syncs_incoming"
    # Endpoint that receives every action POST issued by this manager.
    _actions_endpoint = "site_syncs_incoming_actions"

    def __init__(self, client: VergeClient) -> None:
        """Bind the manager to *client*."""
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> SiteSyncIncoming:
        """Wrap a raw record in a SiteSyncIncoming bound to this manager."""
        return SiteSyncIncoming(data, self)

    def _post_action(self, key: int, action: str) -> None:
        """POST one action for the incoming sync identified by *key*.

        Args:
            key: Sync $key (ID).
            action: Action name placed in the request body.
        """
        body: dict[str, Any] = {
            "site_syncs_incoming": key,
            "action": action,
        }
        self._client._request("POST", self._actions_endpoint, json_data=body)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        site_key: int | None = None,
        site_name: str | None = None,
        enabled: bool | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncIncoming]:
        """List incoming site syncs.

        Args:
            filter: OData filter string.
            fields: List of fields to return. None selects the module's
                default incoming-sync field list.
            limit: Maximum number of results.
            offset: Skip this many results.
            site_key: Filter by site key.
            site_name: Filter by site name (looks up site key). Ignored
                when ``site_key`` is given.
            enabled: Filter by enabled status.
            **filter_kwargs: Additional filter arguments, passed to
                ``build_filter``.

        Returns:
            List of SiteSyncIncoming objects sorted by name.

        Example:
            >>> # All incoming syncs
            >>> syncs = client.site_syncs_incoming.list()
            >>> # Syncs for a specific site
            >>> syncs = client.site_syncs_incoming.list(site_key=1)
            >>> # Enabled syncs only
            >>> syncs = client.site_syncs_incoming.list(enabled=True)
        """
        # Only fall back to the name lookup when no key was supplied at all;
        # an explicit key (including 0) always wins.
        if site_name and site_key is None:
            site_key = self._client.sites.get(name=site_name).key

        clauses: builtins.list[str] = []
        if site_key is not None:
            clauses.append(f"site eq {site_key}")
        if enabled is not None:
            clauses.append(f"enabled eq {str(enabled).lower()}")
        if filter:
            # Parenthesised so it combines safely with the other clauses.
            clauses.append(f"({filter})")
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        if fields is None:
            fields = _DEFAULT_INCOMING_SYNC_FIELDS

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        if fields:
            params["fields"] = ",".join(fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset
        params["sort"] = "+name"

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, list):
            # A single record is wrapped so callers always get a list.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def list_for_site(
        self,
        site_key: int | None = None,
        site_name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncIncoming]:
        """List incoming syncs for a specific site.

        Args:
            site_key: Site key.
            site_name: Site name.
            fields: List of fields to return.

        Returns:
            List of SiteSyncIncoming objects for the site.
        """
        return self.list(site_key=site_key, site_name=site_name, fields=fields)

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        site_key: int | None = None,
        site_name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncIncoming:
        """Get an incoming sync by key or name.

        When both are given, ``key`` takes precedence and ``name`` is ignored.

        Args:
            key: Sync $key (ID).
            name: Sync name (requires site_key or site_name for uniqueness;
                when several syncs match, the first result is returned).
            site_key: Site key (for name lookup).
            site_name: Site name (for name lookup).
            fields: List of fields to return.

        Returns:
            SiteSyncIncoming object.

        Raises:
            NotFoundError: If sync not found, or the by-key response is
                not a dict.
            ValueError: If neither key nor name provided.
        """
        if key is None and name is None:
            raise ValueError("Either key or name must be provided")

        if fields is None:
            fields = _DEFAULT_INCOMING_SYNC_FIELDS

        if key is not None:
            params: dict[str, Any] = {}
            if fields:
                params["fields"] = ",".join(fields)
            response = self._client._request(
                "GET", f"{self._endpoint}/{key}", params=params
            )
            if response is None:
                raise NotFoundError(f"Incoming sync {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"Incoming sync {key} returned invalid response")
            return self._to_model(response)

        # Name lookup. An explicit site key (including 0) wins over site_name.
        if site_name and site_key is None:
            site_key = self._client.sites.get(name=site_name).key

        # Double embedded single quotes so the name stays one string literal.
        escaped = name.replace("'", "''")
        clauses = [f"name eq '{escaped}'"]
        if site_key is not None:
            clauses.append(f"site eq {site_key}")

        matches = self.list(filter=" and ".join(clauses), fields=fields)
        if not matches:
            raise NotFoundError(f"Incoming sync '{name}' not found")
        return matches[0]

    def enable(self, key: int) -> SiteSyncIncoming:
        """Enable an incoming sync.

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncIncoming object.
        """
        self._post_action(key, "enable")
        return self.get(key)

    def disable(self, key: int) -> SiteSyncIncoming:
        """Disable an incoming sync.

        Args:
            key: Sync $key (ID).

        Returns:
            Updated SiteSyncIncoming object.
        """
        self._post_action(key, "disable")
        return self.get(key)
[docs]
class SiteSyncScheduleManager(ResourceManager[SiteSyncSchedule]):
    """Manage the schedules that tie snapshot profile periods to outgoing syncs.

    A schedule links one snapshot profile period to one outgoing sync so
    that snapshots taken by that period are synced automatically. Rows are
    read from and written to ``site_syncs_outgoing_profile_periods``.

    Example:
        >>> # List all schedules
        >>> schedules = client.site_sync_schedules.list()
        >>> # Get schedules for a specific sync
        >>> schedules = client.site_sync_schedules.list(sync_key=1)
        >>> # Create a new schedule
        >>> schedule = client.site_sync_schedules.create(
        ...     sync_key=1,
        ...     profile_period_key=2,
        ...     retention=604800,  # 7 days
        ... )
        >>> # Delete a schedule
        >>> client.site_sync_schedules.delete(schedule.key)
    """

    _endpoint = "site_syncs_outgoing_profile_periods"

    def __init__(self, client: VergeClient) -> None:
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> SiteSyncSchedule:
        # Wrap a raw API row in its resource object, bound to this manager.
        return SiteSyncSchedule(data, self)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        sync_key: int | None = None,
        sync_name: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncSchedule]:
        """Return schedules, optionally narrowed to one outgoing sync.

        Args:
            filter: OData filter string; wrapped in parentheses and AND-ed
                with the other conditions.
            fields: Fields to request. Defaults to the module's schedule
                field list when None.
            limit: Maximum number of results.
            offset: Skip this many results.
            sync_key: Restrict to schedules of this outgoing sync key.
            sync_name: Outgoing sync name; resolved to a key through
                ``client.site_syncs`` only when ``sync_key`` is not given.
            **filter_kwargs: Extra filter arguments passed to ``build_filter``.

        Returns:
            List of SiteSyncSchedule objects, requested sorted by priority.

        Example:
            >>> # All schedules
            >>> schedules = client.site_sync_schedules.list()
            >>> # Schedules for a specific sync
            >>> schedules = client.site_sync_schedules.list(sync_key=1)
        """
        # The name lookup is skipped whenever a (truthy) key was supplied.
        if sync_name and not sync_key:
            sync_key = self._client.site_syncs.get(name=sync_name).key

        clauses: builtins.list[str] = []
        if sync_key is not None:
            clauses.append(f"site_syncs_outgoing eq {sync_key}")
        if filter:
            clauses.append(f"({filter})")
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))
        joined = " and ".join(clauses) if clauses else None

        selected = _DEFAULT_SCHEDULE_FIELDS if fields is None else fields

        query: dict[str, Any] = {}
        if joined:
            query["filter"] = joined
        if selected:
            query["fields"] = ",".join(selected)
        if limit is not None:
            query["limit"] = limit
        if offset is not None:
            query["offset"] = offset
        query["sort"] = "+priority"

        payload = self._client._request("GET", self._endpoint, params=query)
        if payload is None:
            return []
        # A single non-list payload is wrapped so callers always get a list.
        rows = payload if isinstance(payload, list) else [payload]
        return [self._to_model(row) for row in rows]

    def list_for_sync(
        self,
        sync_key: int | None = None,
        sync_name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncSchedule]:
        """Return the schedules of one outgoing sync.

        Thin wrapper around :meth:`list` that forwards only the sync
        selector and the field list.

        Args:
            sync_key: Outgoing sync key.
            sync_name: Outgoing sync name.
            fields: List of fields to return.

        Returns:
            List of SiteSyncSchedule objects for the sync.
        """
        return self.list(fields=fields, sync_name=sync_name, sync_key=sync_key)

    def get(  # type: ignore[override]
        self,
        key: int,
        *,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncSchedule:
        """Fetch a single schedule by key.

        Args:
            key: Schedule $key (ID).
            fields: Fields to request. Defaults to the module's schedule
                field list when None.

        Returns:
            SiteSyncSchedule object.

        Raises:
            NotFoundError: If the request returns nothing, or returns
                something that is not a dict.
        """
        selected = _DEFAULT_SCHEDULE_FIELDS if fields is None else fields
        query: dict[str, Any] = {"fields": ",".join(selected)} if selected else {}

        payload = self._client._request("GET", f"{self._endpoint}/{key}", params=query)
        if payload is None:
            raise NotFoundError(f"Site sync schedule {key} not found")
        if isinstance(payload, dict):
            return self._to_model(payload)
        raise NotFoundError(f"Site sync schedule {key} returned invalid response")

    def create(  # type: ignore[override]
        self,
        sync_key: int,
        profile_period_key: int,
        retention: int | timedelta,
        *,
        priority: int | None = None,
        do_not_expire: bool = False,
        destination_prefix: str = "remote",
    ) -> SiteSyncSchedule:
        """Create a schedule linking a profile period to an outgoing sync.

        Args:
            sync_key: Key of the outgoing sync.
            profile_period_key: Key of the snapshot profile period.
            retention: Retention period in seconds or as a timedelta.
            priority: Sync priority (lower = first). Left out of the request
                when None.
            do_not_expire: If True, source snapshot won't expire until synced.
            destination_prefix: Prefix for snapshot name on destination.

        Returns:
            Created SiteSyncSchedule object. It is re-fetched by key when the
            response carries a truthy ``$key``; otherwise it is built
            directly from the response.

        Raises:
            ValidationError: If the retention is negative.
            ValueError: If the request returns nothing or a non-dict value.

        Example:
            >>> schedule = client.site_sync_schedules.create(
            ...     sync_key=1,
            ...     profile_period_key=2,
            ...     retention=timedelta(days=7),
            ...     destination_prefix="remote",
            ... )
        """
        seconds = (
            int(retention.total_seconds())
            if isinstance(retention, timedelta)
            else int(retention)
        )
        # NOTE(review): zero is accepted although the message says
        # "positive" - confirm which of the two is intended.
        if seconds < 0:
            raise ValidationError("Retention must be positive")

        body: dict[str, Any] = {
            "site_syncs_outgoing": sync_key,
            "profile_period": profile_period_key,
            "retention": seconds,
            "do_not_expire": do_not_expire,
            "destination_prefix": destination_prefix,
        }
        if priority is not None:
            body["priority"] = priority

        created = self._client._request("POST", self._endpoint, json_data=body)
        if created is None:
            raise ValueError("No response from create operation")
        if not isinstance(created, dict):
            raise ValueError("Create operation returned invalid response")

        new_key = created.get("$key")
        return self.get(int(new_key)) if new_key else self._to_model(created)

    def delete(self, key: int) -> None:
        """Delete a site sync schedule.

        Issues a DELETE for the schedule; any error comes from the client
        request itself.

        Args:
            key: Schedule $key (ID).
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")
# ============================================================================
# Site Sync Stats
# ============================================================================
# Fields requested for current sync stats when the caller names none.
_DEFAULT_STATS_FIELDS = [
    "$key",
    "parent",
    # Raw byte counters
    "checked_bytes",
    "scanned_bytes",
    "sent_bytes",
    "sent_net_bytes",
    # Human-readable counterparts of the byte counters
    "checked",
    "scanned",
    "sent",
    "sent_net",
    # Directory / file counters
    "dirs_checked",
    "files_checked",
    "files_updated",
    # Timing and error information
    "last_run_time",
    "start_time",
    "stop_time",
    "error_time",
    "last_error",
    "sendthrottle",
    "retry_count",
    "snapshot_name",
    "last_retry_attempt",
    "timestamp",
]
# Fields requested for long-term stats history when the caller names none.
_DEFAULT_STATS_HISTORY_FIELDS = [
    "$key",
    "parent",
    # Raw byte counters
    "checked_bytes",
    "scanned_bytes",
    "sent_bytes",
    "sent_net_bytes",
    # Directory / file counters
    "dirs_checked",
    "files_checked",
    "files_updated",
    "last_run_time",
    "snapshot_name",
    "timestamp",
]
[docs]
class SiteSyncStats(ResourceObject):
    """Site sync statistics resource object.

    Provides performance metrics for a site sync operation. Numeric and
    text accessors treat a field that is missing *or* present with a null
    value the same way (0 / empty string), so a null ``last_error`` does not
    read as the text ``"None"`` and numeric accessors do not raise on null.

    Properties:
        parent_key: Key of the parent sync.
        checked_bytes: Total bytes checked (file-level, pre-deduplication).
        scanned_bytes: Total bytes scanned (block comparison).
        sent_bytes: Bytes determined to need sending (pre-compression).
        sent_net_bytes: Actual bytes sent on wire (post-compression).
        dirs_checked: Number of directories checked.
        files_checked: Number of files checked.
        files_updated: Number of files updated.
        last_run_time_ms: Last run time in milliseconds.
        started_at: When sync started.
        stopped_at: When sync stopped.
        error_at: When error occurred.
        last_error: Last error message.
        send_throttle: Current throttle rate in bytes/sec.
        retry_count: Number of retry attempts.
        snapshot_name: Name of snapshot being synced.
        last_retry_at: When last retry occurred.
    """

    def _int_field(self, field: str) -> int:
        """Read *field* as an int; a missing or null value yields 0."""
        raw = self.get(field)
        return 0 if raw is None else int(raw)

    def _str_field(self, field: str) -> str:
        """Read *field* as text; a missing or null value yields ""."""
        raw = self.get(field)
        return "" if raw is None else str(raw)

    def _utc_field(self, field: str) -> datetime | None:
        """Read *field* as a Unix timestamp in seconds.

        Returns a timezone-aware UTC datetime, or None when the value is
        missing, falsy, or not greater than zero.
        """
        raw = self.get(field)
        if raw and int(raw) > 0:
            return datetime.fromtimestamp(int(raw), tz=timezone.utc)
        return None

    @property
    def parent_key(self) -> int | None:
        """Get the parent sync/queue item key."""
        val = self.get("parent")
        return int(val) if val is not None else None

    @property
    def checked_bytes(self) -> int:
        """Get total bytes checked (file-level)."""
        return self._int_field("checked_bytes")

    @property
    def scanned_bytes(self) -> int:
        """Get total bytes scanned (block comparison)."""
        return self._int_field("scanned_bytes")

    @property
    def sent_bytes(self) -> int:
        """Get bytes to send (pre-compression)."""
        return self._int_field("sent_bytes")

    @property
    def sent_net_bytes(self) -> int:
        """Get bytes actually sent (post-compression)."""
        return self._int_field("sent_net_bytes")

    @property
    def checked_display(self) -> str:
        """Get human-readable checked size."""
        return self._str_field("checked")

    @property
    def scanned_display(self) -> str:
        """Get human-readable scanned size."""
        return self._str_field("scanned")

    @property
    def sent_display(self) -> str:
        """Get human-readable sent size."""
        return self._str_field("sent")

    @property
    def sent_net_display(self) -> str:
        """Get human-readable net sent size."""
        return self._str_field("sent_net")

    @property
    def dirs_checked(self) -> int:
        """Get number of directories checked."""
        return self._int_field("dirs_checked")

    @property
    def files_checked(self) -> int:
        """Get number of files checked."""
        return self._int_field("files_checked")

    @property
    def files_updated(self) -> int:
        """Get number of files updated."""
        return self._int_field("files_updated")

    @property
    def last_run_time_ms(self) -> int:
        """Get last run time in milliseconds."""
        return self._int_field("last_run_time")

    @property
    def last_run_time_seconds(self) -> float:
        """Get last run time in seconds."""
        return self.last_run_time_ms / 1000.0

    @property
    def started_at(self) -> datetime | None:
        """Get when sync started."""
        return self._utc_field("start_time")

    @property
    def stopped_at(self) -> datetime | None:
        """Get when sync stopped."""
        return self._utc_field("stop_time")

    @property
    def error_at(self) -> datetime | None:
        """Get when error occurred."""
        return self._utc_field("error_time")

    @property
    def last_error(self) -> str:
        """Get last error message ("" when there is none)."""
        return self._str_field("last_error")

    @property
    def has_error(self) -> bool:
        """Check if there was an error (non-empty ``last_error``)."""
        return bool(self.last_error)

    @property
    def send_throttle(self) -> int:
        """Get current throttle rate in bytes/sec."""
        return self._int_field("sendthrottle")

    @property
    def retry_count(self) -> int:
        """Get number of retry attempts."""
        return self._int_field("retry_count")

    @property
    def snapshot_name(self) -> str:
        """Get name of snapshot being synced."""
        return self._str_field("snapshot_name")

    @property
    def last_retry_at(self) -> datetime | None:
        """Get when last retry occurred."""
        return self._utc_field("last_retry_attempt")

    @property
    def compression_ratio(self) -> float:
        """Calculate compression ratio (sent / sent_net).

        A ratio > 1 means compression is effective.
        Returns 0 if no data was sent on the wire.
        """
        if self.sent_net_bytes == 0:
            return 0.0
        return self.sent_bytes / self.sent_net_bytes

    @property
    def dedup_ratio(self) -> float:
        """Calculate deduplication ratio (checked / sent).

        A ratio > 1 means deduplication is effective.
        Returns 0 if no data needed sending.
        """
        if self.sent_bytes == 0:
            return 0.0
        return self.checked_bytes / self.sent_bytes

    def __repr__(self) -> str:
        return (
            f"<SiteSyncStats checked={self.checked_display!r} "
            f"sent={self.sent_display!r} sent_net={self.sent_net_display!r}>"
        )
[docs]
class SiteSyncStatsHistory(ResourceObject):
    """One entry of the long-term stats history of a sync.

    Exposes typed accessors over the raw history row. Numeric accessors fall
    back to 0 and text accessors to "" when the field is absent.
    """

    @property
    def parent_key(self) -> int | None:
        """Key of the parent sync, or None when the row has no parent."""
        parent = self.get("parent")
        if parent is None:
            return None
        return int(parent)

    @property
    def checked_bytes(self) -> int:
        """Total bytes checked."""
        raw = self.get("checked_bytes", 0)
        return int(raw)

    @property
    def scanned_bytes(self) -> int:
        """Total bytes scanned."""
        raw = self.get("scanned_bytes", 0)
        return int(raw)

    @property
    def sent_bytes(self) -> int:
        """Bytes to send."""
        raw = self.get("sent_bytes", 0)
        return int(raw)

    @property
    def sent_net_bytes(self) -> int:
        """Bytes actually sent."""
        raw = self.get("sent_net_bytes", 0)
        return int(raw)

    @property
    def dirs_checked(self) -> int:
        """Directories checked."""
        raw = self.get("dirs_checked", 0)
        return int(raw)

    @property
    def files_checked(self) -> int:
        """Files checked."""
        raw = self.get("files_checked", 0)
        return int(raw)

    @property
    def files_updated(self) -> int:
        """Files updated."""
        raw = self.get("files_updated", 0)
        return int(raw)

    @property
    def last_run_time_ms(self) -> int:
        """Run time in milliseconds."""
        raw = self.get("last_run_time", 0)
        return int(raw)

    @property
    def snapshot_name(self) -> str:
        """Snapshot name."""
        raw = self.get("snapshot_name", "")
        return str(raw)

    @property
    def timestamp(self) -> datetime | None:
        """When the record was created, as a UTC datetime.

        Returns None when the stored value is missing, falsy, or not
        greater than zero.
        """
        raw = self.get("timestamp")
        if not raw or int(raw) <= 0:
            return None
        return datetime.fromtimestamp(int(raw), tz=timezone.utc)

    def __repr__(self) -> str:
        return f"<SiteSyncStatsHistory snapshot={self.snapshot_name!r} sent={self.sent_bytes}>"
[docs]
class SiteSyncStatsManager:
    """Read-only access to the stats of one sync.

    Every query is filtered on ``parent eq <sync_key>`` and sorted newest
    first by ``timestamp``.

    Example:
        >>> sync = client.site_syncs.get(1)
        >>> stats = sync.stats.get()
        >>> print(f"Checked: {stats.checked_display}")
        >>> # Get historical stats
        >>> history = sync.stats.history()
        >>> for entry in history:
        ...     print(f"{entry.snapshot_name}: {entry.sent_bytes} bytes")
    """

    def __init__(self, client: VergeClient, sync_key: int) -> None:
        self._client = client
        self._sync_key = sync_key
        self._endpoint = "site_sync_stats"
        self._history_endpoint = "site_sync_stats_history_long"

    def _fetch_rows(
        self,
        endpoint: str,
        fields: builtins.list[str] | None,
        limit: int | None,
        offset: int | None,
    ) -> builtins.list[Any]:
        # Shared query used by list() and history(); always returns a list
        # of raw rows (a single non-list payload is wrapped).
        query: dict[str, Any] = {
            "filter": f"parent eq {self._sync_key}",
            "sort": "-timestamp",
        }
        if fields:
            query["fields"] = ",".join(fields)
        if limit is not None:
            query["limit"] = limit
        if offset is not None:
            query["offset"] = offset

        payload = self._client._request("GET", endpoint, params=query)
        if payload is None:
            return []
        return payload if isinstance(payload, list) else [payload]

    def get(
        self,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncStats | None:
        """Return the newest stats entry of this sync.

        Args:
            fields: Fields to request. Defaults to the module's stats field
                list when None.

        Returns:
            SiteSyncStats object, or None when the request returns nothing
            or an empty list.
        """
        selected = _DEFAULT_STATS_FIELDS if fields is None else fields
        query: dict[str, Any] = {
            "filter": f"parent eq {self._sync_key}",
            "sort": "-timestamp",
            "limit": 1,
        }
        if selected:
            query["fields"] = ",".join(selected)

        payload = self._client._request("GET", self._endpoint, params=query)
        if payload is None:
            return None
        if isinstance(payload, list):
            return SiteSyncStats(payload[0], None) if payload else None  # type: ignore[arg-type]
        return SiteSyncStats(payload, None)  # type: ignore[arg-type]

    def list(  # noqa: A003
        self,
        limit: int | None = 20,
        offset: int | None = None,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncStats]:
        """Return recent stats entries, newest first.

        Args:
            limit: Maximum entries to return (20 unless overridden; None
                sends no limit).
            offset: Skip this many entries.
            fields: Fields to request. Defaults to the module's stats field
                list when None.

        Returns:
            List of SiteSyncStats objects.
        """
        selected = _DEFAULT_STATS_FIELDS if fields is None else fields
        rows = self._fetch_rows(self._endpoint, selected, limit, offset)
        return [SiteSyncStats(row, None) for row in rows]  # type: ignore[arg-type]

    def history(
        self,
        limit: int | None = 100,
        offset: int | None = None,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncStatsHistory]:
        """Return long-term historical stats, newest first.

        Args:
            limit: Maximum entries to return (100 unless overridden; None
                sends no limit).
            offset: Skip this many entries.
            fields: Fields to request. Defaults to the module's stats
                history field list when None.

        Returns:
            List of SiteSyncStatsHistory objects.
        """
        selected = _DEFAULT_STATS_HISTORY_FIELDS if fields is None else fields
        rows = self._fetch_rows(self._history_endpoint, selected, limit, offset)
        return [SiteSyncStatsHistory(row, None) for row in rows]  # type: ignore[arg-type]
# ============================================================================
# Site Sync Queue
# ============================================================================
# Fields requested for sync queue items when the caller names none.
_DEFAULT_QUEUE_FIELDS = [
    "$key",
    "site_syncs_outgoing",
    "id",
    "cloud_snapshot",
    "priority",
    "status",
    # Retention / expiration
    "retention",
    "remote_expiration",
    "stats",
    "destination_prefix",
    "timestamp",
    "do_not_expire",
]
[docs]
class SiteSyncQueueItem(ResourceObject):
    """A cloud snapshot queued for sync to a remote site.

    Properties:
        sync_key: Key of the parent outgoing sync.
        cloud_snapshot_key: Key of the cloud snapshot.
        priority: Sync priority (lower = first).
        status: Queue status (queue, paused, syncing, complete, error, retry).
        retention: Retention period in seconds.
        remote_expiration_at: When snapshot will expire on remote.
        destination_prefix: Prefix for snapshot name on remote.
        created_at: When item was queued.
        do_not_expire: Whether snapshot should not expire until synced.
    """

    @property
    def sync_key(self) -> int | None:
        """Key of the parent outgoing sync, or None when unset."""
        parent = self.get("site_syncs_outgoing")
        if parent is None:
            return None
        return int(parent)

    @property
    def queue_id(self) -> str:
        """Queue item ID ("" when absent)."""
        raw = self.get("id", "")
        return str(raw)

    @property
    def cloud_snapshot_key(self) -> int | None:
        """Key of the cloud snapshot, or None when unset."""
        snapshot = self.get("cloud_snapshot")
        if snapshot is None:
            return None
        return int(snapshot)

    @property
    def priority(self) -> int:
        """Sync priority (lower = first); 0 when absent."""
        raw = self.get("priority", 0)
        return int(raw)

    @property
    def status(self) -> str:
        """Queue status; "queue" when absent.

        Values: queue, paused, syncing, complete, error, retry, initializing,
        skip_retention.
        """
        raw = self.get("status", "queue")
        return str(raw)

    @property
    def is_queued(self) -> bool:
        """True while the item waits to sync (status "queue")."""
        return "queue" == self.status

    @property
    def is_syncing(self) -> bool:
        """True while the item is syncing (status "syncing")."""
        return "syncing" == self.status

    @property
    def is_complete(self) -> bool:
        """True once the sync completed (status "complete")."""
        return "complete" == self.status

    @property
    def has_error(self) -> bool:
        """True when the status is "error" or "retry"."""
        current = self.status
        return current == "error" or current == "retry"

    @property
    def is_paused(self) -> bool:
        """True when the item is paused (status "paused")."""
        return "paused" == self.status

    @property
    def retention(self) -> int:
        """Retention period in seconds; 259200 when absent."""
        raw = self.get("retention", 259200)
        return int(raw)

    @property
    def retention_timedelta(self) -> timedelta:
        """Retention period as a timedelta."""
        seconds = self.retention
        return timedelta(seconds=seconds)

    @property
    def remote_expiration_at(self) -> datetime | None:
        """When the snapshot expires on the remote, as a UTC datetime.

        Returns None when the stored value is missing, falsy, or not
        greater than zero.
        """
        raw = self.get("remote_expiration")
        if not raw or int(raw) <= 0:
            return None
        return datetime.fromtimestamp(int(raw), tz=timezone.utc)

    @property
    def destination_prefix(self) -> str:
        """Prefix for the snapshot name on the remote ("" when absent)."""
        raw = self.get("destination_prefix", "")
        return str(raw)

    @property
    def created_at(self) -> datetime | None:
        """When the item was queued, as a UTC datetime.

        Returns None when the stored value is missing, falsy, or not
        greater than zero.
        """
        raw = self.get("timestamp")
        if not raw or int(raw) <= 0:
            return None
        # The stored value is divided by one million before conversion,
        # i.e. it is handled as microseconds.
        return datetime.fromtimestamp(int(raw) / 1_000_000, tz=timezone.utc)

    @property
    def do_not_expire(self) -> bool:
        """Whether the snapshot should not expire until synced."""
        flag = self.get("do_not_expire", False)
        return bool(flag)

    def get_stats(self) -> SiteSyncStats | None:
        """Return the stats embedded in this queue item.

        Returns:
            SiteSyncStats built from the ``stats`` field when that field is
            a non-empty dict, otherwise None.
        """
        embedded = self.get("stats")
        if not embedded or not isinstance(embedded, dict):
            return None
        return SiteSyncStats(embedded, None)  # type: ignore[arg-type]

    def delete(self) -> None:
        """Remove this item from the queue via its manager."""
        from typing import cast

        cast("SiteSyncQueueManager", self._manager).delete(self.key)

    def __repr__(self) -> str:
        return (
            f"<SiteSyncQueueItem key={self.get('$key', '?')} "
            f"status={self.status!r} priority={self.priority}>"
        )
[docs]
class SiteSyncQueueManager:
    """Manager for site sync queue (scoped to an outgoing sync).

    Every listing is filtered on ``site_syncs_outgoing eq <sync_key>`` and
    requested sorted by ascending priority.

    Example:
        >>> sync = client.site_syncs.get(1)
        >>> # List queued items
        >>> items = sync.queue.list()
        >>> for item in items:
        ...     print(f"{item.key}: {item.status} (priority {item.priority})")
        >>> # Get items currently syncing
        >>> syncing = sync.queue.list_syncing()
        >>> # Get items with errors
        >>> errors = sync.queue.list_errors()
    """

    def __init__(self, client: VergeClient, sync_key: int) -> None:
        self._client = client
        self._sync_key = sync_key
        self._endpoint = "site_syncs_outgoing_queue"

    def _to_model(self, data: dict[str, Any]) -> SiteSyncQueueItem:
        # Wrap a raw API row in its resource object, bound to this manager.
        return SiteSyncQueueItem(data, self)  # type: ignore[arg-type]

    def list(  # noqa: A003
        self,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        status: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncQueueItem]:
        """List queue items for this sync.

        Args:
            filter: OData filter string; wrapped in parentheses and AND-ed
                with the other conditions.
            fields: List of fields to return. Defaults to the module's
                queue field list when None.
            limit: Maximum results to return.
            offset: Skip this many results.
            status: Filter by status (queue, syncing, complete, error, etc.).
            **filter_kwargs: Additional filter arguments passed to
                ``build_filter``.

        Returns:
            List of SiteSyncQueueItem objects sorted by priority.
        """
        conditions: builtins.list[str] = [f"site_syncs_outgoing eq {self._sync_key}"]
        if status is not None:
            # Double embedded single quotes, as the name lookups in this
            # module do, so the value cannot terminate the string literal.
            escaped_status = status.replace("'", "''")
            conditions.append(f"status eq '{escaped_status}'")
        if filter:
            conditions.append(f"({filter})")
        if filter_kwargs:
            conditions.append(build_filter(**filter_kwargs))

        if fields is None:
            fields = _DEFAULT_QUEUE_FIELDS

        params: dict[str, Any] = {
            "filter": " and ".join(conditions),
            "sort": "+priority",
        }
        if fields:
            params["fields"] = ",".join(fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A single non-list payload is wrapped so callers always get a list.
        if not isinstance(response, list):
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def list_queued(
        self,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncQueueItem]:
        """List items waiting to sync (status ``queue``).

        Args:
            fields: List of fields to return.

        Returns:
            List of queued SiteSyncQueueItem objects.
        """
        return self.list(status="queue", fields=fields)

    def list_syncing(
        self,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncQueueItem]:
        """List items currently syncing (status ``syncing``).

        Args:
            fields: List of fields to return.

        Returns:
            List of syncing SiteSyncQueueItem objects.
        """
        return self.list(status="syncing", fields=fields)

    def list_complete(
        self,
        limit: int | None = 20,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncQueueItem]:
        """List completed items (status ``complete``).

        Args:
            limit: Maximum results (default 20).
            fields: List of fields to return.

        Returns:
            List of completed SiteSyncQueueItem objects.
        """
        return self.list(status="complete", limit=limit, fields=fields)

    def list_errors(
        self,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncQueueItem]:
        """List items whose status is ``error`` or ``retry``.

        Args:
            fields: List of fields to return.

        Returns:
            List of error/retry SiteSyncQueueItem objects.
        """
        return self.list(filter="status eq 'error' or status eq 'retry'", fields=fields)

    def get(
        self,
        key: int,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncQueueItem:
        """Get a queue item by key.

        Args:
            key: Queue item $key (ID).
            fields: List of fields to return. Defaults to the module's
                queue field list when None.

        Returns:
            SiteSyncQueueItem object.

        Raises:
            NotFoundError: If the request returns nothing, or returns
                something that is not a dict.
        """
        if fields is None:
            fields = _DEFAULT_QUEUE_FIELDS
        params: dict[str, Any] = {}
        if fields:
            params["fields"] = ",".join(fields)

        response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
        if response is None:
            raise NotFoundError(f"Queue item {key} not found")
        if not isinstance(response, dict):
            raise NotFoundError(f"Queue item {key} returned invalid response")
        return self._to_model(response)

    def delete(self, key: int) -> None:
        """Remove an item from the queue.

        Args:
            key: Queue item $key (ID).
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def count(self) -> int:
        """Get the number of items in the queue.

        The listing is requested with ``limit=1000``, so the result is the
        number of rows in that single response.

        Returns:
            Number of queue items returned by the listing.
        """
        return len(self.list(fields=["$key"], limit=1000))

    def count_pending(self) -> int:
        """Get count of items waiting to sync.

        Returns:
            Number of queued items.
        """
        return len(self.list_queued(fields=["$key"]))
# ============================================================================
# Site Sync Remote Snapshots
# ============================================================================
# Fields requested for remote snapshots when the caller names none.
_DEFAULT_REMOTE_SNAP_FIELDS = [
    "$key",
    "site_syncs_outgoing",
    "name",
    "status",
    "status_info",
    "remote_key",
    "created",
    "description",
    "expires",
]
[docs]
class SiteSyncRemoteSnap(ResourceObject):
    """A cloud snapshot that has been synced to the remote site.

    Properties:
        sync_key: Key of the parent outgoing sync.
        name: Snapshot name on remote.
        status: Status (offline, requesting, downloading, error).
        status_info: Additional status information.
        remote_key: Key of snapshot on remote system.
        created_at: When snapshot was created.
        description: Snapshot description.
        expires_at: When snapshot expires on remote.
    """

    @property
    def sync_key(self) -> int | None:
        """Key of the parent outgoing sync, or None when unset."""
        parent = self.get("site_syncs_outgoing")
        if parent is None:
            return None
        return int(parent)

    @property
    def name(self) -> str:
        """Snapshot name ("" when absent)."""
        raw = self.get("name", "")
        return str(raw)

    @property
    def status(self) -> str:
        """Status (offline, request, downloading, error, unsupported).

        Falls back to "offline" when the field is absent.
        """
        raw = self.get("status", "offline")
        return str(raw)

    @property
    def status_info(self) -> str:
        """Additional status information ("" when absent)."""
        raw = self.get("status_info", "")
        return str(raw)

    @property
    def is_downloading(self) -> bool:
        """True while the snapshot is being downloaded (sync back)."""
        return "downloading" == self.status

    @property
    def has_error(self) -> bool:
        """True when the status is "error"."""
        return "error" == self.status

    @property
    def remote_key(self) -> int | None:
        """Key of the snapshot on the remote system, or None when unset."""
        remote = self.get("remote_key")
        if remote is None:
            return None
        return int(remote)

    @property
    def created_at(self) -> datetime | None:
        """When the snapshot was created, as a UTC datetime.

        Returns None when the stored value is missing, falsy, or not
        greater than zero.
        """
        raw = self.get("created")
        if not raw or int(raw) <= 0:
            return None
        return datetime.fromtimestamp(int(raw), tz=timezone.utc)

    @property
    def description(self) -> str:
        """Snapshot description ("" when absent)."""
        raw = self.get("description", "")
        return str(raw)

    @property
    def expires_at(self) -> datetime | None:
        """When the snapshot expires on the remote, as a UTC datetime.

        Returns None when the stored value is missing, falsy, or not
        greater than zero.
        """
        raw = self.get("expires")
        if not raw or int(raw) <= 0:
            return None
        return datetime.fromtimestamp(int(raw), tz=timezone.utc)

    def request_sync_back(self) -> None:
        """Ask for this snapshot to be synced back to the local system.

        Delegates to the manager's ``request_sync_back`` with this
        snapshot's key.
        """
        from typing import cast

        cast("SiteSyncRemoteSnapManager", self._manager).request_sync_back(self.key)

    def set_retention(self, expires: datetime | int) -> None:
        """Set the retention/expiration for this remote snapshot.

        Delegates to the manager's ``set_retention`` with this snapshot's key.

        Args:
            expires: New expiration as datetime or Unix timestamp.
        """
        from typing import cast

        cast("SiteSyncRemoteSnapManager", self._manager).set_retention(self.key, expires)

    def __repr__(self) -> str:
        return (
            f"<SiteSyncRemoteSnap key={self.get('$key', '?')} "
            f"name={self.name!r} status={self.status!r}>"
        )
[docs]
class SiteSyncRemoteSnapManager:
    """Access to the remote snapshots of one outgoing sync.

    Listings are filtered on ``site_syncs_outgoing eq <sync_key>`` and
    requested sorted by ascending ``expires``. Actions are posted to a
    separate actions endpoint.

    Example:
        >>> sync = client.site_syncs.get(1)
        >>> # Refresh list of remote snapshots
        >>> sync.refresh_remote_snapshots()
        >>> # List remote snapshots
        >>> snaps = sync.remote_snapshots.list()
        >>> for snap in snaps:
        ...     print(f"{snap.name}: expires {snap.expires_at}")
        >>> # Request a snapshot to be synced back
        >>> snap = snaps[0]
        >>> snap.request_sync_back()
    """

    def __init__(self, client: VergeClient, sync_key: int) -> None:
        self._client = client
        self._sync_key = sync_key
        self._endpoint = "site_syncs_outgoing_remote_snaps"
        self._actions_endpoint = "site_syncs_outgoing_remote_snap_actions"

    def _to_model(self, data: dict[str, Any]) -> SiteSyncRemoteSnap:
        # Wrap a raw API row in its resource object, bound to this manager.
        return SiteSyncRemoteSnap(data, self)  # type: ignore[arg-type]

    def list(  # noqa: A003
        self,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncRemoteSnap]:
        """Return the remote snapshots of this sync.

        Args:
            filter: OData filter string; wrapped in parentheses and AND-ed
                with the sync condition.
            fields: Fields to request. Defaults to the module's remote
                snapshot field list when None.
            limit: Maximum results to return.
            offset: Skip this many results.
            **filter_kwargs: Extra filter arguments passed to ``build_filter``.

        Returns:
            List of SiteSyncRemoteSnap objects sorted by expiration.
        """
        clauses: builtins.list[str] = [f"site_syncs_outgoing eq {self._sync_key}"]
        if filter:
            clauses.append(f"({filter})")
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        selected = _DEFAULT_REMOTE_SNAP_FIELDS if fields is None else fields

        query: dict[str, Any] = {
            "filter": " and ".join(clauses),
            "sort": "+expires",
        }
        if selected:
            query["fields"] = ",".join(selected)
        if limit is not None:
            query["limit"] = limit
        if offset is not None:
            query["offset"] = offset

        payload = self._client._request("GET", self._endpoint, params=query)
        if payload is None:
            return []
        # A single non-list payload is wrapped so callers always get a list.
        rows = payload if isinstance(payload, list) else [payload]
        return [self._to_model(row) for row in rows]

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncRemoteSnap:
        """Fetch one remote snapshot by key or by name.

        The key takes precedence when both are given. A name lookup goes
        through :meth:`list` and returns the first match.

        Args:
            key: Snapshot $key (ID).
            name: Snapshot name.
            fields: Fields to request. Defaults to the module's remote
                snapshot field list when None.

        Returns:
            SiteSyncRemoteSnap object.

        Raises:
            NotFoundError: If not found, or if the key lookup returns
                something that is not a dict.
            ValueError: If neither key nor name provided.
        """
        selected = _DEFAULT_REMOTE_SNAP_FIELDS if fields is None else fields

        if key is None and name is None:
            raise ValueError("Either key or name must be provided")

        if key is not None:
            query: dict[str, Any] = {"fields": ",".join(selected)} if selected else {}
            payload = self._client._request("GET", f"{self._endpoint}/{key}", params=query)
            if payload is None:
                raise NotFoundError(f"Remote snapshot {key} not found")
            if isinstance(payload, dict):
                return self._to_model(payload)
            raise NotFoundError(f"Remote snapshot {key} returned invalid response")

        # Single quotes in the name are doubled before being embedded in
        # the filter string literal.
        quoted = name.replace("'", "''")
        matches = self.list(filter=f"name eq '{quoted}'", fields=selected)
        if matches:
            return matches[0]
        raise NotFoundError(f"Remote snapshot '{name}' not found")

    def request_sync_back(self, key: int) -> None:
        """Post a ``request`` action so the snapshot is synced back.

        Args:
            key: Remote snapshot $key (ID).
        """
        payload: dict[str, Any] = {
            "site_syncs_outgoing_remote_snap": key,
            "action": "request",
        }
        self._client._request("POST", self._actions_endpoint, json_data=payload)

    def set_retention(self, key: int, expires: datetime | int) -> None:
        """Post a ``set_retention`` action for a remote snapshot.

        Args:
            key: Remote snapshot $key (ID).
            expires: New expiration as datetime or Unix timestamp. A
                datetime is converted with ``timestamp()`` and truncated to
                whole seconds.
        """
        if isinstance(expires, datetime):
            epoch_seconds = int(expires.timestamp())
        else:
            epoch_seconds = int(expires)
        payload: dict[str, Any] = {
            "site_syncs_outgoing_remote_snap": key,
            "action": "set_retention",
            "params": {"expires": epoch_seconds},
        }
        self._client._request("POST", self._actions_endpoint, json_data=payload)

    def count(self) -> int:
        """Return the number of remote snapshots.

        The listing is requested with ``limit=1000``, so the result is the
        number of rows in that single response.

        Returns:
            Number of remote snapshots returned by the listing.
        """
        return len(self.list(fields=["$key"], limit=1000))
# ============================================================================
# Site Sync Incoming Verified
# ============================================================================
# Fields requested for verified incoming syncs when the caller names none.
_DEFAULT_VERIFIED_FIELDS = [
    "$key",
    "site_syncs_incoming",
    "name",
    # Registration state
    "registered",
    "registered_on",
]
[docs]
class SiteSyncIncomingVerified(ResourceObject):
    """A verified/registered incoming sync connection.

    Properties:
        incoming_sync_key: Key of the parent incoming sync.
        name: Name (from parent sync).
        is_registered: Whether sync is registered.
        registered_at: When sync was registered.
    """

    @property
    def incoming_sync_key(self) -> int | None:
        """Key of the parent incoming sync, or None when unset."""
        parent = self.get("site_syncs_incoming")
        if parent is None:
            return None
        return int(parent)

    @property
    def name(self) -> str:
        """Name (from parent sync); "" when absent."""
        raw = self.get("name", "")
        return str(raw)

    @property
    def is_registered(self) -> bool:
        """Whether the sync is registered (False when absent)."""
        flag = self.get("registered", False)
        return bool(flag)

    @property
    def registered_at(self) -> datetime | None:
        """When the sync was registered, as a UTC datetime.

        Returns None when the stored value is missing, falsy, or not
        greater than zero.
        """
        raw = self.get("registered_on")
        if not raw or int(raw) <= 0:
            return None
        return datetime.fromtimestamp(int(raw), tz=timezone.utc)

    def list_remote_snapshots(self) -> builtins.list[dict[str, Any]]:
        """Request the list of snapshots available on the remote sender.

        Delegates to the manager's ``list_remote_snapshots`` with this
        object's key.

        Returns:
            List of snapshot information dicts.
        """
        from typing import cast

        owner = cast("SiteSyncIncomingVerifiedManager", self._manager)
        snapshots = owner.list_remote_snapshots(self.key)
        return snapshots

    def request_snapshot(
        self,
        snapshot_name: str,
        retention: int | timedelta = 259200,
    ) -> None:
        """Request a snapshot to be synced from the remote sender.

        Delegates to the manager's ``request_snapshot`` with this object's key.

        Args:
            snapshot_name: Name of snapshot on remote system.
            retention: Retention period in seconds or timedelta
                (259200 seconds unless overridden).
        """
        from typing import cast

        owner = cast("SiteSyncIncomingVerifiedManager", self._manager)
        owner.request_snapshot(self.key, snapshot_name, retention)

    def set_retention(
        self,
        snapshot_name: str,
        retention: int | timedelta,
    ) -> None:
        """Set retention for a synced snapshot.

        Delegates to the manager's ``set_retention`` with this object's key.

        Args:
            snapshot_name: Name of the snapshot.
            retention: New retention period in seconds or timedelta.
        """
        from typing import cast

        owner = cast("SiteSyncIncomingVerifiedManager", self._manager)
        owner.set_retention(self.key, snapshot_name, retention)

    def __repr__(self) -> str:
        return (
            f"<SiteSyncIncomingVerified key={self.get('$key', '?')} "
            f"name={self.name!r} registered={self.is_registered}>"
        )
[docs]
class SiteSyncIncomingVerifiedManager:
    """Manager for verified incoming syncs (scoped to an incoming sync).

    Example:
        >>> incoming = client.site_syncs_incoming.get(1)
        >>> verified = incoming.verified.get()
        >>> print(f"Registered: {verified.is_registered}")

        >>> # List snapshots available on sender
        >>> snaps = verified.list_remote_snapshots()

        >>> # Request a snapshot to sync back
        >>> verified.request_snapshot("cloud-snap-2024-01-01", retention=604800)
    """

    def __init__(self, client: VergeClient, incoming_sync_key: int) -> None:
        """Bind the manager to a client and one incoming sync.

        Args:
            client: Client used to issue every request.
            incoming_sync_key: $key of the incoming sync used to filter ``get``.
        """
        self._client = client
        self._incoming_sync_key = incoming_sync_key
        self._endpoint = "site_syncs_incoming_verified"
        self._actions_endpoint = "site_syncs_incoming_verified_actions"

    def _to_model(self, data: dict[str, Any]) -> SiteSyncIncomingVerified:
        """Wrap a raw record in a SiteSyncIncomingVerified bound to this manager."""
        return SiteSyncIncomingVerified(data, self)  # type: ignore[arg-type]

    @staticmethod
    def _as_seconds(retention: int | timedelta) -> int:
        """Convert a retention given as seconds or timedelta to whole seconds."""
        if isinstance(retention, timedelta):
            return int(retention.total_seconds())
        return int(retention)

    def _post_snapshot_action(
        self,
        action: str,
        verified_key: int,
        snapshot_name: str,
        retention: int | timedelta,
    ) -> None:
        """POST a snapshot action carrying a snapshot name and a retention."""
        seconds = self._as_seconds(retention)
        payload: dict[str, Any] = {
            "site_syncs_incoming_verified": verified_key,
            "action": action,
            "params": {
                "snapshot": snapshot_name,
                "retention": seconds,
            },
        }
        self._client._request("POST", self._actions_endpoint, json_data=payload)

    def get(
        self,
        fields: builtins.list[str] | None = None,
    ) -> SiteSyncIncomingVerified | None:
        """Get the verified sync entry for this incoming sync.

        Args:
            fields: List of fields to return. ``None`` selects the module
                default list; an empty list sends no ``fields`` parameter.

        Returns:
            SiteSyncIncomingVerified object, or None when the response is
            None or an empty list.
        """
        selected = _DEFAULT_VERIFIED_FIELDS if fields is None else fields
        query: dict[str, Any] = {
            "filter": f"site_syncs_incoming eq {self._incoming_sync_key}",
            "limit": 1,
        }
        if selected:
            query["fields"] = ",".join(selected)

        response = self._client._request("GET", self._endpoint, params=query)
        if isinstance(response, list):
            # Only the first record is used; an empty list means no entry.
            return self._to_model(response[0]) if response else None
        if response is None:
            return None
        return self._to_model(response)

    def list_remote_snapshots(self, verified_key: int) -> builtins.list[dict[str, Any]]:
        """Request list of snapshots available on the remote sender.

        Args:
            verified_key: Verified sync $key (ID).

        Returns:
            The ``snapshots`` list from the response, or an empty list when
            the response is not a non-empty dict or ``snapshots`` is not a list.
        """
        payload: dict[str, Any] = {
            "site_syncs_incoming_verified": verified_key,
            "action": "list_snaps",
        }
        response = self._client._request("POST", self._actions_endpoint, json_data=payload)
        if not response or not isinstance(response, dict):
            return []
        snapshots = response.get("snapshots", [])
        return snapshots if isinstance(snapshots, list) else []

    def request_snapshot(
        self,
        verified_key: int,
        snapshot_name: str,
        retention: int | timedelta = 259200,
    ) -> None:
        """Request a snapshot to be synced from the remote sender.

        Args:
            verified_key: Verified sync $key (ID).
            snapshot_name: Name of snapshot on remote system.
            retention: Retention period in seconds or timedelta
                (default 259200 seconds, i.e. 3 days).
        """
        self._post_snapshot_action("request", verified_key, snapshot_name, retention)

    def set_retention(
        self,
        verified_key: int,
        snapshot_name: str,
        retention: int | timedelta,
    ) -> None:
        """Set retention for a synced snapshot.

        Args:
            verified_key: Verified sync $key (ID).
            snapshot_name: Name of the snapshot.
            retention: New retention period in seconds or timedelta.
        """
        self._post_snapshot_action("set_retention", verified_key, snapshot_name, retention)
# ============================================================================
# Site Sync Logs
# ============================================================================
# Fields requested for sync log entries when the caller does not supply an
# explicit field list.
_DEFAULT_LOG_FIELDS = [
    "$key",
    "level",
    "text",
    "timestamp",
    "user",
]
# Maps raw log level identifiers to the human-readable labels used for display.
LOG_LEVEL_DISPLAY = {
    "audit": "Audit",
    "message": "Message",
    "warning": "Warning",
    "error": "Error",
    "critical": "Critical",
    "summary": "Summary",
    "debug": "Debug",
}
[docs]
class SiteSyncLog(ResourceObject):
    """Site sync log entry.

    Properties:
        level: Log level (audit, message, warning, error, critical).
        level_display: Human-readable level name.
        is_error: Whether the level is error or critical.
        is_warning: Whether the level is warning.
        text: Log message text.
        timestamp_us: Timestamp in microseconds.
        logged_at: Datetime when logged.
        user: User who triggered the log (if applicable).
    """

    @property
    def level(self) -> str:
        """Return the log level, defaulting to ``"message"``."""
        raw_level = self.get("level", "message")
        return str(raw_level)

    @property
    def level_display(self) -> str:
        """Return the display label for the level.

        Levels without an entry in ``LOG_LEVEL_DISPLAY`` are title-cased.
        """
        current = self.level
        return LOG_LEVEL_DISPLAY.get(current, current.title())

    @property
    def is_error(self) -> bool:
        """Return True for error or critical entries."""
        return self.level in ("error", "critical")

    @property
    def is_warning(self) -> bool:
        """Return True for warning entries."""
        return self.level == "warning"

    @property
    def text(self) -> str:
        """Return the log message text (empty string by default)."""
        raw_text = self.get("text", "")
        return str(raw_text)

    @property
    def timestamp_us(self) -> int:
        """Return the timestamp in microseconds (0 by default)."""
        raw_ts = self.get("timestamp", 0)
        return int(raw_ts)

    @property
    def logged_at(self) -> datetime | None:
        """Return the log time as a UTC datetime, or None for a non-positive timestamp."""
        micros = self.timestamp_us
        if micros <= 0:
            return None
        # The stored value is in microseconds; fromtimestamp expects seconds.
        return datetime.fromtimestamp(micros / 1_000_000, tz=timezone.utc)

    @property
    def user(self) -> str:
        """Return the user who triggered the log (empty string by default)."""
        raw_user = self.get("user", "")
        return str(raw_user)

    def __repr__(self) -> str:
        level = self.level
        message = self.text
        # Keep the repr short: show at most the first 50 characters.
        if len(message) > 50:
            message = message[:50] + "..."
        return f"<SiteSyncLog [{level}] {message!r}>"
[docs]
class SiteSyncOutgoingLogManager:
    """Manager for outgoing sync logs (scoped to an outgoing sync).

    Example:
        >>> sync = client.site_syncs.get(1)
        >>> logs = sync.logs.list(limit=20)
        >>> for log in logs:
        ...     print(f"[{log.level}] {log.text}")

        >>> # Get only errors
        >>> errors = sync.logs.list_errors()
    """

    def __init__(self, client: VergeClient, sync_key: int) -> None:
        """Bind the manager to a client and one outgoing sync.

        Args:
            client: Client used to issue every request.
            sync_key: $key of the outgoing sync whose logs are listed.
        """
        self._client = client
        self._sync_key = sync_key
        self._endpoint = "site_syncs_outgoing_logs"

    def _to_model(self, data: dict[str, Any]) -> SiteSyncLog:
        """Wrap a raw record in a SiteSyncLog (created without a manager)."""
        return SiteSyncLog(data, None)  # type: ignore[arg-type]

    def list(  # noqa: A003
        self,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        limit: int | None = 100,
        offset: int | None = None,
        *,
        level: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncLog]:
        """List logs for this sync.

        Every query is restricted to this manager's sync key and sorted by
        ``-timestamp``. All supplied conditions are combined with ``and``.

        Args:
            filter: OData filter string; wrapped in parentheses before
                being combined so an ``or`` inside it cannot widen the scope.
            fields: List of fields to return. ``None`` selects the module
                default list; an empty list sends no ``fields`` parameter.
            limit: Maximum entries (default 100). ``None`` sends no limit.
            offset: Skip this many entries.
            level: Filter by level (audit, message, warning, error, critical).
            **filter_kwargs: Additional filter arguments passed to
                ``build_filter``.

        Returns:
            List of SiteSyncLog objects, newest first.
        """
        conditions: builtins.list[str] = [f"site_syncs_outgoing eq {self._sync_key}"]
        if level is not None:
            # Double embedded single quotes so the value cannot terminate the
            # quoted literal early and change the meaning of the filter.
            escaped_level = str(level).replace("'", "''")
            conditions.append(f"level eq '{escaped_level}'")
        if filter:
            conditions.append(f"({filter})")
        if filter_kwargs:
            conditions.append(build_filter(**filter_kwargs))

        if fields is None:
            fields = _DEFAULT_LOG_FIELDS

        params: dict[str, Any] = {
            "filter": " and ".join(conditions),
            "sort": "-timestamp",
        }
        if fields:
            params["fields"] = ",".join(fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, list):
            # A single record came back instead of a list; normalise to a list.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def list_errors(
        self,
        limit: int | None = 100,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncLog]:
        """List error and critical logs.

        Args:
            limit: Maximum entries.
            fields: List of fields to return.

        Returns:
            List of error/critical SiteSyncLog objects.
        """
        return self.list(
            filter="level eq 'error' or level eq 'critical'",
            limit=limit,
            fields=fields,
        )

    def list_warnings(
        self,
        limit: int | None = 100,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncLog]:
        """List warning logs.

        Args:
            limit: Maximum entries.
            fields: List of fields to return.

        Returns:
            List of warning SiteSyncLog objects.
        """
        return self.list(level="warning", limit=limit, fields=fields)
[docs]
class SiteSyncIncomingLogManager:
    """Manager for incoming sync logs (scoped to an incoming sync).

    Example:
        >>> incoming = client.site_syncs_incoming.get(1)
        >>> logs = incoming.logs.list(limit=20)
        >>> for log in logs:
        ...     print(f"[{log.level}] {log.text}")

        >>> # Get only errors
        >>> errors = incoming.logs.list_errors()
    """

    def __init__(self, client: VergeClient, sync_key: int) -> None:
        """Bind the manager to a client and one incoming sync.

        Args:
            client: Client used to issue every request.
            sync_key: $key of the incoming sync whose logs are listed.
        """
        self._client = client
        self._sync_key = sync_key
        self._endpoint = "site_syncs_incoming_logs"

    def _to_model(self, data: dict[str, Any]) -> SiteSyncLog:
        """Wrap a raw record in a SiteSyncLog (created without a manager)."""
        return SiteSyncLog(data, None)  # type: ignore[arg-type]

    def list(  # noqa: A003
        self,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        limit: int | None = 100,
        offset: int | None = None,
        *,
        level: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[SiteSyncLog]:
        """List logs for this sync.

        Every query is restricted to this manager's sync key and sorted by
        ``-timestamp``. All supplied conditions are combined with ``and``.

        Args:
            filter: OData filter string; wrapped in parentheses before
                being combined so an ``or`` inside it cannot widen the scope.
            fields: List of fields to return. ``None`` selects the module
                default list; an empty list sends no ``fields`` parameter.
            limit: Maximum entries (default 100). ``None`` sends no limit.
            offset: Skip this many entries.
            level: Filter by level (audit, message, warning, error, critical).
            **filter_kwargs: Additional filter arguments passed to
                ``build_filter``.

        Returns:
            List of SiteSyncLog objects, newest first.
        """
        conditions: builtins.list[str] = [f"site_syncs_incoming eq {self._sync_key}"]
        if level is not None:
            # Double embedded single quotes so the value cannot terminate the
            # quoted literal early and change the meaning of the filter.
            escaped_level = str(level).replace("'", "''")
            conditions.append(f"level eq '{escaped_level}'")
        if filter:
            conditions.append(f"({filter})")
        if filter_kwargs:
            conditions.append(build_filter(**filter_kwargs))

        if fields is None:
            fields = _DEFAULT_LOG_FIELDS

        params: dict[str, Any] = {
            "filter": " and ".join(conditions),
            "sort": "-timestamp",
        }
        if fields:
            params["fields"] = ",".join(fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, list):
            # A single record came back instead of a list; normalise to a list.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def list_errors(
        self,
        limit: int | None = 100,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncLog]:
        """List error and critical logs.

        Args:
            limit: Maximum entries.
            fields: List of fields to return.

        Returns:
            List of error/critical SiteSyncLog objects.
        """
        return self.list(
            filter="level eq 'error' or level eq 'critical'",
            limit=limit,
            fields=fields,
        )

    def list_warnings(
        self,
        limit: int | None = 100,
        fields: builtins.list[str] | None = None,
    ) -> builtins.list[SiteSyncLog]:
        """List warning logs.

        Args:
            limit: Maximum entries.
            fields: List of fields to return.

        Returns:
            List of warning SiteSyncLog objects.
        """
        return self.list(level="warning", limit=limit, fields=fields)