"""WireGuard VPN resource managers."""
from __future__ import annotations
import builtins
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Literal
from pyvergeos.exceptions import NotFoundError
from pyvergeos.resources.base import ResourceManager, ResourceObject
if TYPE_CHECKING:
from pyvergeos.client import VergeClient
from pyvergeos.resources.network_stats import WireGuardPeerStatusManager
from pyvergeos.resources.networks import Network
# Maps the API's ``configure_firewall`` values to human-readable labels
# (looked up by ``WireGuardPeer.firewall_config_display``).
FIREWALL_CONFIG_MAP = {
    "site-to-site": "Site-to-Site",
    "remote-user": "Remote User",
    "none": "None",
}
# Maps the Python-friendly ``firewall_config`` argument values (underscored)
# to the hyphenated values sent to the API as ``configure_firewall``.
# Note: this is NOT the inverse of FIREWALL_CONFIG_MAP.
FIREWALL_CONFIG_API_MAP = {
    "site_to_site": "site-to-site",
    "remote_user": "remote-user",
    "none": "none",
}
# Accepted values for the ``firewall_config`` argument of the peer manager
FirewallConfigType = Literal["site_to_site", "remote_user", "none"]
# Fields requested for WireGuard interface queries when the caller does not
# pass an explicit ``fields`` list
DEFAULT_INTERFACE_FIELDS = [
    "$key",
    "vnet",
    "name",
    "description",
    "enabled",
    "ip",
    "listenport",
    "mtu",
    "public_key",
    "endpoint_ip",
    "modified",
]
# Fields requested for WireGuard peer queries when the caller does not pass
# an explicit ``fields`` list
DEFAULT_PEER_FIELDS = [
    "$key",
    "wireguard",
    "name",
    "description",
    "enabled",
    "endpoint",
    "port",
    "peer_ip",
    "public_key",
    "preshared_key",
    "allowed_ips",
    "keepalive",
    "configure_firewall",
    "modified",
]
def _timestamp_to_datetime(timestamp: int | None) -> datetime | None:
    """Turn a Unix timestamp into a timezone-aware UTC datetime.

    Args:
        timestamp: Seconds since the Unix epoch, or None.

    Returns:
        A UTC ``datetime`` for any truthy timestamp; None when the
        timestamp is None or 0.
    """
    if timestamp:
        return datetime.fromtimestamp(timestamp, tz=timezone.utc)
    # Falsy input (None or 0) means "no timestamp recorded".
    return None
