Source code for pyvergeos.resources.networks

"""Virtual Network resource manager."""

from __future__ import annotations

import builtins
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Literal

from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.aliases import NetworkAliasManager
    from pyvergeos.resources.dns import DNSZoneManager
    from pyvergeos.resources.dns_views import DNSViewManager
    from pyvergeos.resources.hosts import NetworkHostManager
    from pyvergeos.resources.ipsec import IPSecConnectionManager
    from pyvergeos.resources.network_stats import (
        IPSecActiveConnectionManager,
        NetworkMonitorStatsManager,
    )
    from pyvergeos.resources.routing import NetworkRoutingManager
    from pyvergeos.resources.rules import NetworkRuleManager
    from pyvergeos.resources.vnet_proxy import VnetProxyManager
    from pyvergeos.resources.wireguard import WireGuardManager


# Default fields to request for comprehensive network data
DEFAULT_NETWORK_FIELDS = [
    "$key",
    "name",
    "description",
    "enabled",
    "type",
    "layer2_type",
    "layer2_id",
    "network",
    "ipaddress",
    "gateway",
    "mtu",
    "dhcp_enabled",
    "dhcp_start",
    "dhcp_stop",
    "dns",
    "domain",
    "need_fw_apply",
    "need_dns_apply",
    "need_proxy_apply",
    "need_restart",
    "on_power_loss",
    "interface_vnet",
    "proxy_enabled",
    "machine#status#running as running",
    "machine#status#status as status",
]

# Type aliases for diagnostics
DiagnosticType = Literal["dhcp_leases", "addresses", "all"]

# Address type mapping for human-readable display
ADDRESS_TYPE_MAP = {
    "dynamic": "DHCP Lease",
    "static": "Static",
    "ipalias": "IP Alias",
    "proxy": "Proxy ARP",
    "virtual": "Virtual IP",
}

# Statistics fields to request from vnets endpoint
STATISTICS_FIELDS = [
    "$key",
    "name",
    "nic#stats#txbps as tx_bps",
    "nic#stats#rxbps as rx_bps",
    "nic#stats#tx_pckts as tx_packets",
    "nic#stats#rx_pckts as rx_packets",
    "nic#stats#tx_bytes as tx_bytes",
    "nic#stats#rx_bytes as rx_bytes",
    "nic_dmz#stats#txbps as dmz_tx_bps",
    "nic_dmz#stats#rxbps as dmz_rx_bps",
    "nic_dmz#stats#tx_pckts as dmz_tx_packets",
    "nic_dmz#stats#rx_pckts as dmz_rx_packets",
    "nic_dmz#stats#tx_bytes as dmz_tx_bytes",
    "nic_dmz#stats#rx_bytes as dmz_rx_bytes",
]


def _format_bytes(size: int | float | None) -> str:
    """Format bytes to human-readable string.

    Args:
        size: Size in bytes.

    Returns:
        Formatted string like "1.23 MB".
    """
    if size is None or size == 0:
        return "0 B"
    units = ["B", "KB", "MB", "GB", "TB"]
    unit_index = 0
    size_float = float(size)
    while size_float >= 1024 and unit_index < len(units) - 1:
        size_float /= 1024
        unit_index += 1
    return f"{size_float:.2f} {units[unit_index]}"


def _timestamp_to_datetime(timestamp: int | None) -> datetime | None:
    """Convert Unix timestamp to datetime.

    Args:
        timestamp: Unix timestamp in seconds.

    Returns:
        Datetime object or None if timestamp is 0 or None.
    """
    if not timestamp or timestamp == 0:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


