# Source code for pyvergeos.resources.dns

"""DNS Zone and Record resource managers."""

from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any, Literal

from pyvergeos.exceptions import NotFoundError
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.dns_views import DNSView
    from pyvergeos.resources.networks import Network

# Fields requested for DNS zones when a caller does not pass an explicit
# ``fields`` list. Assigned to ``DNSZoneManager._default_fields``; the managers
# call ``.copy()`` on it, so it must remain a list.
DNS_ZONE_DEFAULT_FIELDS = [
    "$key",
    "view",
    "domain",
    "type",
    "nameserver",
    "email",
    "default_ttl",
    "serial_number",
]

# Fields requested for DNS records when a caller does not pass an explicit
# ``fields`` list. Assigned to ``DNSRecordManager._default_fields``.
DNS_RECORD_DEFAULT_FIELDS = [
    "$key",
    "zone",
    "host",
    "type",
    "value",
    "ttl",
    "mx_preference",
    "weight",
    "port",
    "description",
]

# Maps the raw zone ``type`` value to a human-readable display name.
# Consumed by ``DNSZone.zone_type_display``, which falls back to the raw value
# for any type not listed here.
ZONE_TYPE_DISPLAY = {
    "master": "Primary",
    "slave": "Secondary",
    "redirect": "Redirect",
    "forward": "Forward",
    "static-stub": "Static Stub",
    "stub": "Stub",
}

# Type alias for the record types accepted by the record manager's signatures.
# This is a static typing aid only; nothing in this module validates it at runtime.
RecordType = Literal["A", "AAAA", "CNAME", "MX", "NS", "PTR", "SRV", "TXT", "CAA"]

# Type alias for zone types; the members mirror the keys of ZONE_TYPE_DISPLAY.
ZoneType = Literal["master", "slave", "redirect", "forward", "static-stub", "stub"]


class DNSRecord(ResourceObject):
    """A single DNS record, exposing its stored fields as typed properties."""

    @property
    def zone_key(self) -> int:
        """Key of the zone that owns this record.

        Raises:
            ValueError: If the record data carries no ``zone`` field.
        """
        raw_zone = self.get("zone")
        if raw_zone is not None:
            return int(raw_zone)
        raise ValueError("Record has no zone key")

    @property
    def host(self) -> str:
        """Hostname of the record; empty string when unset."""
        hostname = self.get("host")
        return hostname if hostname else ""

    @property
    def record_type(self) -> str:
        """Record type (A, AAAA, CNAME, MX, ...); ``"A"`` when unset."""
        kind = self.get("type")
        return kind if kind else "A"

    @property
    def value(self) -> str:
        """Record value (IP address, hostname, or text); empty string when unset."""
        content = self.get("value")
        return content if content else ""

    @property
    def ttl(self) -> str | None:
        """TTL stored on the record, or ``None`` when absent."""
        return self.get("ttl")

    @property
    def mx_preference(self) -> int:
        """MX preference (meaningful for MX records); ``0`` when unset."""
        preference = self.get("mx_preference")
        return preference if preference else 0

    @property
    def weight(self) -> int:
        """Weight (meaningful for SRV records); ``0`` when unset."""
        srv_weight = self.get("weight")
        return srv_weight if srv_weight else 0

    @property
    def port(self) -> int:
        """Port (meaningful for SRV records); ``0`` when unset."""
        srv_port = self.get("port")
        return srv_port if srv_port else 0

    @property
    def description(self) -> str | None:
        """Free-form description of the record, or ``None`` when absent."""
        return self.get("description")
