Source code for pyvergeos.resources.certificates

"""Certificate resource manager for VergeOS SSL/TLS certificate management."""

from __future__ import annotations

import builtins
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient


# Friendly certificate type names accepted from callers, keyed to the value
# sent to the API in the ``type`` field.
CERT_TYPE_MAP: dict[str, str] = {
    "Manual": "manual",
    "LetsEncrypt": "letsencrypt",
    "SelfSigned": "self_signed",
}

# API ``type`` values keyed to human-readable labels. This is deliberately
# not the inverse of CERT_TYPE_MAP: the labels are punctuated for display.
CERT_TYPE_DISPLAY: dict[str, str] = {
    "manual": "Manual",
    "letsencrypt": "Let's Encrypt",
    "self_signed": "Self-Signed",
}

# Friendly key type names keyed to the API ``key_type`` value.
KEY_TYPE_MAP: dict[str, str] = {"ECDSA": "ecdsa", "RSA": "rsa"}

# API ``key_type`` values keyed back to their friendly names (exact inverse
# of KEY_TYPE_MAP, in the same order).
KEY_TYPE_DISPLAY: dict[str, str] = {api: name for name, api in KEY_TYPE_MAP.items()}

# Fields requested by default when listing or fetching certificates.
_DEFAULT_CERT_FIELDS: list[str] = [
    # identity
    "$key", "domain", "domainname", "domainlist", "description",
    # issuance settings
    "type", "acme_server", "key_type", "rsa_key_size", "contact", "agree_tos",
    # status and timestamps
    "valid", "autocreated", "expires", "created", "modified",
]

# Extra fields carrying key material; only requested when explicitly asked for.
_CERT_KEY_FIELDS: list[str] = ["public", "private", "chain"]


def _as_text(value: Any) -> str:
    """Return ``value`` as a string, mapping ``None`` to an empty string.

    A plain ``str(value)`` would turn a null API field into the literal
    text ``"None"``.
    """
    return "" if value is None else str(value)


def _as_optional_int(value: Any) -> int | None:
    """Return ``value`` as an int, or ``None`` when it is ``None`` or ``""``."""
    if value is None or value == "":
        return None
    return int(value)


def _as_utc_datetime(value: Any) -> datetime | None:
    """Convert an epoch-seconds value to a timezone-aware UTC datetime.

    ``None``, ``""`` and zero are treated as "not set" and yield ``None``.
    """
    seconds = _as_optional_int(value)
    if not seconds:
        return None
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