class WireGuardInterface(ResourceObject):
    """WireGuard VPN interface resource.

    Exposes the raw interface fields through typed, read-only properties and
    hands out the peer and peer-status managers scoped to this interface.
    """

    @property
    def peers(self) -> WireGuardPeerManager:
        """Return a peer manager bound to this interface.

        Returns:
            WireGuardPeerManager sharing this interface's client.

        Raises:
            TypeError: If this object's manager is not a WireGuardManager.

        Examples:
            List all peers::

                peers = interface.peers.list()

            Create a peer::

                peer = interface.peers.create(
                    name="remote-office",
                    peer_ip="10.100.0.2",
                    public_key="abc123...",
                    allowed_ips="192.168.1.0/24"
                )
        """
        owner = self._manager
        if isinstance(owner, WireGuardManager):
            return WireGuardPeerManager(owner._client, self)
        raise TypeError("Manager must be WireGuardManager")

    @property
    def is_enabled(self) -> bool:
        """Whether the ``enabled`` field is truthy (False when absent)."""
        enabled = self.get("enabled", False)
        return bool(enabled)

    @property
    def ip_address(self) -> str:
        """Tunnel IP address as stored in the ``ip`` field (CIDR notation)."""
        raw_ip = self.get("ip", "")
        return str(raw_ip)

    @property
    def ip_only(self) -> str:
        """Tunnel IP address with any ``/mask`` suffix removed."""
        # partition() yields the whole string when no "/" is present.
        host, _, _ = self.ip_address.partition("/")
        return host

    @property
    def subnet_mask(self) -> str:
        """CIDR prefix length taken from the tunnel address.

        Returns "32" when the address carries no ``/mask`` part.
        """
        pieces = self.ip_address.split("/")
        return pieces[1] if len(pieces) > 1 else "32"

    @property
    def listen_port(self) -> int:
        """UDP listen port (51820 when the field is absent)."""
        port = self.get("listenport", 51820)
        return int(port)

    @property
    def mtu(self) -> int:
        """Interface MTU; 0 means automatic (also the value when absent)."""
        value = self.get("mtu", 0)
        return int(value)

    @property
    def mtu_display(self) -> str:
        """Human-readable MTU: "Auto" for 0, otherwise the number as text."""
        value = self.mtu
        if value == 0:
            return "Auto"
        return str(value)

    @property
    def public_key(self) -> str:
        """Public key of the interface (empty string when absent)."""
        key_text = self.get("public_key", "")
        return str(key_text)

    @property
    def endpoint_ip(self) -> str:
        """Endpoint IP used in peer configurations (empty string when absent)."""
        address = self.get("endpoint_ip", "")
        return str(address)

    @property
    def modified_at(self) -> datetime | None:
        """Last-modified time derived from the ``modified`` field, if set."""
        stamp = self.get("modified")
        return _timestamp_to_datetime(stamp)

    @property
    def peer_status(self) -> WireGuardPeerStatusManager:
        """Return a peer-status manager bound to this interface.

        Gives access to real-time connection status (last handshake time and
        transfer statistics) for the peers of this interface.

        Returns:
            WireGuardPeerStatusManager sharing this interface's client.

        Raises:
            TypeError: If this object's manager is not a WireGuardManager.

        Examples:
            List peer status::

                for status in wg.peer_status.list():
                    if status.is_connected:
                        print(f"Peer {status.peer_key} connected")
                        print(f"  TX: {status.tx_bytes_formatted}")
                        print(f"  RX: {status.rx_bytes_formatted}")
                    else:
                        print(f"Peer {status.peer_key} disconnected")

            Get status for specific peer::

                status = wg.peer_status.get_for_peer(peer.key)
                print(f"Last handshake: {status.last_handshake}")
        """
        # Imported here rather than at module level; the module-level import
        # is only active under TYPE_CHECKING.
        from pyvergeos.resources.network_stats import WireGuardPeerStatusManager

        owner = self._manager
        if isinstance(owner, WireGuardManager):
            return WireGuardPeerStatusManager(owner._client, self)
        raise TypeError("Manager must be WireGuardManager")
class WireGuardPeer(ResourceObject):
    """WireGuard VPN peer resource.

    Exposes the raw peer fields through typed, read-only properties and can
    fetch the generated client configuration through its manager.
    """

    @property
    def is_enabled(self) -> bool:
        """Whether the ``enabled`` field is truthy (False when absent)."""
        enabled = self.get("enabled", False)
        return bool(enabled)

    @property
    def endpoint(self) -> str:
        """Peer endpoint (IP or hostname); empty string when absent."""
        target = self.get("endpoint", "")
        return str(target)

    @property
    def port(self) -> int:
        """Peer port (51820 when the field is absent)."""
        number = self.get("port", 51820)
        return int(number)

    @property
    def peer_ip(self) -> str:
        """Tunnel IP address assigned to the peer; empty string when absent."""
        address = self.get("peer_ip", "")
        return str(address)

    @property
    def public_key(self) -> str:
        """Public key of the peer; empty string when absent."""
        key_text = self.get("public_key", "")
        return str(key_text)

    @property
    def has_preshared_key(self) -> bool:
        """Whether the ``preshared_key`` field holds a truthy value."""
        secret = self.get("preshared_key")
        return bool(secret)

    @property
    def allowed_ips(self) -> str:
        """Allowed IPs for this peer; empty string when absent."""
        ranges = self.get("allowed_ips", "")
        return str(ranges)

    @property
    def keepalive(self) -> int:
        """Keepalive interval in seconds; 0 means disabled (also the default)."""
        seconds = self.get("keepalive", 0)
        return int(seconds)

    @property
    def firewall_config(self) -> str:
        """Raw ``configure_firewall`` value; empty string when absent."""
        mode = self.get("configure_firewall", "")
        return str(mode)

    @property
    def firewall_config_display(self) -> str:
        """Human-readable firewall mode.

        Falls back to the raw value when it has no entry in
        FIREWALL_CONFIG_MAP.
        """
        mode = self.firewall_config
        return FIREWALL_CONFIG_MAP.get(mode, mode)

    @property
    def modified_at(self) -> datetime | None:
        """Last-modified time derived from the ``modified`` field, if set."""
        stamp = self.get("modified")
        return _timestamp_to_datetime(stamp)

    def get_config(self) -> str:
        """Fetch the WireGuard configuration text for this peer.

        Delegates to ``WireGuardPeerManager.get_config`` with this peer's key.
        The result is the config file content the remote side can use to
        connect to this tunnel.

        Returns:
            WireGuard configuration file content as a string.

        Raises:
            TypeError: If this object's manager is not a WireGuardPeerManager.
            ValueError: If configuration cannot be retrieved.

        Examples:
            Get and save peer config::

                config = peer.get_config()
                with open("wg0.conf", "w") as f:
                    f.write(config)
        """
        owner = self._manager
        if isinstance(owner, WireGuardPeerManager):
            return owner.get_config(self.key)
        raise TypeError("Manager must be WireGuardPeerManager")
