Source code for pyvergeos.resources.rules

"""Network firewall rule resource manager."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any, Literal

from pyvergeos.exceptions import NotFoundError, ValidationError
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.networks import Network

# Type aliases for rule configuration
Direction = Literal["incoming", "outgoing"]
Action = Literal["accept", "drop", "reject", "translate", "route"]
Protocol = Literal["tcp", "udp", "tcpudp", "icmp", "any"]
Interface = Literal["auto", "router", "dmz", "wireguard", "any"]
PinPosition = Literal["top", "bottom"]

# Default fields for comprehensive rule data
RULE_DEFAULT_FIELDS = [
    "$key",
    "vnet",
    "vnet#name as vnet_name",
    "name",
    "description",
    "enabled",
    "orderid",
    "pin",
    "direction",
    "action",
    "protocol",
    "interface",
    "source_ip",
    "source_ports",
    "destination_ip",
    "destination_ports",
    "target_ip",
    "target_ports",
    "ct_state",
    "statistics",
    "log",
    "trace",
    "throttle",
    "drop_throttle",
    "packets",
    "bytes",
    "system_rule",
    "modified",
]


[docs] class NetworkRule(ResourceObject): """Network firewall rule resource object.""" @property def network_key(self) -> int: """Get the network key this rule belongs to.""" vnet = self.get("vnet") if vnet is None: raise ValueError("Rule has no network (vnet) key") return int(vnet) @property def network_name(self) -> str | None: """Get the network name this rule belongs to.""" return self.get("vnet_name") @property def is_enabled(self) -> bool: """Check if rule is enabled.""" return bool(self.get("enabled", True)) @property def is_system_rule(self) -> bool: """Check if this is a system rule (cannot be modified/deleted).""" return bool(self.get("system_rule", False)) @property def order(self) -> int: """Get rule order position.""" return int(self.get("orderid", 0)) @property def direction(self) -> str: """Get rule direction (incoming/outgoing).""" return str(self.get("direction", "incoming")) @property def action(self) -> str: """Get rule action (accept/drop/reject/translate/route).""" return str(self.get("action", "accept")) @property def protocol(self) -> str: """Get rule protocol (tcp/udp/tcpudp/icmp/any).""" return str(self.get("protocol", "any")) @property def source_ip(self) -> str | None: """Get source IP filter.""" return self.get("source_ip") @property def source_ports(self) -> str | None: """Get source ports filter.""" return self.get("source_ports") @property def destination_ip(self) -> str | None: """Get destination IP filter.""" return self.get("destination_ip") @property def destination_ports(self) -> str | None: """Get destination ports filter.""" return self.get("destination_ports") @property def target_ip(self) -> str | None: """Get target IP for translate/route actions.""" return self.get("target_ip") @property def target_ports(self) -> str | None: """Get target ports for port translation.""" return self.get("target_ports") @property def is_logging(self) -> bool: """Check if logging is enabled.""" return bool(self.get("log", False)) @property def has_statistics(self) -> bool: """Check if statistics tracking is enabled.""" return bool(self.get("statistics", False)) @property def packet_count(self) -> int: """Get number of packets matched by this rule.""" return int(self.get("packets") or 0) @property def byte_count(self) -> int: """Get number of bytes matched by this rule.""" return int(self.get("bytes") or 0)
[docs] def enable(self) -> NetworkRule: """Enable this rule. Returns: Self for chaining. """ if self.is_system_rule: raise ValidationError("Cannot modify system rule") return self.save(enabled=True) # type: ignore[return-value]
[docs] def disable(self) -> NetworkRule: """Disable this rule. Returns: Self for chaining. """ if self.is_system_rule: raise ValidationError("Cannot modify system rule") return self.save(enabled=False) # type: ignore[return-value]
[docs] class NetworkRuleManager(ResourceManager[NetworkRule]): """Manager for Network firewall rule operations. This manager is accessed through a Network object's rules property. Examples: List all rules for a network:: rules = network.rules.list() List only incoming rules:: incoming = network.rules.list(direction="incoming") Create a new accept rule:: rule = network.rules.create( name="Allow HTTPS", direction="incoming", action="accept", protocol="tcp", destination_ports="443" ) Create a NAT rule:: rule = network.rules.create( name="NAT to Web Server", direction="incoming", action="translate", protocol="tcp", destination_ports="80,443", target_ip="192.168.1.10" ) Delete a rule:: network.rules.delete(rule.key) """ _endpoint = "vnet_rules" _default_fields = RULE_DEFAULT_FIELDS
[docs] def __init__(self, client: VergeClient, network: Network) -> None: super().__init__(client) self._network = network
@property def network_key(self) -> int: """Get the network key for this manager.""" return self._network.key def _to_model(self, data: dict[str, Any]) -> NetworkRule: return NetworkRule(data, self)
[docs] def list( # type: ignore[override] self, filter: str | None = None, fields: builtins.list[str] | None = None, direction: Direction | None = None, action: Action | None = None, protocol: Protocol | None = None, enabled: bool | None = None, **kwargs: Any, ) -> builtins.list[NetworkRule]: """List firewall rules for this network. Args: filter: Additional OData filter string. fields: List of fields to return. direction: Filter by direction (incoming/outgoing). action: Filter by action (accept/drop/reject/translate/route). protocol: Filter by protocol (tcp/udp/tcpudp/icmp/any). enabled: Filter by enabled status. **kwargs: Additional filter arguments. Returns: List of NetworkRule objects sorted by order. """ if fields is None: fields = self._default_fields.copy() # Build filter for this network filters: builtins.list[str] = [f"vnet eq {self.network_key}"] if direction: filters.append(f"direction eq '{direction}'") if action: filters.append(f"action eq '{action}'") if protocol: filters.append(f"protocol eq '{protocol}'") if enabled is not None: filters.append(f"enabled eq {str(enabled).lower()}") if filter: filters.append(f"({filter})") combined_filter = " and ".join(filters) params: dict[str, Any] = { "filter": combined_filter, "fields": ",".join(fields), "sort": "+orderid", } response = self._client._request("GET", self._endpoint, params=params) if response is None: return [] if not isinstance(response, list): return [self._to_model(response)] return [self._to_model(item) for item in response]
[docs] def get( self, key: int | None = None, *, name: str | None = None, fields: builtins.list[str] | None = None, ) -> NetworkRule: """Get a rule by key or name. Args: key: Rule $key (ID). name: Rule name. fields: List of fields to return. Returns: NetworkRule object. Raises: NotFoundError: If rule not found. ValueError: If neither key nor name provided. """ if fields is None: fields = self._default_fields.copy() if key is not None: params: dict[str, Any] = {"fields": ",".join(fields)} response = self._client._request("GET", f"{self._endpoint}/{key}", params=params) if response is None: raise NotFoundError(f"Rule {key} not found") if not isinstance(response, dict): raise NotFoundError(f"Rule {key} returned invalid response") return self._to_model(response) if name is not None: escaped_name = name.replace("'", "''") rules = self.list(filter=f"name eq '{escaped_name}'", fields=fields) if not rules: raise NotFoundError(f"Rule with name '{name}' not found on this network") return rules[0] raise ValueError("Either key or name must be provided")
[docs] def create( # type: ignore[override] self, name: str, direction: Direction = "incoming", action: Action = "accept", protocol: Protocol = "any", interface: Interface = "auto", source_ip: str | None = None, source_ports: str | None = None, destination_ip: str | None = None, destination_ports: str | None = None, target_ip: str | None = None, target_ports: str | None = None, enabled: bool = True, log: bool = False, statistics: bool = False, pin: PinPosition | None = None, order: int | None = None, description: str = "", ) -> NetworkRule: """Create a new firewall rule. Args: name: Rule name (must be unique within the network). direction: Traffic direction (incoming or outgoing). action: Action to take (accept, drop, reject, translate, route). protocol: Protocol to match (tcp, udp, tcpudp, icmp, any). interface: Interface for the rule (auto, router, dmz, wireguard, any). source_ip: Source IP filter (IP, CIDR, or special value like "vnetself"). source_ports: Source ports (e.g., "80", "1024-65535", "80,443"). destination_ip: Destination IP filter. destination_ports: Destination ports. target_ip: Target IP for translate/route actions. target_ports: Target ports for port translation. enabled: Enable the rule (default True). log: Enable logging for this rule. statistics: Enable statistics tracking. pin: Pin position ("top" or "bottom"). order: Specific order position (alternative to pin). description: Rule description. Returns: Created NetworkRule object. Raises: ValidationError: If a rule with this name already exists. """ body: dict[str, Any] = { "vnet": self.network_key, "name": name, "direction": direction, "action": action, "protocol": protocol, "interface": interface, "enabled": enabled, "log": log, "statistics": statistics, } if source_ip: body["source_ip"] = source_ip if source_ports: body["source_ports"] = source_ports if destination_ip: body["destination_ip"] = destination_ip if destination_ports: body["destination_ports"] = destination_ports if target_ip: body["target_ip"] = target_ip if target_ports: body["target_ports"] = target_ports if description: body["description"] = description if pin: body["pin"] = pin if order is not None: body["orderid"] = order response = self._client._request("POST", self._endpoint, json_data=body) if response is None: raise ValueError("No response from create operation") if not isinstance(response, dict): raise ValueError("Create operation returned invalid response") # Get the created rule key and fetch full data key = response.get("$key") if key is None: raise ValueError("Create response missing $key") return self.get(int(key))
[docs] def update(self, key: int, **kwargs: Any) -> NetworkRule: """Update a firewall rule. Args: key: Rule $key (ID). **kwargs: Fields to update. Common fields: - name: New rule name - direction: incoming/outgoing - action: accept/drop/reject/translate/route - protocol: tcp/udp/tcpudp/icmp/any - source_ip, source_ports: Source filters - destination_ip, destination_ports: Destination filters - target_ip, target_ports: Target for translate/route - enabled: Enable/disable rule - log: Enable/disable logging - statistics: Enable/disable statistics Returns: Updated NetworkRule object. Raises: ValidationError: If trying to modify a system rule. """ # Check if this is a system rule rule = self.get(key) if rule.is_system_rule: raise ValidationError("Cannot modify system rule") self._client._request("PUT", f"{self._endpoint}/{key}", json_data=kwargs) return self.get(key)
[docs] def delete(self, key: int) -> None: """Delete a firewall rule. Args: key: Rule $key (ID). Raises: ValidationError: If trying to delete a system rule. Note: Rule changes are not active until network.apply_rules() is called. """ # Check if this is a system rule rule = self.get(key) if rule.is_system_rule: raise ValidationError("Cannot delete system rule") self._client._request("DELETE", f"{self._endpoint}/{key}")
[docs] def list_incoming(self) -> builtins.list[NetworkRule]: """List incoming rules. Returns: List of incoming rules. """ return self.list(direction="incoming")
[docs] def list_outgoing(self) -> builtins.list[NetworkRule]: """List outgoing rules. Returns: List of outgoing rules. """ return self.list(direction="outgoing")
[docs] def list_enabled(self) -> builtins.list[NetworkRule]: """List enabled rules. Returns: List of enabled rules. """ return self.list(enabled=True)
[docs] def list_disabled(self) -> builtins.list[NetworkRule]: """List disabled rules. Returns: List of disabled rules. """ return self.list(enabled=False)