class Certificate(ResourceObject):
    """SSL/TLS certificate resource object.

    Represents an SSL/TLS certificate in VergeOS. Certificates can be:

    - Manual: Uploaded certificate with public/private keys
    - Let's Encrypt: Automatically managed via ACME protocol
    - Self-Signed: Self-signed certificate generated by VergeOS

    Properties:
        domain: Primary domain name for the certificate.
        domain_name: Alternative accessor for domain name.
        domain_list: List of Subject Alternative Names (SANs).
        description: Certificate description.
        cert_type: Certificate type (API value).
        cert_type_display: Friendly certificate type name.
        key_type: Key type (ecdsa or rsa).
        key_type_display: Friendly key type name.
        rsa_key_size: RSA key size (if RSA key type).
        acme_server: ACME server URL (for Let's Encrypt).
        contact_user_key: Contact user key (for Let's Encrypt).
        is_tos_agreed: Whether TOS is accepted (for Let's Encrypt).
        is_valid: Whether certificate is valid (not expired).
        is_auto_created: Whether certificate was auto-created by system.
        expires_at: Expiration datetime.
        created_at: Creation datetime.
        modified_at: Last modified datetime.
        days_until_expiry: Days until certificate expires.
        public_key: Public certificate (PEM format, if loaded).
        private_key: Private key (PEM format, if loaded).
        chain: Certificate chain (PEM format, if loaded).
    """

    @property
    def domain(self) -> str:
        """Get primary domain name."""
        # Prefer the ``domain`` field, fall back to ``domainname``.
        val = self.get("domain") or self.get("domainname")
        return str(val) if val else ""

    @property
    def domain_name(self) -> str:
        """Get domain name (alias for domain)."""
        return self.domain

    @property
    def domain_list(self) -> builtins.list[str]:
        """Get list of Subject Alternative Names.

        The ``domainlist`` field is read as a comma-separated string; blank
        entries and surrounding whitespace are dropped.
        """
        raw = self.get("domainlist", "")
        if not raw:
            return []
        return [name.strip() for name in str(raw).split(",") if name.strip()]

    @property
    def description(self) -> str:
        """Get certificate description ("" when unset)."""
        return _as_text(self.get("description"))

    @property
    def cert_type(self) -> str:
        """Get certificate type (API value, "" when unset)."""
        return _as_text(self.get("type"))

    @property
    def cert_type_display(self) -> str:
        """Get friendly certificate type name.

        Falls back to the raw API value for types not in CERT_TYPE_DISPLAY.
        """
        raw_type = self.cert_type
        return CERT_TYPE_DISPLAY.get(raw_type, raw_type)

    @property
    def key_type(self) -> str:
        """Get key type (API value, "" when unset)."""
        return _as_text(self.get("key_type"))

    @property
    def key_type_display(self) -> str:
        """Get friendly key type name.

        Falls back to the upper-cased API value for unknown key types.
        """
        raw_type = self.key_type
        return KEY_TYPE_DISPLAY.get(raw_type, raw_type.upper())

    @property
    def rsa_key_size(self) -> int | None:
        """Get RSA key size (if RSA key type)."""
        return _as_optional_int(self.get("rsa_key_size"))

    @property
    def acme_server(self) -> str:
        """Get ACME server URL (for Let's Encrypt, "" when unset)."""
        return _as_text(self.get("acme_server"))

    @property
    def contact_user_key(self) -> int | None:
        """Get contact user key (for Let's Encrypt)."""
        return _as_optional_int(self.get("contact"))

    @property
    def is_tos_agreed(self) -> bool:
        """Check if Terms of Service is accepted."""
        return bool(self.get("agree_tos", False))

    @property
    def is_valid(self) -> bool:
        """Check if certificate is valid (the API ``valid`` flag)."""
        return bool(self.get("valid", False))

    @property
    def is_auto_created(self) -> bool:
        """Check if certificate was auto-created by system."""
        return bool(self.get("autocreated", False))

    @property
    def expires_at(self) -> datetime | None:
        """Get expiration datetime (UTC), or None when not set."""
        return _as_utc_datetime(self.get("expires"))

    @property
    def created_at(self) -> datetime | None:
        """Get creation datetime (UTC), or None when not set."""
        return _as_utc_datetime(self.get("created"))

    @property
    def modified_at(self) -> datetime | None:
        """Get last modified datetime (UTC), or None when not set."""
        return _as_utc_datetime(self.get("modified"))

    @property
    def days_until_expiry(self) -> int | None:
        """Get whole days until the certificate expires.

        Uses ``timedelta.days``, which floors: a certificate that expired
        less than a day ago reports -1, one expiring later today reports 0.
        Returns None when no expiration is set.
        """
        expires = self.expires_at
        if expires is None:
            return None
        return (expires - datetime.now(timezone.utc)).days

    @property
    def public_key(self) -> str | None:
        """Get public certificate (PEM format, if loaded)."""
        val = self.get("public")
        return str(val) if val else None

    @property
    def private_key(self) -> str | None:
        """Get private key (PEM format, if loaded)."""
        val = self.get("private")
        return str(val) if val else None

    @property
    def chain(self) -> str | None:
        """Get certificate chain (PEM format, if loaded)."""
        val = self.get("chain")
        return str(val) if val else None

    def refresh(self) -> Certificate:
        """Refresh certificate data from API.

        This only re-reads the record via ``CertificateManager.get``; it does
        not trigger a renewal (unlike ``CertificateManager.refresh``, which
        forces one).

        Returns:
            Updated Certificate object.
        """
        from typing import cast

        manager = cast("CertificateManager", self._manager)
        return manager.get(self.key)

    def save(self, **kwargs: Any) -> Certificate:
        """Update certificate with new values.

        Args:
            **kwargs: Fields to update; forwarded unchanged to
                ``CertificateManager.update``, so only its keyword
                arguments are accepted.

        Returns:
            Updated Certificate object.
        """
        from typing import cast

        manager = cast("CertificateManager", self._manager)
        return manager.update(self.key, **kwargs)

    def delete(self) -> None:
        """Delete this certificate."""
        from typing import cast

        manager = cast("CertificateManager", self._manager)
        manager.delete(self.key)

    def renew(self, *, force: bool = False) -> Certificate:
        """Renew or regenerate this certificate.

        For Let's Encrypt certificates, triggers ACME renewal.
        For self-signed certificates, regenerates with a new key pair.

        Args:
            force: Force renewal even if not near expiration.

        Returns:
            Updated Certificate object after renewal.
        """
        from typing import cast

        manager = cast("CertificateManager", self._manager)
        return manager.renew(self.key, force=force)

    def __repr__(self) -> str:
        return (
            f"<Certificate key={self.get('$key', '?')} "
            f"domain={self.domain!r} type={self.cert_type_display!r}>"
        )