class WireGuardManager(ResourceManager[WireGuardInterface]):
    """Manager for WireGuard VPN interfaces.

    WireGuard interfaces define the local VPN tunnel endpoint including
    IP address, listen port, and cryptographic keys.

    This is a sub-resource manager that operates on a specific network.

    Examples:
        List interfaces on a network::

            interfaces = network.wireguard.list()

        Get an interface by name::

            wg = network.wireguard.get(name="wg0")

        Create an interface::

            wg = network.wireguard.create(
                name="wg0",
                ip_address="10.100.0.1/24",
                listen_port=51820
            )

        Access peers::

            peers = wg.peers.list()
    """

    _endpoint = "vnet_wireguards"

    def __init__(self, client: VergeClient, network: Network) -> None:
        """Bind the manager to a client and to the parent network.

        Args:
            client: API client used for every request.
            network: Network whose WireGuard interfaces are managed.
        """
        super().__init__(client)
        self._network = network

    def _to_model(self, data: dict[str, Any]) -> WireGuardInterface:
        """Wrap a raw API record, tagging it with the parent network.

        Note that ``data`` is modified in place.
        """
        data["_network_key"] = self._network.key
        data["_network_name"] = self._network.name
        return WireGuardInterface(data, self)

    @staticmethod
    def _kwargs_to_filters(filter_kwargs: dict[str, Any]) -> builtins.list[str]:
        """Translate shorthand keyword filters into ``field eq value`` clauses.

        None values are skipped. Strings are single-quoted with embedded
        quotes doubled, matching the name lookup in ``get``. Numbers are
        emitted bare.
        """
        clauses: builtins.list[str] = []
        for field, value in filter_kwargs.items():
            if value is None:
                continue
            if isinstance(value, bool):
                # bool must be tested before int (bool is an int subclass).
                # NOTE(review): rendered as true/false - confirm against the
                # VergeOS filter syntax.
                rendered = "true" if value else "false"
            elif isinstance(value, (int, float)):
                rendered = str(value)
            else:
                escaped = str(value).replace("'", "''")
                rendered = f"'{escaped}'"
            clauses.append(f"{field} eq {rendered}")
        return clauses

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[WireGuardInterface]:
        """List WireGuard interfaces on this network.

        Results are always restricted to the parent network and sorted by
        name.

        Args:
            filter: OData filter string.
            fields: List of fields to return (DEFAULT_INTERFACE_FIELDS when
                omitted).
            limit: Maximum number of results.
            offset: Skip this many results.
            **filter_kwargs: Shorthand equality filters (e.g., name="wg0").
                Each one is AND-ed with the other conditions.

        Returns:
            List of WireGuardInterface objects (empty when the API returns
            nothing).
        """
        # The parent-network clause always comes first.
        # NOTE(review): a caller-supplied filter containing "or" is joined
        # without parentheses - confirm operator precedence in the API.
        clauses = [f"vnet eq {self._network.key}"]
        if filter:
            clauses.append(filter)
        # Previously these kwargs were accepted but silently ignored.
        clauses.extend(self._kwargs_to_filters(filter_kwargs))

        if fields is None:
            fields = DEFAULT_INTERFACE_FIELDS.copy()

        params: dict[str, Any] = {
            "filter": " and ".join(clauses),
            "fields": ",".join(fields),
            "sort": "name",
        }
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, builtins.list):
            # A single record came back instead of a list.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> WireGuardInterface:
        """Get a single WireGuard interface by key or name.

        When both are given, ``key`` takes precedence.

        Args:
            key: Interface $key (ID).
            name: Interface name.
            fields: List of fields to return (DEFAULT_INTERFACE_FIELDS when
                omitted).

        Returns:
            WireGuardInterface object.

        Raises:
            NotFoundError: If interface not found or the response is not a
                single record.
            ValueError: If neither key nor name provided.
        """
        if fields is None:
            fields = DEFAULT_INTERFACE_FIELDS.copy()

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(fields)}
            response = self._client._request(
                "GET", f"{self._endpoint}/{key}", params=params
            )
            if response is None:
                raise NotFoundError(f"WireGuard interface with key {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(
                    f"WireGuard interface {key} returned invalid response"
                )
            return self._to_model(response)

        if name is not None:
            # Double embedded single quotes so the name cannot break the filter.
            escaped_name = name.replace("'", "''")
            results = self.list(filter=f"name eq '{escaped_name}'", fields=fields)
            if not results:
                raise NotFoundError(f"WireGuard interface with name '{name}' not found")
            return results[0]

        raise ValueError("Either key or name must be provided")

    def create(  # type: ignore[override]
        self,
        name: str,
        ip_address: str,
        *,
        listen_port: int = 51820,
        mtu: int = 0,
        endpoint_ip: str | None = None,
        description: str = "",
        enabled: bool = True,
    ) -> WireGuardInterface:
        """Create a new WireGuard interface.

        Args:
            name: Unique name for the interface.
            ip_address: Tunnel IP address in CIDR notation (e.g., "10.100.0.1/24").
            listen_port: UDP port to listen on (default: 51820).
            mtu: MTU for the interface (0 = auto, default).
            endpoint_ip: Public IP for peer configurations (auto-detected if not set).
            description: Interface description.
            enabled: Whether the interface is enabled.

        Returns:
            Created WireGuardInterface object, re-fetched by its new key.

        Raises:
            ValueError: If the create response is missing, is not a mapping,
                or carries no ``$key``.

        Examples:
            Basic interface::

                wg = network.wireguard.create(
                    name="wg0",
                    ip_address="10.100.0.1/24"
                )

            Interface with custom port::

                wg = network.wireguard.create(
                    name="wg-remote",
                    ip_address="10.100.0.1/24",
                    listen_port=51821,
                    endpoint_ip="203.0.113.50"
                )
        """
        body: dict[str, Any] = {
            "vnet": self._network.key,
            "name": name,
            "enabled": enabled,
            "ip": ip_address,
            "listenport": listen_port,
            "mtu": mtu,
        }
        # Optional fields are sent only when they carry a value.
        if endpoint_ip:
            body["endpoint_ip"] = endpoint_ip
        if description:
            body["description"] = description

        response = self._client._request("POST", self._endpoint, json_data=body)
        if not isinstance(response, dict):
            raise ValueError("No response from create operation")

        iface_key = response.get("$key")
        if iface_key is None:
            raise ValueError("Create response missing $key")
        # Re-fetch so the returned object carries the default field set.
        return self.get(int(iface_key))

    def update(self, key: int, **kwargs: Any) -> WireGuardInterface:
        """Update an existing WireGuard interface.

        Args:
            key: Interface $key (ID).
            **kwargs: Attributes to update. Supports:
                - name: New name
                - ip_address: New tunnel IP in CIDR notation
                - listen_port: New listen port
                - mtu: New MTU (0 = auto)
                - endpoint_ip: New endpoint IP
                - description: New description
                - enabled: Enable/disable interface

                Unrecognised keyword arguments are ignored.

        Returns:
            Updated WireGuardInterface object, re-fetched after the update.

        Raises:
            ValueError: If no supported attribute was supplied.
        """
        # Python-facing argument name -> API field name.
        field_mapping = {
            "name": "name",
            "ip_address": "ip",
            "listen_port": "listenport",
            "mtu": "mtu",
            "endpoint_ip": "endpoint_ip",
            "description": "description",
            "enabled": "enabled",
        }
        body: dict[str, Any] = {
            api_field: kwargs[kwarg]
            for kwarg, api_field in field_mapping.items()
            if kwarg in kwargs
        }
        if not body:
            raise ValueError("No update parameters provided")

        self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        return self.get(key)

    def delete(self, key: int) -> None:
        """Delete a WireGuard interface.

        This also removes all associated peers.

        Args:
            key: Interface $key (ID).
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")
class WireGuardPeerManager(ResourceManager[WireGuardPeer]):
    """Manager for WireGuard VPN peers.

    Peers define remote endpoints that can connect to the WireGuard tunnel.

    This is a sub-resource manager that operates on a specific WireGuard interface.

    Examples:
        List peers for an interface::

            peers = interface.peers.list()

        Create a site-to-site peer::

            peer = interface.peers.create(
                name="remote-office",
                peer_ip="10.100.0.2",
                public_key="abc123...",
                allowed_ips="192.168.1.0/24"
            )

        Create a remote user peer::

            peer = interface.peers.create(
                name="laptop",
                peer_ip="10.100.0.10",
                public_key="xyz789...",
                allowed_ips="10.100.0.10/32",
                firewall_config="remote_user",
                keepalive=25
            )

        Get peer configuration::

            config = peer.get_config()
    """

    _endpoint = "vnet_wireguard_peers"

    def __init__(self, client: VergeClient, interface: WireGuardInterface) -> None:
        """Bind the manager to a client and to the parent interface.

        Args:
            client: API client used for every request.
            interface: WireGuard interface whose peers are managed.
        """
        super().__init__(client)
        self._interface = interface

    def _to_model(self, data: dict[str, Any]) -> WireGuardPeer:
        """Wrap a raw API record, tagging it with the parent interface.

        Note that ``data`` is modified in place.
        """
        data["_interface_key"] = self._interface.key
        data["_interface_name"] = self._interface.name
        return WireGuardPeer(data, self)

    @staticmethod
    def _kwargs_to_filters(filter_kwargs: dict[str, Any]) -> builtins.list[str]:
        """Translate shorthand keyword filters into ``field eq value`` clauses.

        None values are skipped. Strings are single-quoted with embedded
        quotes doubled, matching the name lookup in ``get``. Numbers are
        emitted bare.
        """
        clauses: builtins.list[str] = []
        for field, value in filter_kwargs.items():
            if value is None:
                continue
            if isinstance(value, bool):
                # bool must be tested before int (bool is an int subclass).
                # NOTE(review): rendered as true/false - confirm against the
                # VergeOS filter syntax.
                rendered = "true" if value else "false"
            elif isinstance(value, (int, float)):
                rendered = str(value)
            else:
                escaped = str(value).replace("'", "''")
                rendered = f"'{escaped}'"
            clauses.append(f"{field} eq {rendered}")
        return clauses

    @staticmethod
    def _firewall_api_value(firewall_config: str) -> str:
        """Resolve a ``firewall_config`` argument to its API value.

        Accepts the Python-friendly keys of FIREWALL_CONFIG_API_MAP
        (e.g. "remote_user") as well as the API values themselves
        (e.g. "remote-user").

        Raises:
            ValueError: If the value is neither; an unknown value must not
                silently turn into site-to-site firewall rules.
        """
        if firewall_config in FIREWALL_CONFIG_API_MAP:
            return FIREWALL_CONFIG_API_MAP[firewall_config]
        if firewall_config in FIREWALL_CONFIG_API_MAP.values():
            return firewall_config
        valid = ", ".join(repr(option) for option in FIREWALL_CONFIG_API_MAP)
        raise ValueError(
            f"Invalid firewall_config {firewall_config!r}; expected one of: {valid}"
        )

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[WireGuardPeer]:
        """List peers for this WireGuard interface.

        Results are always restricted to the parent interface and sorted by
        name.

        Args:
            filter: OData filter string.
            fields: List of fields to return (DEFAULT_PEER_FIELDS when
                omitted).
            limit: Maximum number of results.
            offset: Skip this many results.
            **filter_kwargs: Shorthand equality filters (e.g., name="laptop").
                Each one is AND-ed with the other conditions.

        Returns:
            List of WireGuardPeer objects (empty when the API returns
            nothing).
        """
        # The parent-interface clause always comes first.
        # NOTE(review): a caller-supplied filter containing "or" is joined
        # without parentheses - confirm operator precedence in the API.
        clauses = [f"wireguard eq {self._interface.key}"]
        if filter:
            clauses.append(filter)
        # Previously these kwargs were accepted but silently ignored.
        clauses.extend(self._kwargs_to_filters(filter_kwargs))

        if fields is None:
            fields = DEFAULT_PEER_FIELDS.copy()

        params: dict[str, Any] = {
            "filter": " and ".join(clauses),
            "fields": ",".join(fields),
            "sort": "name",
        }
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, builtins.list):
            # A single record came back instead of a list.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response]

    def get(
        self,
        key: int | None = None,
        *,
        name: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> WireGuardPeer:
        """Get a single peer by key or name.

        When both are given, ``key`` takes precedence.

        Args:
            key: Peer $key (ID).
            name: Peer name.
            fields: List of fields to return (DEFAULT_PEER_FIELDS when
                omitted).

        Returns:
            WireGuardPeer object.

        Raises:
            NotFoundError: If peer not found or the response is not a single
                record.
            ValueError: If neither key nor name provided.
        """
        if fields is None:
            fields = DEFAULT_PEER_FIELDS.copy()

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(fields)}
            response = self._client._request(
                "GET", f"{self._endpoint}/{key}", params=params
            )
            if response is None:
                raise NotFoundError(f"WireGuard peer with key {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"WireGuard peer {key} returned invalid response")
            return self._to_model(response)

        if name is not None:
            # Double embedded single quotes so the name cannot break the filter.
            escaped_name = name.replace("'", "''")
            results = self.list(filter=f"name eq '{escaped_name}'", fields=fields)
            if not results:
                raise NotFoundError(f"WireGuard peer with name '{name}' not found")
            return results[0]

        raise ValueError("Either key or name must be provided")

    def create(  # type: ignore[override]
        self,
        name: str,
        peer_ip: str,
        public_key: str,
        allowed_ips: str,
        *,
        endpoint: str | None = None,
        port: int = 51820,
        preshared_key: str | None = None,
        keepalive: int = 0,
        firewall_config: FirewallConfigType = "site_to_site",
        description: str = "",
        enabled: bool = True,
    ) -> WireGuardPeer:
        """Create a new WireGuard peer.

        Args:
            name: Unique name for the peer.
            peer_ip: Tunnel IP address for the peer (e.g., "10.100.0.2").
            public_key: Public key of the remote peer (base64 encoded).
            allowed_ips: Comma-separated allowed IP ranges (e.g., "192.168.1.0/24").
            endpoint: Remote peer IP/hostname (empty for roaming clients).
            port: Remote peer port (default: 51820).
            preshared_key: Optional preshared key for post-quantum resistance.
            keepalive: Keepalive interval in seconds (0 = disabled).
            firewall_config: Firewall rule configuration:
                - "site_to_site": Create routes and accept rules for allowed IPs
                - "remote_user": Same as site-to-site plus SNAT for outbound traffic
                - "none": Don't create any firewall rules
            description: Peer description.
            enabled: Whether the peer is enabled.

        Returns:
            Created WireGuardPeer object, re-fetched by its new key.

        Raises:
            ValueError: If ``firewall_config`` is not a recognised value, or
                if the create response is missing, is not a mapping, or
                carries no ``$key``.

        Examples:
            Site-to-site peer::

                peer = interface.peers.create(
                    name="remote-office",
                    peer_ip="10.100.0.2",
                    public_key="abc123...",
                    allowed_ips="192.168.1.0/24",
                    endpoint="vpn.remote-office.com"
                )

            Remote user (roaming client)::

                peer = interface.peers.create(
                    name="laptop",
                    peer_ip="10.100.0.10",
                    public_key="xyz789...",
                    allowed_ips="10.100.0.10/32",
                    firewall_config="remote_user",
                    keepalive=25
                )
        """
        # Validate before any request is made so a typo cannot create
        # unintended firewall rules.
        firewall_api = self._firewall_api_value(firewall_config)

        body: dict[str, Any] = {
            "wireguard": self._interface.key,
            "name": name,
            "enabled": enabled,
            "peer_ip": peer_ip,
            "public_key": public_key,
            "allowed_ips": allowed_ips,
            "port": port,
            "keepalive": keepalive,
            "configure_firewall": firewall_api,
        }
        # Optional fields are sent only when they carry a value.
        if endpoint:
            body["endpoint"] = endpoint
        if preshared_key:
            body["preshared_key"] = preshared_key
        if description:
            body["description"] = description

        response = self._client._request("POST", self._endpoint, json_data=body)
        if not isinstance(response, dict):
            raise ValueError("No response from create operation")

        peer_key = response.get("$key")
        if peer_key is None:
            raise ValueError("Create response missing $key")
        # Re-fetch so the returned object carries the default field set.
        return self.get(int(peer_key))

    def update(self, key: int, **kwargs: Any) -> WireGuardPeer:
        """Update an existing WireGuard peer.

        Args:
            key: Peer $key (ID).
            **kwargs: Attributes to update. Supports:
                - name: New name
                - peer_ip: New peer tunnel IP
                - public_key: New public key
                - allowed_ips: New allowed IPs
                - endpoint: New endpoint
                - port: New port
                - preshared_key: New preshared key
                - keepalive: New keepalive interval
                - firewall_config: New firewall configuration
                - description: New description
                - enabled: Enable/disable peer

                Unrecognised keyword arguments are ignored.

        Returns:
            Updated WireGuardPeer object, re-fetched after the update.

        Raises:
            ValueError: If no supported attribute was supplied, or if
                ``firewall_config`` is not a recognised value.
        """
        # These arguments use the same name as their API field.
        passthrough_fields = (
            "name",
            "peer_ip",
            "public_key",
            "allowed_ips",
            "endpoint",
            "port",
            "preshared_key",
            "keepalive",
            "description",
            "enabled",
        )
        body: dict[str, Any] = {
            field: kwargs[field] for field in passthrough_fields if field in kwargs
        }

        # firewall_config is the only argument that needs value translation.
        if "firewall_config" in kwargs:
            body["configure_firewall"] = self._firewall_api_value(
                kwargs["firewall_config"]
            )

        if not body:
            raise ValueError("No update parameters provided")

        self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        return self.get(key)

    def delete(self, key: int) -> None:
        """Delete a WireGuard peer.

        Args:
            key: Peer $key (ID).
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def get_config(self, key: int) -> str:
        """Get the WireGuard configuration for a peer.

        Retrieves the WireGuard config file content that can be used
        by the remote peer to connect to this tunnel.

        Args:
            key: Peer $key (ID).

        Returns:
            WireGuard configuration file content as string.

        Raises:
            NotFoundError: If the API returns no record for the peer.
            ValueError: If the request itself raises NotFoundError (reported
                as "configuration not available"), if the response is not a
                mapping, or if its ``wg_config`` field is empty.

        Examples:
            Get and save peer config::

                config = interface.peers.get_config(peer.key)
                with open("wg0.conf", "w") as f:
                    f.write(config)

        Note:
            Configuration is only available for peers that were created with
            the autogenerate_peer option enabled in the VergeOS UI.
        """
        params = {"fields": "wg_config"}
        try:
            response = self._client._request(
                "GET", f"{self._endpoint}/{key}", params=params
            )
        except NotFoundError:
            # The wg_config field may not be available if peer wasn't auto-generated
            raise ValueError(
                "Configuration not available. This peer may not have been created "
                "with auto-generate enabled, or the configuration has not been "
                "generated yet."
            ) from None

        if response is None:
            raise NotFoundError(f"WireGuard peer with key {key} not found")
        if not isinstance(response, dict):
            raise ValueError("Invalid response from API")

        config = response.get("wg_config", "")
        if not config:
            raise ValueError(
                "No configuration available. Ensure peer was created with "
                "auto-generate enabled or has proper key configuration."
            )
        return str(config)