[docs] class Network(ResourceObject): """Virtual Network resource object.""" # Cached manager instances _stats_manager: NetworkMonitorStatsManager | None = None _ipsec_connections_manager: IPSecActiveConnectionManager | None = None
[docs] def power_on(self, apply_rules: bool = True) -> Network: """Power on the network. Args: apply_rules: Apply firewall rules on start. Returns: Self for chaining. """ manager = self._manager if not isinstance(manager, NetworkManager): raise TypeError("Manager must be NetworkManager") manager._power_action(self.key, "poweron", apply=apply_rules) return self
[docs] def power_off(self, force: bool = False) -> Network: """Power off the network. Args: force: Force immediate power off (killpower) instead of graceful. Returns: Self for chaining. """ manager = self._manager if not isinstance(manager, NetworkManager): raise TypeError("Manager must be NetworkManager") action = "killpower" if force else "poweroff" manager._power_action(self.key, action) return self
[docs] def restart(self, apply_rules: bool = True) -> Network: """Restart the network. Args: apply_rules: Apply firewall rules on restart. Returns: Self for chaining. """ manager = self._manager if not isinstance(manager, NetworkManager): raise TypeError("Manager must be NetworkManager") manager._power_action(self.key, "reset", apply=apply_rules) return self
# Alias for backwards compatibility reset = restart
[docs] def apply_rules(self) -> Network: """Apply firewall rules. Returns: Self for chaining. """ self._manager._client._request( "POST", "vnet_actions", json_data={"action": "refresh", "vnet": self.key} ) return self
[docs] def apply_dns(self) -> Network: """Apply DNS configuration. Returns: Self for chaining. """ self._manager._client._request("PUT", f"vnets/{self.key}/applydns") return self
@property def is_running(self) -> bool: """Check if network is powered on.""" return bool(self.get("running", False)) @property def status(self) -> str: """Get the network status (running, stopped, etc.).""" return str(self.get("status", "unknown")) @property def needs_restart(self) -> bool: """Check if network needs restart to apply changes.""" return bool(self.get("need_restart", False)) @property def needs_rule_apply(self) -> bool: """Check if firewall rules need to be applied.""" return bool(self.get("need_fw_apply", False)) @property def needs_dns_apply(self) -> bool: """Check if DNS configuration needs to be applied.""" return bool(self.get("need_dns_apply", False)) @property def needs_proxy_apply(self) -> bool: """Check if proxy configuration needs to be applied.""" return bool(self.get("need_proxy_apply", False)) @property def proxy_enabled(self) -> bool: """Check if proxy service is enabled on this network.""" return bool(self.get("proxy_enabled", False)) @property def rules(self) -> NetworkRuleManager: """Access firewall rules for this network. Returns: NetworkRuleManager for this network. Examples: List all rules:: rules = network.rules.list() Create a rule:: rule = network.rules.create( name="Allow HTTPS", direction="incoming", action="accept", protocol="tcp", destination_ports="443" ) """ from pyvergeos.resources.rules import NetworkRuleManager return NetworkRuleManager(self._manager._client, self) @property def aliases(self) -> NetworkAliasManager: """Access IP aliases for this network. Returns: NetworkAliasManager for this network. Examples: List all aliases:: aliases = network.aliases.list() Create an alias:: alias = network.aliases.create( ip="10.0.0.100", name="webserver", description="Main web server" ) Get alias by IP:: alias = network.aliases.get(ip="10.0.0.100") """ from pyvergeos.resources.aliases import NetworkAliasManager return NetworkAliasManager(self._manager._client, self) @property def hosts(self) -> NetworkHostManager: """Access DHCP/DNS host overrides for this network. Returns: NetworkHostManager for this network. Examples: List all host overrides:: hosts = network.hosts.list() Create a host override:: host = network.hosts.create( hostname="server01", ip="10.0.0.50" ) Get host by hostname:: host = network.hosts.get(hostname="server01") Note: Host override changes require DNS apply to take effect. """ from pyvergeos.resources.hosts import NetworkHostManager return NetworkHostManager(self._manager._client, self) @property def dns_zones(self) -> DNSZoneManager: """Access DNS zones for this network. Returns: DNSZoneManager for this network. Examples: List all DNS zones:: zones = network.dns_zones.list() Get a zone by domain:: zone = network.dns_zones.get(domain="example.com") List records in a zone:: records = zone.records.list() Create a DNS record:: record = zone.records.create( host="www", record_type="A", value="10.0.0.100" ) Note: DNS zones are typically created through the VergeOS UI. DNS changes require DNS apply on the network to take effect. """ from pyvergeos.resources.dns import DNSZoneManager return DNSZoneManager(self._manager._client, network=self) @property def dns_views(self) -> DNSViewManager: """Access DNS views for this network. Returns: DNSViewManager for this network. Examples: List all DNS views:: views = network.dns_views.list() Get a view by name:: view = network.dns_views.get(name="internal") Create a view:: view = network.dns_views.create(name="internal") Create a zone through a view:: zone = view.zones.create(domain="example.com") Note: DNS changes require DNS apply on the network to take effect. """ from pyvergeos.resources.dns_views import DNSViewManager return DNSViewManager(self._manager._client, self) @property def ipsec(self) -> IPSecConnectionManager: """Access IPSec VPN connections for this network. Returns: IPSecConnectionManager for this network. Examples: List all IPSec connections:: connections = network.ipsec.list() Get a connection by name:: conn = network.ipsec.get(name="Site-B") Create a connection:: conn = network.ipsec.create( name="Site-B", remote_gateway="203.0.113.1", pre_shared_key="MySecretKey123" ) Access Phase 2 policies:: policies = conn.policies.list() Create a Phase 2 policy:: policy = conn.policies.create( name="LAN-to-LAN", local_network="10.0.0.0/24", remote_network="192.168.1.0/24" ) Note: IPSec configuration changes may require applying firewall rules on the network for changes to take effect. """ from pyvergeos.resources.ipsec import IPSecConnectionManager return IPSecConnectionManager(self._manager._client, self) @property def wireguard(self) -> WireGuardManager: """Access WireGuard VPN interfaces for this network. Returns: WireGuardManager for this network. Examples: List all WireGuard interfaces:: interfaces = network.wireguard.list() Get an interface by name:: wg = network.wireguard.get(name="wg0") Create an interface:: wg = network.wireguard.create( name="wg0", ip_address="10.100.0.1/24", listen_port=51820 ) Access peers:: peers = wg.peers.list() Create a peer:: peer = wg.peers.create( name="remote-office", peer_ip="10.100.0.2", public_key="abc123...", allowed_ips="192.168.1.0/24" ) Get peer configuration:: config = peer.get_config() Note: WireGuard configuration changes may require applying firewall rules on the network for changes to take effect. """ from pyvergeos.resources.wireguard import WireGuardManager return WireGuardManager(self._manager._client, self) @property def routing(self) -> NetworkRoutingManager: """Access routing protocols (BGP, OSPF, EIGRP) for this network. Returns: NetworkRoutingManager for this network. Examples: Configure BGP:: # Create a BGP router bgp = network.routing.bgp_routers.create(asn=65000) # Add a neighbor bgp.commands.create( command="neighbor", params="192.168.1.1 remote-as 65001" ) Configure OSPF:: # Set router ID network.routing.ospf_commands.create( command="router-id", params="1.1.1.1" ) # Add network to area 0 network.routing.ospf_commands.create( command="network", params="10.0.0.0/24 area 0" ) Configure EIGRP:: # Create an EIGRP router eigrp = network.routing.eigrp_routers.create(asn=100) # Add network eigrp.commands.create( command="network", params="10.0.0.0/24" ) Note: Routing configuration changes require restarting the network for changes to take effect. """ from pyvergeos.resources.routing import NetworkRoutingManager return NetworkRoutingManager(self._manager._client, self) @property def proxy(self) -> VnetProxyManager: """Access proxy configuration for this network. The proxy service enables multi-tenant access through a single IP address using FQDN-based routing. Each tenant is assigned a unique hostname that routes to their UI through the proxy. Returns: VnetProxyManager for this network. Examples: Check if proxy is configured:: if network.proxy.exists(): proxy = network.proxy.get() print(f"Proxy listening on {proxy.listen_address}") Enable proxy on a network:: proxy = network.proxy.create( listen_address="0.0.0.0", default_self=True ) Add a tenant mapping:: mapping = proxy.tenants.create( tenant=tenant_key, fqdn="tenant1.example.com" ) List all tenant mappings:: for mapping in proxy.tenants.list(): print(f"{mapping.fqdn} -> {mapping.tenant_name}") Note: Proxy is typically used on external networks to allow multiple tenants to share a single public IP address. After adding/modifying tenant mappings, the tenant may need to have proxy applied via the tenant dashboard. """ from pyvergeos.resources.vnet_proxy import VnetProxyManager return VnetProxyManager(self._manager._client, self) @property def stats(self) -> NetworkMonitorStatsManager: """Access network monitoring statistics for this network. Provides network quality metrics including packet loss, latency, and overall connection health for monitoring and troubleshooting. Returns: NetworkMonitorStatsManager for this network. Examples: Get current stats:: stats = network.stats.get() print(f"Quality: {stats.quality}%") print(f"Latency: {stats.latency_avg_ms}ms") if stats.has_issues: print("Warning: Network quality issues detected!") Get recent history:: history = network.stats.history_short(limit=60) for point in history: print(f"{point.timestamp}: {point.quality}% quality") Get long-term trends:: from datetime import datetime, timedelta since = datetime.now() - timedelta(days=7) history = network.stats.history_long(since=since) Note: Statistics are collected by VergeOS network monitoring and may not be available for newly created networks until they have been running for some time. """ if self._stats_manager is None: from pyvergeos.resources.network_stats import NetworkMonitorStatsManager self._stats_manager = NetworkMonitorStatsManager(self._manager._client, self) return self._stats_manager @property def ipsec_connections(self) -> IPSecActiveConnectionManager: """Access active IPSec VPN connections for this network. Provides real-time status of currently established IPSec tunnels, including connection details and tunnel parameters. Returns: IPSecActiveConnectionManager for this network. Examples: List active connections:: for conn in network.ipsec_connections.list(): print(f"{conn.connection}: {conn.local} <-> {conn.remote}") print(f" Local Network: {conn.local_network}") print(f" Remote Network: {conn.remote_network}") print(f" Protocol: {conn.protocol}") Check connection count:: count = network.ipsec_connections.count() print(f"Active IPSec tunnels: {count}") Note: This provides real-time connection status, not configuration. For IPSec configuration management, use network.ipsec instead. Connections are dynamically computed and not stored in the database. """ if self._ipsec_connections_manager is None: from pyvergeos.resources.network_stats import IPSecActiveConnectionManager self._ipsec_connections_manager = IPSecActiveConnectionManager( self._manager._client, self ) return self._ipsec_connections_manager
[docs] def diagnostics( self, diagnostic_type: DiagnosticType = "all", ) -> dict[str, Any]: """Get network diagnostic information. Returns DHCP leases and/or address table entries for this network. Args: diagnostic_type: Type of diagnostics to retrieve: - "dhcp_leases": Only DHCP lease information (dynamic addresses) - "addresses": All address table entries - "all": Both DHCP leases and addresses (default) Returns: Dictionary containing: - network_key: Network key - network_name: Network name - is_running: Whether network is running - dhcp_enabled: Whether DHCP is enabled - dhcp_leases: List of DHCP leases (if requested) - dhcp_lease_count: Number of DHCP leases - addresses: List of all addresses (if requested) - address_count: Number of addresses Examples: Get all diagnostics:: diag = network.diagnostics() print(f"DHCP Leases: {diag['dhcp_lease_count']}") for lease in diag['dhcp_leases']: print(f" {lease['ip']} -> {lease['hostname']}") Get only DHCP leases:: diag = network.diagnostics(diagnostic_type="dhcp_leases") Get only address table:: diag = network.diagnostics(diagnostic_type="addresses") """ manager = self._manager if not isinstance(manager, NetworkManager): raise TypeError("Manager must be NetworkManager") return manager.diagnostics(self.key, diagnostic_type=diagnostic_type)
[docs] def statistics( self, include_history: bool = False, history_limit: int = 60, ) -> dict[str, Any]: """Get network traffic statistics. Returns current traffic statistics and optionally historical data. Args: include_history: Include historical monitoring data. history_limit: Maximum number of history entries to return (default 60). Returns: Dictionary containing: - network_key: Network key - network_name: Network name - is_running: Whether network is running - tx_bytes_per_sec: Transmit rate in bytes/second - rx_bytes_per_sec: Receive rate in bytes/second - tx_packets_per_sec: Transmit packets/second - rx_packets_per_sec: Receive packets/second - tx_bytes_total: Total bytes transmitted - rx_bytes_total: Total bytes received - tx_total_formatted: Human-readable total transmitted - rx_total_formatted: Human-readable total received - dmz_tx_bytes_per_sec: DMZ transmit rate (if applicable) - dmz_rx_bytes_per_sec: DMZ receive rate (if applicable) - ... (similar DMZ stats) - history: List of historical stats (if requested) Examples: Get current statistics:: stats = network.statistics() print(f"TX: {stats['tx_total_formatted']}") print(f"RX: {stats['rx_total_formatted']}") Get statistics with history:: stats = network.statistics(include_history=True) for entry in stats.get('history', []): print(f"{entry['timestamp']}: {entry['quality']}% quality") Note: Statistics are only available for running networks. """ manager = self._manager if not isinstance(manager, NetworkManager): raise TypeError("Manager must be NetworkManager") return manager.statistics( self.key, include_history=include_history, history_limit=history_limit )
[docs] class NetworkManager(ResourceManager[Network]): """Manager for Virtual Network operations. Provides CRUD operations and power management for virtual networks. Examples: List all networks:: networks = client.networks.list() Get a network by name:: external = client.networks.get(name="External") Create an internal network:: net = client.networks.create( name="Dev-Network", network_address="10.10.10.0/24", ip_address="10.10.10.1", dhcp_enabled=True, dhcp_start="10.10.10.100", dhcp_stop="10.10.10.200" ) Power operations:: net.power_on() net.restart() net.power_off() """ _endpoint = "vnets"
[docs] def __init__(self, client: VergeClient) -> None: super().__init__(client)
def _to_model(self, data: dict[str, Any]) -> Network: return Network(data, self) def _power_action(self, key: int, action: str, **params: Any) -> dict[str, Any] | None: """Execute a power action on a network. Uses the vnet_actions endpoint for power operations. Args: key: Network $key (ID). action: Power action (poweron, poweroff, killpower, reset). **params: Action parameters (e.g., apply=True). Returns: Action response. """ body: dict[str, Any] = {"vnet": key, "action": action} if params: body["params"] = params response = self._client._request("POST", "vnet_actions", json_data=body) if isinstance(response, dict): return response return None
[docs] def list( self, filter: str | None = None, fields: builtins.list[str] | None = None, limit: int | None = None, offset: int | None = None, **filter_kwargs: Any, ) -> builtins.list[Network]: """List networks with optional filtering. By default, requests common fields including running status. Override with the `fields` parameter for custom field selection. Args: filter: OData filter string. fields: List of fields to return (defaults to common fields). limit: Maximum number of results. offset: Skip this many results. **filter_kwargs: Shorthand filter arguments. Returns: List of Network objects. """ # Use default fields if none specified if fields is None: fields = DEFAULT_NETWORK_FIELDS.copy() return super().list( filter=filter, fields=fields, limit=limit, offset=offset, **filter_kwargs, )
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> Network: """Get a single network by key or name. Args: key: Network $key (ID). name: Network name. fields: List of fields to return (defaults to common fields). Returns: Network object. Raises: NotFoundError: If network not found. """ if fields is None: fields = DEFAULT_NETWORK_FIELDS.copy() return super().get(key, name=name, fields=fields)
[docs] def list_internal(self) -> builtins.list[Network]: """List internal networks. Returns: List of internal networks. """ return self.list(filter="type eq 'internal'")
[docs] def list_external(self) -> builtins.list[Network]: """List external networks. Returns: List of external networks. """ return self.list(filter="type eq 'external'")
[docs] def list_running(self) -> builtins.list[Network]: """List all running networks. Returns: List of running networks. """ return self.list(filter="running eq true")
[docs] def list_stopped(self) -> builtins.list[Network]: """List all stopped networks. Returns: List of stopped networks. """ return self.list(filter="running eq false")
[docs] def create( # type: ignore[override] self, name: str, network_type: str = "internal", network_address: str | None = None, ip_address: str | None = None, gateway: str | None = None, dhcp_enabled: bool = False, dhcp_start: str | None = None, dhcp_stop: str | None = None, dns: str = "simple", dns_servers: builtins.list[str] | None = None, domain: str | None = None, mtu: int | None = None, layer2_type: str = "vxlan", layer2_id: int | None = None, interface_network: int | None = None, on_power_loss: str = "last_state", description: str = "", **kwargs: Any, ) -> Network: """Create a new virtual network. Args: name: Network name. network_type: Network type (internal, external, dmz). network_address: CIDR notation (e.g., "192.168.1.0/24"). ip_address: Router IP address within the network. gateway: Default gateway IP (for DHCP clients). dhcp_enabled: Enable DHCP server. dhcp_start: DHCP range start IP. dhcp_stop: DHCP range end IP. dns: DNS mode (disabled, simple, bind, network). dns_servers: List of DNS server IPs for DHCP. domain: Domain name for the network. mtu: MTU size (1000-65536). layer2_type: Layer 2 type (vlan, vxlan, none). layer2_id: VLAN or VXLAN ID. interface_network: Key of interface (uplink) network. on_power_loss: Behavior on power restore (power_on, last_state, leave_off). description: Network description. **kwargs: Additional network properties. Returns: Created network object. """ data: dict[str, Any] = { "name": name, "type": network_type, "dhcp_enabled": dhcp_enabled, "dns": dns, "layer2_type": layer2_type, "on_power_loss": on_power_loss, "description": description, **kwargs, } if network_address: data["network"] = network_address if ip_address: data["ipaddress"] = ip_address if gateway: data["gateway"] = gateway if dhcp_start: data["dhcp_start"] = dhcp_start if dhcp_stop: data["dhcp_stop"] = dhcp_stop if dns_servers: data["dnslist"] = ",".join(dns_servers) if domain: data["domain"] = domain if mtu: data["mtu"] = mtu if layer2_id: data["layer2_id"] = layer2_id if interface_network: data["interface_vnet"] = interface_network # Create the network response = self._client._request("POST", self._endpoint, json_data=data) if response is None: raise ValueError("No response from create operation") if not isinstance(response, dict): raise ValueError("Create operation returned invalid response") # API returns {$key, location, dbpath} - fetch full network key = response.get("$key") if key is None: raise ValueError("Create response missing $key") return self.get(int(key))
[docs] def diagnostics( self, key: int, diagnostic_type: DiagnosticType = "all", ) -> dict[str, Any]: """Get network diagnostic information. Returns DHCP leases and/or address table entries for a network. Args: key: Network $key (ID). diagnostic_type: Type of diagnostics to retrieve: - "dhcp_leases": Only DHCP lease information (dynamic addresses) - "addresses": All address table entries - "all": Both DHCP leases and addresses (default) Returns: Dictionary containing diagnostic information. Examples: Get all diagnostics:: diag = client.networks.diagnostics(network_key) print(f"DHCP Leases: {diag['dhcp_lease_count']}") Get via network object:: network = client.networks.get(name="Internal") diag = network.diagnostics() """ # Get the network to include metadata network = self.get(key) result: dict[str, Any] = { "network_key": network.key, "network_name": network.name, "is_running": network.is_running, "dhcp_enabled": network.get("dhcp_enabled", False), } # Get DHCP leases (dynamic addresses) if diagnostic_type in ("dhcp_leases", "all"): lease_params = { "filter": f"vnet eq {key} and type eq 'dynamic'", "fields": "$key,ip,mac,hostname,expiration,vendor", "sort": "ip", } lease_response = self._client._request("GET", "vnet_addresses", params=lease_params) leases: builtins.list[dict[str, Any]] = [] if lease_response: raw_leases = ( lease_response if isinstance(lease_response, builtins.list) else [lease_response] ) for lease in raw_leases: if isinstance(lease, dict) and lease.get("$key"): leases.append( { "key": lease.get("$key"), "ip": lease.get("ip"), "mac": lease.get("mac"), "hostname": lease.get("hostname"), "vendor": lease.get("vendor"), "expiration": _timestamp_to_datetime(lease.get("expiration")), } ) result["dhcp_leases"] = leases result["dhcp_lease_count"] = len(leases) # Get all addresses if diagnostic_type in ("addresses", "all"): address_params = { "filter": f"vnet eq {key}", "fields": "$key,ip,mac,hostname,type,expiration,vendor,description", "sort": "ip", } address_response = self._client._request("GET", "vnet_addresses", params=address_params) addresses: builtins.list[dict[str, Any]] = [] if address_response: raw_addresses = ( address_response if isinstance(address_response, builtins.list) else [address_response] ) for addr in raw_addresses: if isinstance(addr, dict) and addr.get("$key"): addr_type = addr.get("type", "") addresses.append( { "key": addr.get("$key"), "ip": addr.get("ip"), "mac": addr.get("mac"), "hostname": addr.get("hostname"), "type": ADDRESS_TYPE_MAP.get(addr_type, addr_type), "type_raw": addr_type, "vendor": addr.get("vendor"), "description": addr.get("description"), "expiration": _timestamp_to_datetime(addr.get("expiration")), } ) result["addresses"] = addresses result["address_count"] = len(addresses) return result
[docs] def statistics( self, key: int, include_history: bool = False, history_limit: int = 60, ) -> dict[str, Any]: """Get network traffic statistics. Returns current traffic statistics and optionally historical data. Args: key: Network $key (ID). include_history: Include historical monitoring data. history_limit: Maximum number of history entries (default 60). Returns: Dictionary containing traffic statistics. Examples: Get current statistics:: stats = client.networks.statistics(network_key) print(f"TX: {stats['tx_total_formatted']}") Get via network object:: network = client.networks.get(name="External") stats = network.statistics(include_history=True) Note: Statistics are only available for running networks. """ # Get the network first to get name and running status network = self.get(key) # Query stats fields stats_params = { "filter": f"$key eq {key}", "fields": ",".join(STATISTICS_FIELDS), } stats_response = self._client._request("GET", "vnets", params=stats_params) # Extract stats data stats_data: dict[str, Any] = {} if stats_response: if isinstance(stats_response, builtins.list) and stats_response: stats_data = stats_response[0] elif isinstance(stats_response, dict): stats_data = stats_response result: dict[str, Any] = { "network_key": network.key, "network_name": network.name, "is_running": network.is_running, # Router interface stats "tx_bytes_per_sec": stats_data.get("tx_bps"), "rx_bytes_per_sec": stats_data.get("rx_bps"), "tx_packets_per_sec": stats_data.get("tx_packets"), "rx_packets_per_sec": stats_data.get("rx_packets"), "tx_bytes_total": stats_data.get("tx_bytes"), "rx_bytes_total": stats_data.get("rx_bytes"), "tx_total_formatted": _format_bytes(stats_data.get("tx_bytes")), "rx_total_formatted": _format_bytes(stats_data.get("rx_bytes")), # DMZ interface stats "dmz_tx_bytes_per_sec": stats_data.get("dmz_tx_bps"), "dmz_rx_bytes_per_sec": stats_data.get("dmz_rx_bps"), "dmz_tx_packets_per_sec": stats_data.get("dmz_tx_packets"), "dmz_rx_packets_per_sec": stats_data.get("dmz_rx_packets"), "dmz_tx_bytes_total": stats_data.get("dmz_tx_bytes"), "dmz_rx_bytes_total": stats_data.get("dmz_rx_bytes"), } # Get historical data if requested if include_history: history_params = { "filter": f"vnet eq {key}", "fields": "timestamp,sent,dropped,quality,latency_usec_avg,latency_usec_peak", "sort": "-timestamp", "limit": str(history_limit), } history_response = self._client._request( "GET", "vnet_monitor_stats_history_short", params=history_params ) history: builtins.list[dict[str, Any]] = [] if history_response: raw_history = ( history_response if isinstance(history_response, builtins.list) else [history_response] ) for entry in raw_history: if isinstance(entry, dict): latency_avg = entry.get("latency_usec_avg", 0) latency_peak = entry.get("latency_usec_peak", 0) history.append( { "timestamp": _timestamp_to_datetime(entry.get("timestamp")), "sent": entry.get("sent"), "dropped": entry.get("dropped"), "quality": entry.get("quality"), "latency_avg_ms": round(latency_avg / 1000, 2) if latency_avg else 0, "latency_peak_ms": round(latency_peak / 1000, 2) if latency_peak else 0, } ) result["history"] = history return result