def _resolve_cert_type(cert_type: str) -> str:
    """Translate a caller-supplied certificate type into its API value.

    Accepts the friendly names in ``CERT_TYPE_MAP``, the display labels in
    ``CERT_TYPE_DISPLAY`` (e.g. "Let's Encrypt", "Self-Signed") and the raw
    API values, case-insensitively. Unrecognised input is passed through
    lower-cased so the API can reject it.
    """
    if cert_type in CERT_TYPE_MAP:
        return CERT_TYPE_MAP[cert_type]

    aliases: dict[str, str] = {}
    for api_value, label in CERT_TYPE_DISPLAY.items():
        aliases[api_value] = api_value
        aliases[label.lower()] = api_value
    for name, api_value in CERT_TYPE_MAP.items():
        aliases[name.lower()] = api_value

    lowered = cert_type.lower()
    return aliases.get(lowered, lowered)


def _escape_filter_value(value: str) -> str:
    """Escape a value for use inside a single-quoted filter literal."""
    return value.replace("'", "''")


def _cert_fields(fields: builtins.list[str] | None, include_keys: bool) -> builtins.list[str]:
    """Pick the field list for a request.

    Explicit ``fields`` win and are used as given (``include_keys`` is then
    ignored); otherwise the defaults are used, plus the key-material fields
    when ``include_keys`` is set.
    """
    if fields:
        return fields
    selected = list(_DEFAULT_CERT_FIELDS)
    if include_keys:
        selected.extend(_CERT_KEY_FIELDS)
    return selected


def _domainlist_value(domain_list: builtins.list[str] | str) -> str:
    """Return the comma-separated ``domainlist`` value for a list or string."""
    if isinstance(domain_list, list):
        return ",".join(domain_list)
    return domain_list