class DNSRecordManager(ResourceManager[DNSRecord]):
    """Manager for DNS Record operations.

    This manager is accessed through a DNSZone object's records property.
    Listing and creation are always scoped to that zone.

    Examples:
        List all records in a zone::

            records = zone.records.list()

        Get an A record by host::

            record = zone.records.get(host="www")

        Create an A record::

            record = zone.records.create(
                host="www",
                record_type="A",
                value="10.0.0.100"
            )

        Create an MX record::

            record = zone.records.create(
                host="",  # root domain
                record_type="MX",
                value="mail.example.com",
                mx_preference=10
            )

        Delete a record::

            zone.records.delete(record.key)
    """

    _endpoint = "vnet_dns_zone_records"
    _default_fields = DNS_RECORD_DEFAULT_FIELDS

    def __init__(self, client: VergeClient, zone: DNSZone) -> None:
        """Bind the manager to ``client`` and to the ``zone`` whose records it manages."""
        super().__init__(client)
        self._zone = zone

    @property
    def zone_key(self) -> int:
        """Get the zone key for this manager."""
        return self._zone.key

    def _to_model(self, data: dict[str, Any]) -> DNSRecord:
        """Wrap a raw API dictionary in a DNSRecord bound to this manager."""
        return DNSRecord(data, self)

    def list(  # type: ignore[override]
        self,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        host: str | None = None,
        record_type: RecordType | None = None,
        **kwargs: Any,
    ) -> builtins.list[DNSRecord]:
        """List DNS records in this zone.

        Args:
            filter: Additional filter expression; it is parenthesised and
                AND-ed with the zone/host/type conditions.
            fields: List of fields to return (defaults to the module's
                record default fields).
            host: Filter by exact hostname. An empty string is a valid value
                (root domain) and is applied as a filter.
            record_type: Filter by record type (A, CNAME, MX, etc.).
            **kwargs: Accepted for signature compatibility; currently not
                applied to the request.

        Returns:
            List of DNSRecord objects, requested with sort ``+orderid``.
        """
        if fields is None:
            fields = self._default_fields.copy()

        # Every query is restricted to the owning zone.
        filters: builtins.list[str] = [f"zone eq {self.zone_key}"]

        if host is not None:
            escaped_host = host.replace("'", "''")
            filters.append(f"host eq '{escaped_host}'")

        if record_type:
            # Escape embedded quotes exactly as for host, so a stray quote
            # cannot terminate the literal and alter the filter expression.
            escaped_type = str(record_type).replace("'", "''")
            filters.append(f"type eq '{escaped_type}'")

        if filter:
            filters.append(f"({filter})")

        params: dict[str, Any] = {
            "filter": " and ".join(filters),
            "fields": ",".join(fields),
            "sort": "+orderid",
        }

        response = self._client._request("GET", self._endpoint, params=params)

        if response is None:
            return []

        # A non-list response is treated as a single record.
        if not isinstance(response, list):
            return [self._to_model(response)]

        return [self._to_model(item) for item in response]

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        host: str | None = None,
        record_type: RecordType | None = None,
        fields: builtins.list[str] | None = None,
    ) -> DNSRecord:
        """Get a DNS record by key, host, or type.

        Identifiers are tried in the order key, host, record_type. When
        looking up by host or type, the first matching record of this zone is
        returned. Lookup by key requests the record endpoint directly and is
        not restricted to this manager's zone.

        Args:
            key: Record $key (ID).
            host: Hostname of the record.
            record_type: Record type to filter by (also narrows a host lookup).
            fields: List of fields to return.

        Returns:
            DNSRecord object.

        Raises:
            NotFoundError: If record not found.
            ValueError: If no identifier provided.
        """
        if fields is None:
            fields = self._default_fields.copy()

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(fields)}
            response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
            if response is None:
                raise NotFoundError(f"DNS record {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"DNS record {key} returned invalid response")
            return self._to_model(response)

        if host is not None:
            records = self.list(host=host, record_type=record_type, fields=fields)
            if not records:
                raise NotFoundError(f"DNS record with host '{host}' not found in this zone")
            return records[0]

        if record_type is not None:
            records = self.list(record_type=record_type, fields=fields)
            if not records:
                raise NotFoundError(f"DNS record with type '{record_type}' not found in this zone")
            return records[0]

        raise ValueError("Either key, host, or record_type must be provided")

    def create(  # type: ignore[override]
        self,
        record_type: RecordType,
        value: str,
        host: str = "",
        ttl: str | None = None,
        mx_preference: int = 0,
        weight: int = 0,
        port: int = 0,
        description: str | None = None,
    ) -> DNSRecord:
        """Create a new DNS record in this zone.

        Args:
            record_type: Record type (A, AAAA, CNAME, MX, NS, PTR, SRV, TXT, CAA).
            value: Record value (IP address, hostname, or text).
            host: Hostname for the record (empty string for root domain).
            ttl: Time-to-live for the record (e.g., "1h", "30m", "1d").
            mx_preference: Preference value for MX records (lower = higher priority).
            weight: Weight for SRV records.
            port: Port for SRV records.
            description: Optional description for the record.

        Returns:
            Created DNSRecord object, re-fetched by key after creation.

        Raises:
            ValueError: If the create call returns no response, a non-dict
                response, or a response without ``$key``.

        Note:
            DNS changes require DNS apply on the network to take effect.
        """
        body: dict[str, Any] = {
            "zone": self.zone_key,
            "host": host,
            "type": record_type,
            "value": value,
        }

        # Optional attributes are sent only when set to a non-default value.
        if ttl:
            body["ttl"] = ttl
        if mx_preference > 0:
            body["mx_preference"] = mx_preference
        if weight > 0:
            body["weight"] = weight
        if port > 0:
            body["port"] = port
        if description:
            body["description"] = description

        response = self._client._request("POST", self._endpoint, json_data=body)

        if response is None:
            raise ValueError("No response from create operation")
        if not isinstance(response, dict):
            raise ValueError("Create operation returned invalid response")

        # Get the created record key and fetch full data
        key = response.get("$key")
        if key is None:
            raise ValueError("Create response missing $key")

        return self.get(int(key))

    def delete(self, key: int) -> None:
        """Delete a DNS record.

        Args:
            key: Record $key (ID).

        Note:
            DNS changes require DNS apply on the network to take effect.
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")
class DNSZone(ResourceObject):
    """A DNS zone, optionally annotated with the view it was listed under."""

    def __init__(
        self,
        data: dict[str, Any],
        manager: DNSZoneManager,
        view_key: int | None = None,
        view_name: str | None = None,
    ) -> None:
        """Store the zone data plus the optional view key/name supplied by the manager."""
        super().__init__(data, manager)
        self._view_key = view_key
        self._view_name = view_name

    @property
    def view_key(self) -> int:
        """Key of the DNS view this zone belongs to.

        The key handed to the constructor wins; otherwise the zone's own
        ``view`` field is used.

        Raises:
            ValueError: If neither source provides a view key.
        """
        explicit_key = self._view_key
        if explicit_key is None:
            stored_view = self.get("view")
            if stored_view is None:
                raise ValueError("Zone has no view key")
            return int(stored_view)
        return explicit_key

    @property
    def view_name(self) -> str | None:
        """Name of the DNS view as supplied at construction, or ``None``."""
        return self._view_name

    @property
    def domain(self) -> str:
        """Domain name of the zone; empty string when unset."""
        name = self.get("domain")
        return name if name else ""

    @property
    def zone_type(self) -> str:
        """Raw API zone type; ``"master"`` when unset."""
        kind = self.get("type")
        return kind if kind else "master"

    @property
    def zone_type_display(self) -> str:
        """Display name for the zone type, falling back to the raw value."""
        return ZONE_TYPE_DISPLAY.get(self.zone_type, self.zone_type)

    @property
    def nameserver(self) -> str | None:
        """Primary nameserver of the zone, or ``None`` when absent."""
        return self.get("nameserver")

    @property
    def email(self) -> str | None:
        """Zone administrator email, or ``None`` when absent."""
        return self.get("email")

    @property
    def default_ttl(self) -> str | None:
        """Default TTL for records in this zone, or ``None`` when absent."""
        return self.get("default_ttl")

    @property
    def serial_number(self) -> int:
        """Zone serial number; ``1`` when unset."""
        serial = self.get("serial_number")
        return serial if serial else 1

    @property
    def records(self) -> DNSRecordManager:
        """Record manager scoped to this zone.

        A new DNSRecordManager is built on every access, sharing the client
        of the manager that produced this zone.

        Examples:
            List all records::

                records = zone.records.list()

            Create an A record::

                record = zone.records.create(
                    host="www",
                    record_type="A",
                    value="10.0.0.100"
                )
        """
        client = self._manager._client
        return DNSRecordManager(client, self)
class DNSZoneManager(ResourceManager[DNSZone]):
    """Manager for DNS Zone operations.

    This manager can be accessed through a Network object's dns_zones property
    (read-only listing) or through a DNSView object's zones property (full CRUD).

    Examples:
        List all zones for a network::

            zones = network.dns_zones.list()

        Get a zone by domain::

            zone = network.dns_zones.get(domain="example.com")

        Create a zone through a view::

            view = network.dns_views.get(name="default")
            zone = view.zones.create(domain="example.com")

        List records in a zone::

            records = zone.records.list()

        Create a record::

            record = zone.records.create(
                host="www",
                record_type="A",
                value="10.0.0.100"
            )
    """

    _endpoint = "vnet_dns_zones"
    _views_endpoint = "vnet_dns_views"
    _default_fields = DNS_ZONE_DEFAULT_FIELDS

    def __init__(
        self,
        client: VergeClient,
        network: Network | None = None,
        view: DNSView | None = None,
    ) -> None:
        """Create a manager scoped to a network and/or a view.

        Args:
            client: API client used for all requests.
            network: Network whose views are enumerated when listing.
            view: View to scope to; when given, listing queries only this
                view and zone creation is enabled.

        Raises:
            ValueError: If neither ``network`` nor ``view`` is provided.
        """
        super().__init__(client)
        self._network = network
        self._view = view

        if network is None and view is None:
            raise ValueError("Either network or view must be provided")

    @property
    def network_key(self) -> int:
        """Get the network key for this manager.

        The network's key is preferred; otherwise the view's ``network_key``.

        Raises:
            ValueError: If neither a network nor a view is set.
        """
        if self._network is not None:
            return self._network.key
        if self._view is not None:
            return self._view.network_key
        raise ValueError("No network or view available")

    @property
    def view_key(self) -> int | None:
        """Get the view key if this manager is scoped to a view."""
        if self._view is not None:
            return self._view.key
        return None

    def _to_model(
        self,
        data: dict[str, Any],
        view_key: int | None = None,
        view_name: str | None = None,
    ) -> DNSZone:
        """Wrap a raw API dictionary in a DNSZone, attaching optional view info."""
        return DNSZone(data, self, view_key=view_key, view_name=view_name)

    def _get_views(self) -> builtins.list[dict[str, Any]]:
        """Get DNS views for this network.

        Returns:
            List of DNS view dictionaries with $key and name.
        """
        params: dict[str, Any] = {
            "filter": f"vnet eq {self.network_key}",
            "fields": "$key,name",
        }

        response = self._client._request("GET", self._views_endpoint, params=params)

        if response is None:
            return []

        # A non-list response is treated as a single view.
        if not isinstance(response, list):
            return [response]

        return response

    def list(  # type: ignore[override]
        self,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        domain: str | None = None,
        zone_type: ZoneType | None = None,
        include_records: bool = False,
        **kwargs: Any,
    ) -> builtins.list[DNSZone]:
        """List DNS zones for this network or view.

        Args:
            filter: Additional filter expression; it is parenthesised and
                AND-ed with the view/domain/type conditions.
            fields: List of fields to return.
            domain: Filter by exact domain name. Empty or ``None`` means no
                domain filter.
            zone_type: Filter by zone type.
            include_records: Include records in each zone (not yet
                implemented; the flag is currently ignored).
            **kwargs: Accepted for signature compatibility; currently not
                applied to the request.

        Returns:
            List of DNSZone objects. Each view's zones are requested sorted by
            domain. For a network-scoped manager the per-view results are
            concatenated in view order, so the combined list is sorted by
            domain only within each view.
        """
        if fields is None:
            fields = self._default_fields.copy()

        # When scoped to a view, query directly
        if self._view is not None:
            return self._list_for_view(
                self._view.key,
                self._view.get("name"),
                filter=filter,
                fields=fields,
                domain=domain,
                zone_type=zone_type,
            )

        # Otherwise iterate all views for this network
        all_zones: builtins.list[DNSZone] = []
        for view in self._get_views():
            view_key = view.get("$key")
            if view_key is None:
                # A view without a key cannot be used in a filter; skip it.
                continue
            all_zones.extend(
                self._list_for_view(
                    view_key,
                    view.get("name"),
                    filter=filter,
                    fields=fields,
                    domain=domain,
                    zone_type=zone_type,
                )
            )

        return all_zones

    def _list_for_view(
        self,
        view_key: int,
        view_name: str | None,
        filter: str | None = None,  # noqa: A002
        fields: builtins.list[str] | None = None,
        domain: str | None = None,
        zone_type: ZoneType | None = None,
    ) -> builtins.list[DNSZone]:
        """List zones for a specific view.

        Every returned zone is tagged with ``view_key`` and ``view_name``.
        """
        if fields is None:
            fields = self._default_fields.copy()

        filters: builtins.list[str] = [f"view eq {view_key}"]

        if domain:
            escaped_domain = domain.replace("'", "''")
            filters.append(f"domain eq '{escaped_domain}'")

        if zone_type:
            # Escape embedded quotes exactly as for domain, so a stray quote
            # cannot terminate the literal and alter the filter expression.
            escaped_type = str(zone_type).replace("'", "''")
            filters.append(f"type eq '{escaped_type}'")

        if filter:
            filters.append(f"({filter})")

        params: dict[str, Any] = {
            "filter": " and ".join(filters),
            "fields": ",".join(fields),
            "sort": "+domain",
        }

        response = self._client._request("GET", self._endpoint, params=params)

        if response is None:
            return []

        # A non-list response is treated as a single zone.
        if not isinstance(response, list):
            return [self._to_model(response, view_key=view_key, view_name=view_name)]

        return [self._to_model(item, view_key=view_key, view_name=view_name) for item in response]

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        domain: str | None = None,
        fields: builtins.list[str] | None = None,
    ) -> DNSZone:
        """Get a DNS zone by key or domain.

        Lookup by key requests the zone endpoint directly; the returned zone
        carries no view name. Lookup by domain goes through ``list`` and
        returns the first match.

        Args:
            key: Zone $key (ID).
            domain: Domain name of the zone. Must be non-empty when used.
            fields: List of fields to return.

        Returns:
            DNSZone object.

        Raises:
            NotFoundError: If zone not found.
            ValueError: If no identifier is provided, or ``domain`` is empty.
        """
        if fields is None:
            fields = self._default_fields.copy()

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(fields)}
            response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
            if response is None:
                raise NotFoundError(f"DNS zone {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"DNS zone {key} returned invalid response")
            return self._to_model(response)

        if domain is not None:
            # An empty domain would make list() skip the domain filter and
            # hand back an arbitrary first zone; reject it explicitly instead.
            if not domain:
                raise ValueError("domain must be a non-empty string")
            zones = self.list(domain=domain, fields=fields)
            if not zones:
                raise NotFoundError(f"DNS zone '{domain}' not found on this network")
            return zones[0]

        raise ValueError("Either key or domain must be provided")

    def create(  # type: ignore[override]
        self,
        domain: str,
        zone_type: ZoneType = "master",
        nameserver: str = "",
        email: str = "",
        default_ttl: str = "1h",
        notify: str = "yes",
        allow_notify: str = "none;",
        allow_transfer: str = "none;",
        masters: str = "",
        refresh_interval: str = "3h",
        retry_interval: str = "30m",
        expiry_period: str = "3w",
        negative_ttl: str = "10m",
        forwarders: str = "",
        **kwargs: Any,
    ) -> DNSZone:
        """Create a new DNS zone.

        This method is only available when the manager is scoped to a view
        (accessed via ``view.zones``). Use ``network.dns_views`` to get a view first.

        Args:
            domain: Domain name for the zone (required).
            zone_type: Zone type (default "master").
            nameserver: Primary nameserver FQDN.
            email: Zone administrator email.
            default_ttl: Default TTL for records (e.g., "1h", "30m", "1d").
            notify: Notify setting ("yes", "no", "explicit").
            allow_notify: IP networks allowed to send NOTIFY.
            allow_transfer: Networks allowed zone transfers.
            masters: Master server(s) for secondary zones.
            refresh_interval: SOA refresh interval (default "3h").
            retry_interval: SOA retry interval (default "30m").
            expiry_period: SOA expiry period (default "3w").
            negative_ttl: Negative cache TTL (default "10m").
            forwarders: Forwarder servers (semicolon-delimited).
            **kwargs: Additional zone properties, merged into the request
                body. They may override the always-sent settings above, but
                non-empty ``nameserver``/``email``/``masters``/``forwarders``
                arguments take precedence over them.

        Returns:
            Created DNSZone object, re-fetched by key after creation.

        Raises:
            ValueError: If manager is not scoped to a view, or if the create
                call returns no response, a non-dict response, or a response
                without ``$key``.

        Note:
            DNS changes require DNS apply on the network to take effect.
        """
        if self._view is None:
            raise ValueError(
                "Zone creation requires a view. "
                "Use network.dns_views.get(name='...').zones.create() instead."
            )

        body: dict[str, Any] = {
            "view": self._view.key,
            "domain": domain,
            "type": zone_type,
            "default_ttl": default_ttl,
            "notify": notify,
            "allow_notify": allow_notify,
            "allow_transfer": allow_transfer,
            "refresh_interval": refresh_interval,
            "retry_interval": retry_interval,
            "expiry_period": expiry_period,
            "negative_ttl": negative_ttl,
            **kwargs,
        }

        # Optional string settings are sent only when non-empty.
        if nameserver:
            body["nameserver"] = nameserver
        if email:
            body["email"] = email
        if masters:
            body["masters"] = masters
        if forwarders:
            body["forwarders"] = forwarders

        response = self._client._request("POST", self._endpoint, json_data=body)

        if response is None:
            raise ValueError("No response from create operation")
        if not isinstance(response, dict):
            raise ValueError("Create operation returned invalid response")

        key = response.get("$key")
        if key is None:
            raise ValueError("Create response missing $key")

        return self.get(int(key))

    def delete(self, key: int) -> None:
        """Delete a DNS zone.

        Args:
            key: Zone $key (ID).

        Note:
            Deleting a zone also deletes all records within it.
            DNS changes require DNS apply on the network to take effect.
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")