class CertificateManager(ResourceManager[Certificate]):
    """Manager for SSL/TLS certificates.

    Provides CRUD operations for SSL/TLS certificates including manual
    uploads, Let's Encrypt (ACME) certificates, and self-signed certificates.

    Example:
        >>> # List all certificates
        >>> certs = client.certificates.list()
        >>>
        >>> # Get a specific certificate
        >>> cert = client.certificates.get(domain="example.com")
        >>>
        >>> # Create a self-signed certificate
        >>> cert = client.certificates.create(
        ...     domain="internal.local",
        ...     cert_type="SelfSigned",
        ...     description="Internal services"
        ... )
        >>>
        >>> # Create a Let's Encrypt certificate
        >>> cert = client.certificates.create(
        ...     domain="public.example.com",
        ...     cert_type="LetsEncrypt",
        ...     agree_tos=True,
        ...     contact_user_key=1
        ... )
        >>>
        >>> # Renew a certificate
        >>> cert = client.certificates.renew(cert.key, force=True)
    """

    _endpoint = "certificates"

    def __init__(self, client: VergeClient) -> None:
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> Certificate:
        """Convert API response to Certificate object."""
        return Certificate(data, self)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        cert_type: str | None = None,
        valid: bool | None = None,
        include_keys: bool = False,
        **filter_kwargs: Any,
    ) -> builtins.list[Certificate]:
        """List certificates with optional filtering.

        All supplied filter sources are combined with ``and``.

        Args:
            filter: OData filter string.
            fields: List of fields to return.
            limit: Maximum number of results.
            offset: Skip this many results.
            cert_type: Filter by certificate type (Manual, LetsEncrypt,
                SelfSigned; display labels and API values also accepted).
            valid: Filter by valid status (True for valid only).
            include_keys: Include sensitive key material (public, private,
                chain). Ignored when ``fields`` is given.
            **filter_kwargs: Additional filter arguments.

        Returns:
            List of Certificate objects.
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if cert_type:
            # Escape quotes like get() does for domains, so the value cannot
            # break out of the quoted literal.
            api_type = _escape_filter_value(_resolve_cert_type(cert_type))
            clauses.append(f"type eq '{api_type}'")
        if valid is not None:
            clauses.append(f"valid eq {str(valid).lower()}")
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = ",".join(_cert_fields(fields, include_keys))
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, list):
            # A single object came back instead of a list.
            return [self._to_model(response)]
        # Skip empty entries and rows without a key.
        return [self._to_model(item) for item in response if item and item.get("$key")]

    def list_valid(self, **kwargs: Any) -> builtins.list[Certificate]:
        """List only valid (unexpired) certificates.

        Args:
            **kwargs: Additional arguments passed to list().

        Returns:
            List of valid Certificate objects.
        """
        return self.list(valid=True, **kwargs)

    def list_by_type(
        self,
        cert_type: str,
        **kwargs: Any,
    ) -> builtins.list[Certificate]:
        """List certificates by type.

        Args:
            cert_type: Certificate type (Manual, LetsEncrypt, SelfSigned).
            **kwargs: Additional arguments passed to list().

        Returns:
            List of Certificate objects.
        """
        return self.list(cert_type=cert_type, **kwargs)

    def list_expiring(
        self,
        days: int = 30,
        **kwargs: Any,
    ) -> builtins.list[Certificate]:
        """List certificates expiring within specified days.

        Filtering happens client-side on the result of list(), so a
        ``limit`` in ``kwargs`` is applied before the expiry filter.
        Already-expired certificates are excluded.

        Args:
            days: Number of days threshold.
            **kwargs: Additional arguments passed to list().

        Returns:
            List of Certificate objects expiring within the threshold.
        """
        expiring: builtins.list[Certificate] = []
        for cert in self.list(**kwargs):
            remaining = cert.days_until_expiry
            if remaining is not None and 0 <= remaining < days:
                expiring.append(cert)
        return expiring

    def list_expired(self, **kwargs: Any) -> builtins.list[Certificate]:
        """List expired certificates.

        Filtering happens client-side on the result of list(), so a
        ``limit`` in ``kwargs`` is applied before the expiry filter.

        Args:
            **kwargs: Additional arguments passed to list().

        Returns:
            List of expired Certificate objects.
        """
        expired: builtins.list[Certificate] = []
        for cert in self.list(**kwargs):
            remaining = cert.days_until_expiry
            if remaining is not None and remaining < 0:
                expired.append(cert)
        return expired

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        domain: str | None = None,
        fields: builtins.list[str] | None = None,
        include_keys: bool = False,
    ) -> Certificate:
        """Get a certificate by key or domain.

        When both are given, ``key`` takes precedence.

        Args:
            key: Certificate $key (ID).
            domain: Domain name (exact match).
            fields: List of fields to return.
            include_keys: Include sensitive key material. Ignored when
                ``fields`` is given.

        Returns:
            Certificate object.

        Raises:
            NotFoundError: If certificate not found.
            ValueError: If neither key nor domain provided.
        """
        field_list = _cert_fields(fields, include_keys)

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(field_list)}
            response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
            if response is None:
                raise NotFoundError(f"Certificate with key {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"Certificate with key {key} returned invalid response")
            return self._to_model(response)

        if domain is not None:
            escaped_domain = _escape_filter_value(domain)
            # Match on ``domain`` first, then fall back to ``domainname``.
            for column in ("domain", "domainname"):
                matches = self.list(
                    filter=f"{column} eq '{escaped_domain}'", fields=field_list, limit=1
                )
                if matches:
                    return matches[0]
            raise NotFoundError(f"Certificate for domain '{domain}' not found")

        raise ValueError("Either key or domain must be provided")

    def create(  # type: ignore[override]
        self,
        domain: str,
        *,
        cert_type: str = "SelfSigned",
        domain_list: builtins.list[str] | str | None = None,
        description: str | None = None,
        public_key: str | None = None,
        private_key: str | None = None,
        chain: str | None = None,
        acme_server: str | None = None,
        eab_key_id: str | None = None,
        eab_hmac_key: str | None = None,
        key_type: str | None = None,
        rsa_key_size: int | None = None,
        contact_user_key: int | None = None,
        agree_tos: bool = False,
    ) -> Certificate:
        """Create a new SSL/TLS certificate.

        Args:
            domain: Primary domain name for the certificate.
            cert_type: Certificate type (Manual, LetsEncrypt, SelfSigned).
                Default is SelfSigned.
            domain_list: Additional domains (SANs) as list or comma-separated
                string.
            description: Certificate description.
            public_key: Public certificate in PEM format (required for Manual).
            private_key: Private key in PEM format (required for Manual).
            chain: Certificate chain in PEM format (optional for Manual).
            acme_server: ACME server URL (for Let's Encrypt).
            eab_key_id: External Account Binding key ID (for some ACME
                providers).
            eab_hmac_key: External Account Binding HMAC key.
            key_type: Key type (ECDSA or RSA). Default is ECDSA.
            rsa_key_size: RSA key size (2048, 3072, or 4096). Default is 2048.
            contact_user_key: Contact user $key for Let's Encrypt.
            agree_tos: Accept Let's Encrypt Terms of Service (required for LE).

        Returns:
            Created Certificate object.

        Raises:
            ValidationError: If parameters invalid.
            ConflictError: If certificate for domain already exists.
            ValueError: If required parameters missing for certificate type,
                or the create request returns no usable response.
        """
        api_type = _resolve_cert_type(cert_type)
        is_manual = api_type == "manual"
        is_letsencrypt = api_type == "letsencrypt"

        # Validate required parameters before making any request.
        if is_manual:
            if not public_key:
                raise ValueError("public_key is required for Manual certificate type")
            if not private_key:
                raise ValueError("private_key is required for Manual certificate type")
        if is_letsencrypt and not agree_tos:
            raise ValueError("agree_tos must be True for Let's Encrypt certificates")

        body: dict[str, Any] = {
            "domainname": domain,
            "type": api_type,
        }
        if domain_list:
            body["domainlist"] = _domainlist_value(domain_list)
        if description:
            body["description"] = description

        # Manual certificate fields
        if is_manual:
            body["public"] = public_key
            body["private"] = private_key
            if chain:
                body["chain"] = chain

        # Let's Encrypt fields
        if is_letsencrypt:
            body["agree_tos"] = True
            if acme_server:
                body["acme_server"] = acme_server
            if eab_key_id:
                body["eab_kid"] = eab_key_id
            if eab_hmac_key:
                body["eab_hmac_key"] = eab_hmac_key
            if contact_user_key is not None:
                body["contact"] = contact_user_key

        # Key settings apply to generated certificates only.
        if not is_manual:
            if key_type:
                body["key_type"] = KEY_TYPE_MAP.get(key_type, key_type.lower())
            if rsa_key_size is not None:
                body["rsa_key_size"] = str(rsa_key_size)

        response = self._client._request("POST", self._endpoint, json_data=body)
        if response is None:
            raise ValueError("No response from create operation")
        if not isinstance(response, dict):
            raise ValueError("Create operation returned invalid response")

        # Fetch full object since POST response may not include all fields
        new_key = response.get("$key")
        if new_key is not None:
            return self.get(int(new_key))
        return self._to_model(response)

    def update(  # type: ignore[override]
        self,
        key: int,
        *,
        description: str | None = None,
        domain_list: builtins.list[str] | str | None = None,
        public_key: str | None = None,
        private_key: str | None = None,
        chain: str | None = None,
        acme_server: str | None = None,
        eab_key_id: str | None = None,
        eab_hmac_key: str | None = None,
        key_type: str | None = None,
        rsa_key_size: int | None = None,
        contact_user_key: int | None = None,
        agree_tos: bool | None = None,
    ) -> Certificate:
        """Update a certificate.

        Only arguments that are not None are sent. With nothing to change,
        the current certificate is fetched and returned.

        Args:
            key: Certificate $key (ID).
            description: New certificate description.
            domain_list: New additional domains (SANs).
            public_key: New public certificate (PEM format, manual certs only).
            private_key: New private key (PEM format, manual certs only).
            chain: New certificate chain (PEM format, manual certs only).
            acme_server: New ACME server URL (Let's Encrypt only).
            eab_key_id: New External Account Binding key ID.
            eab_hmac_key: New External Account Binding HMAC key.
            key_type: New key type (ECDSA or RSA).
            rsa_key_size: New RSA key size (2048, 3072, or 4096).
            contact_user_key: New contact user $key.
            agree_tos: Accept Terms of Service.

        Returns:
            Updated Certificate object.

        Raises:
            NotFoundError: If certificate not found.
            ValidationError: If parameters invalid.
        """
        body: dict[str, Any] = {}
        if description is not None:
            body["description"] = description
        if domain_list is not None:
            body["domainlist"] = _domainlist_value(domain_list)
        if public_key is not None:
            body["public"] = public_key
        if private_key is not None:
            body["private"] = private_key
        if chain is not None:
            body["chain"] = chain
        if acme_server is not None:
            body["acme_server"] = acme_server
        if eab_key_id is not None:
            body["eab_kid"] = eab_key_id
        if eab_hmac_key is not None:
            body["eab_hmac_key"] = eab_hmac_key
        if key_type is not None:
            body["key_type"] = KEY_TYPE_MAP.get(key_type, key_type.lower())
        if rsa_key_size is not None:
            body["rsa_key_size"] = str(rsa_key_size)
        if contact_user_key is not None:
            body["contact"] = contact_user_key
        if agree_tos is not None:
            body["agree_tos"] = agree_tos

        if not body:
            # No changes, just fetch current
            return self.get(key)

        response = self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        if response is None or not isinstance(response, dict):
            # The PUT returned no usable object; re-read the record.
            return self.get(key)
        return self._to_model(response)

    def delete(self, key: int) -> None:
        """Delete a certificate.

        Args:
            key: Certificate $key (ID).

        Raises:
            NotFoundError: If certificate not found.
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def renew(self, key: int, *, force: bool = False) -> Certificate:
        """Renew or regenerate a certificate.

        For Let's Encrypt certificates, triggers ACME renewal.
        For self-signed certificates, regenerates with a new key pair.
        Manual certificates cannot be renewed (upload new keys instead).

        Unless ``force`` is set, a certificate with more than 30 days left
        is returned unchanged and no renewal request is sent.

        Args:
            key: Certificate $key (ID).
            force: Force renewal even if not near expiration.

        Returns:
            Updated Certificate object after renewal.

        Raises:
            NotFoundError: If certificate not found.
            ValueError: If certificate type doesn't support renewal.
        """
        # Get current certificate to check type
        cert = self.get(key)
        if cert.cert_type == "manual":
            raise ValueError(
                "Manual certificates cannot be renewed. Use update() to upload new keys instead."
            )

        if not force:
            remaining = cert.days_until_expiry
            if remaining is not None and remaining > 30:
                # Not near expiration: return the current certificate.
                return cert

        # Trigger renewal by setting renew=True
        self._client._request("PUT", f"{self._endpoint}/{key}", json_data={"renew": True})

        # Fetch updated certificate
        return self.get(key)

    def refresh(self, key: int) -> Certificate:
        """Alias for renew() - refresh/renew a certificate.

        This forces a renewal (``renew(key, force=True)``); it is not a
        read-only re-fetch. Use ``get(key)`` to only re-read the record.

        Args:
            key: Certificate $key (ID).

        Returns:
            Updated Certificate object after renewal.
        """
        return self.renew(key, force=True)