"""Certificate resource manager for VergeOS SSL/TLS certificate management."""
from __future__ import annotations

import builtins
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, cast

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
# Certificate type mappings (friendly name accepted by this module -> API value).
CERT_TYPE_MAP = {
    "Manual": "manual",
    "LetsEncrypt": "letsencrypt",
    "SelfSigned": "self_signed",
}

# Display labels (API value -> human-readable label).
# NOTE: the labels are not the keys of CERT_TYPE_MAP (e.g. "Let's Encrypt"
# versus "LetsEncrypt"), so this is not a strict inverse of that mapping.
CERT_TYPE_DISPLAY = {
    "manual": "Manual",
    "letsencrypt": "Let's Encrypt",
    "self_signed": "Self-Signed",
}

# Key type mappings (friendly name -> API value).
KEY_TYPE_MAP = {
    "ECDSA": "ecdsa",
    "RSA": "rsa",
}

# Key type display labels (API value -> human-readable label).
KEY_TYPE_DISPLAY = {
    "ecdsa": "ECDSA",
    "rsa": "RSA",
}

# Default fields requested for certificate list/get operations when the
# caller does not supply an explicit field list.
_DEFAULT_CERT_FIELDS = [
    "$key",
    "domain",
    "domainname",
    "domainlist",
    "description",
    "type",
    "acme_server",
    "key_type",
    "rsa_key_size",
    "contact",
    "agree_tos",
    "valid",
    "autocreated",
    "expires",
    "created",
    "modified",
]

# Fields that include sensitive key material. They are not part of the
# default field list above and are only added to it on request.
_CERT_KEY_FIELDS = [
    "public",
    "private",
    "chain",
]
def _text_or_empty(value: Any) -> str:
    """Return ``value`` as a string, mapping ``None`` to an empty string."""
    return "" if value is None else str(value)


def _int_or_none(value: Any) -> int | None:
    """Return ``value`` as an int, mapping ``None`` and ``""`` to ``None``."""
    if value is None or value == "":
        return None
    return int(value)


def _utc_datetime_or_none(value: Any) -> datetime | None:
    """Convert an epoch-seconds value to an aware UTC datetime.

    Missing, empty and zero timestamps are treated as "not set" and yield
    ``None``.
    """
    seconds = _int_or_none(value)
    if not seconds:
        return None
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


class Certificate(ResourceObject):
    """SSL/TLS certificate resource object.

    Represents an SSL/TLS certificate in VergeOS. Certificates can be:

    - Manual: Uploaded certificate with public/private keys
    - Let's Encrypt: Automatically managed via ACME protocol
    - Self-Signed: Self-signed certificate generated by VergeOS

    Properties:
        domain: Primary domain name for the certificate.
        domain_name: Alternative accessor for domain name.
        domain_list: List of Subject Alternative Names (SANs).
        description: Certificate description.
        cert_type: Certificate type (API value).
        cert_type_display: Friendly certificate type name.
        key_type: Key type (ecdsa or rsa).
        key_type_display: Friendly key type name.
        rsa_key_size: RSA key size (if RSA key type).
        acme_server: ACME server URL (for Let's Encrypt).
        contact_user_key: Contact user key (for Let's Encrypt).
        is_tos_agreed: Whether TOS is accepted (for Let's Encrypt).
        is_valid: Value of the ``valid`` flag reported for the certificate.
        is_auto_created: Whether certificate was auto-created by system.
        expires_at: Expiration datetime.
        created_at: Creation datetime.
        modified_at: Last modified datetime.
        days_until_expiry: Days until certificate expires.
        public_key: Public certificate (PEM format, if loaded).
        private_key: Private key (PEM format, if loaded).
        chain: Certificate chain (PEM format, if loaded).
    """

    @property
    def domain(self) -> str:
        """Get primary domain name."""
        # Prefer the "domain" field and fall back to "domainname".
        val = self.get("domain") or self.get("domainname")
        return str(val) if val else ""

    @property
    def domain_name(self) -> str:
        """Get domain name (alias for domain)."""
        return self.domain

    @property
    def domain_list(self) -> builtins.list[str]:
        """Get list of Subject Alternative Names.

        The raw ``domainlist`` value is split on commas; blank entries and
        surrounding whitespace are dropped.
        """
        raw = self.get("domainlist", "")
        if not raw:
            return []
        entries = (part.strip() for part in str(raw).split(","))
        return [entry for entry in entries if entry]

    @property
    def description(self) -> str:
        """Get certificate description ("" when unset)."""
        # A field that is present but null must not be rendered as "None".
        return _text_or_empty(self.get("description"))

    @property
    def cert_type(self) -> str:
        """Get certificate type (API value, "" when unset)."""
        return _text_or_empty(self.get("type"))

    @property
    def cert_type_display(self) -> str:
        """Get friendly certificate type name.

        Unknown API values are returned unchanged.
        """
        api_value = self.cert_type
        return CERT_TYPE_DISPLAY.get(api_value, api_value)

    @property
    def key_type(self) -> str:
        """Get key type (API value, "" when unset)."""
        return _text_or_empty(self.get("key_type"))

    @property
    def key_type_display(self) -> str:
        """Get friendly key type name.

        Unknown API values are upper-cased; an unset key type yields "".
        """
        api_value = self.key_type
        return KEY_TYPE_DISPLAY.get(api_value, api_value.upper())

    @property
    def rsa_key_size(self) -> int | None:
        """Get RSA key size, or None when the field is missing or empty."""
        return _int_or_none(self.get("rsa_key_size"))

    @property
    def acme_server(self) -> str:
        """Get ACME server URL (for Let's Encrypt, "" when unset)."""
        return _text_or_empty(self.get("acme_server"))

    @property
    def contact_user_key(self) -> int | None:
        """Get contact user key, or None when the field is missing or empty."""
        return _int_or_none(self.get("contact"))

    @property
    def is_tos_agreed(self) -> bool:
        """Check if Terms of Service is accepted."""
        return bool(self.get("agree_tos", False))

    @property
    def is_valid(self) -> bool:
        """Check the certificate's ``valid`` flag."""
        return bool(self.get("valid", False))

    @property
    def is_auto_created(self) -> bool:
        """Check if certificate was auto-created by system."""
        return bool(self.get("autocreated", False))

    @property
    def expires_at(self) -> datetime | None:
        """Get expiration datetime (UTC), or None when not set."""
        return _utc_datetime_or_none(self.get("expires"))

    @property
    def created_at(self) -> datetime | None:
        """Get creation datetime (UTC), or None when not set."""
        return _utc_datetime_or_none(self.get("created"))

    @property
    def modified_at(self) -> datetime | None:
        """Get last modified datetime (UTC), or None when not set."""
        return _utc_datetime_or_none(self.get("modified"))

    @property
    def days_until_expiry(self) -> int | None:
        """Get whole days until the certificate expires.

        Returns:
            ``timedelta.days`` of (expiry - now), which is negative once the
            expiry time has passed, or None when no expiry is set.
        """
        expiry = self.expires_at  # evaluate once; each access re-parses the field
        if expiry is None:
            return None
        return (expiry - datetime.now(timezone.utc)).days

    @property
    def public_key(self) -> str | None:
        """Get public certificate (PEM format, if loaded)."""
        val = self.get("public")
        return str(val) if val else None

    @property
    def private_key(self) -> str | None:
        """Get private key (PEM format, if loaded)."""
        val = self.get("private")
        return str(val) if val else None

    @property
    def chain(self) -> str | None:
        """Get certificate chain (PEM format, if loaded)."""
        val = self.get("chain")
        return str(val) if val else None

    def refresh(self) -> Certificate:
        """Re-fetch this certificate through its manager's ``get()``.

        Returns:
            A newly fetched Certificate object; this instance is not modified.
        """
        manager = cast("CertificateManager", self._manager)
        return manager.get(self.key)

    def save(self, **kwargs: Any) -> Certificate:
        """Update certificate with new values.

        Args:
            **kwargs: Fields to update, forwarded to the manager's ``update()``.

        Returns:
            Updated Certificate object.
        """
        manager = cast("CertificateManager", self._manager)
        return manager.update(self.key, **kwargs)

    def delete(self) -> None:
        """Delete this certificate."""
        manager = cast("CertificateManager", self._manager)
        manager.delete(self.key)

    def renew(self, *, force: bool = False) -> Certificate:
        """Renew or regenerate this certificate.

        For Let's Encrypt certificates, triggers ACME renewal.
        For self-signed certificates, regenerates with a new key pair.

        Args:
            force: Force renewal even if not near expiration.

        Returns:
            Updated Certificate object after renewal.
        """
        manager = cast("CertificateManager", self._manager)
        return manager.renew(self.key, force=force)

    def __repr__(self) -> str:
        return f"<Certificate key={self.get('$key', '?')} domain={self.domain!r} type={self.cert_type_display!r}>"
class CertificateManager(ResourceManager[Certificate]):
    """Manager for SSL/TLS certificates.

    Provides CRUD operations for SSL/TLS certificates including manual uploads,
    Let's Encrypt (ACME) certificates, and self-signed certificates.

    Example:
        >>> # List all certificates
        >>> certs = client.certificates.list()
        >>>
        >>> # Get a specific certificate
        >>> cert = client.certificates.get(domain="example.com")
        >>>
        >>> # Create a self-signed certificate
        >>> cert = client.certificates.create(
        ...     domain="internal.local",
        ...     cert_type="SelfSigned",
        ...     description="Internal services"
        ... )
        >>>
        >>> # Create a Let's Encrypt certificate
        >>> cert = client.certificates.create(
        ...     domain="public.example.com",
        ...     cert_type="LetsEncrypt",
        ...     agree_tos=True,
        ...     contact_user_key=1
        ... )
        >>>
        >>> # Renew a certificate
        >>> cert = client.certificates.renew(cert.key, force=True)
    """

    _endpoint = "certificates"

    # renew() skips the API call (unless forced) while the certificate has
    # more than this many days left.
    _RENEWAL_WINDOW_DAYS = 30

    def __init__(self, client: VergeClient) -> None:
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> Certificate:
        """Convert API response to Certificate object."""
        return Certificate(data, self)

    @staticmethod
    def _api_cert_type(cert_type: str) -> str:
        """Translate a certificate type name into its API value.

        Matching is case-insensitive and accepts the friendly names from
        ``CERT_TYPE_MAP``, the display labels from ``CERT_TYPE_DISPLAY`` and
        the API values themselves. Unrecognised input is lower-cased and
        passed through unchanged otherwise.
        """
        aliases: dict[str, str] = {}
        for friendly, api_value in CERT_TYPE_MAP.items():
            aliases[friendly.lower()] = api_value
            aliases[api_value] = api_value
        for api_value, label in CERT_TYPE_DISPLAY.items():
            aliases[label.lower()] = api_value
        return aliases.get(cert_type.strip().lower(), cert_type.lower())

    @staticmethod
    def _select_fields(
        fields: builtins.list[str] | None,
        include_keys: bool,
    ) -> builtins.list[str]:
        """Return the field names to request.

        An explicit ``fields`` list is used as given (copied); in that case
        ``include_keys`` has no effect. Otherwise the defaults are used,
        extended with the key-material fields when ``include_keys`` is set.
        """
        if fields:
            return [*fields]
        selected = [*_DEFAULT_CERT_FIELDS]
        if include_keys:
            selected.extend(_CERT_KEY_FIELDS)
        return selected

    @staticmethod
    def _join_domains(domain_list: Any) -> str:
        """Return SANs as the comma-separated string stored in ``domainlist``."""
        if isinstance(domain_list, str):
            return domain_list
        return ",".join(domain_list)

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        *,
        cert_type: str | None = None,
        valid: bool | None = None,
        include_keys: bool = False,
        **filter_kwargs: Any,
    ) -> builtins.list[Certificate]:
        """List certificates with optional filtering.

        All supplied filter criteria are combined with ``and``.

        Args:
            filter: OData filter string.
            fields: List of fields to return. When given, ``include_keys``
                is ignored.
            limit: Maximum number of results.
            offset: Skip this many results.
            cert_type: Filter by certificate type (Manual, LetsEncrypt,
                SelfSigned; matched case-insensitively).
            valid: Filter by valid status (True for valid only).
            include_keys: Include sensitive key material (public, private,
                chain) in addition to the default fields.
            **filter_kwargs: Additional filter arguments passed to
                ``build_filter``.

        Returns:
            List of Certificate objects. Items without a ``$key`` in a list
            response are skipped.
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if cert_type:
            # Escape single quotes the same way get() does for domains.
            api_type = self._api_cert_type(cert_type).replace("'", "''")
            clauses.append(f"type eq '{api_type}'")
        if valid is not None:
            clauses.append(f"valid eq {str(valid).lower()}")
        if filter_kwargs:
            extra = build_filter(**filter_kwargs)
            if extra:  # avoid a dangling " and " for an empty clause
                clauses.append(extra)

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = ",".join(self._select_fields(fields, include_keys))
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        if not isinstance(response, builtins.list):
            # A single object came back instead of a collection.
            return [self._to_model(response)]
        return [self._to_model(item) for item in response if item and item.get("$key")]

    def list_valid(self, **kwargs: Any) -> builtins.list[Certificate]:
        """List only certificates whose ``valid`` flag is true.

        Args:
            **kwargs: Additional arguments passed to list().

        Returns:
            List of valid Certificate objects.
        """
        return self.list(valid=True, **kwargs)

    def list_by_type(
        self,
        cert_type: str,
        **kwargs: Any,
    ) -> builtins.list[Certificate]:
        """List certificates by type.

        Args:
            cert_type: Certificate type (Manual, LetsEncrypt, SelfSigned).
            **kwargs: Additional arguments passed to list().

        Returns:
            List of Certificate objects.
        """
        return self.list(cert_type=cert_type, **kwargs)

    def list_expiring(
        self,
        days: int = 30,
        **kwargs: Any,
    ) -> builtins.list[Certificate]:
        """List certificates expiring within specified days.

        Filtering happens client-side on ``days_until_expiry``; certificates
        that are already expired or have no expiry are excluded.

        Args:
            days: Number of days threshold (exclusive upper bound).
            **kwargs: Additional arguments passed to list().

        Returns:
            List of Certificate objects expiring within the threshold.
        """
        expiring: builtins.list[Certificate] = []
        for cert in self.list(**kwargs):
            remaining = cert.days_until_expiry
            if remaining is not None and 0 <= remaining < days:
                expiring.append(cert)
        return expiring

    def list_expired(self, **kwargs: Any) -> builtins.list[Certificate]:
        """List expired certificates.

        Filtering happens client-side: a certificate counts as expired when
        its ``days_until_expiry`` is negative.

        Args:
            **kwargs: Additional arguments passed to list().

        Returns:
            List of expired Certificate objects.
        """
        expired: builtins.list[Certificate] = []
        for cert in self.list(**kwargs):
            remaining = cert.days_until_expiry
            if remaining is not None and remaining < 0:
                expired.append(cert)
        return expired

    def get(  # type: ignore[override]
        self,
        key: int | None = None,
        *,
        domain: str | None = None,
        fields: builtins.list[str] | None = None,
        include_keys: bool = False,
    ) -> Certificate:
        """Get a certificate by key or domain.

        When both are given, ``key`` takes precedence.

        Args:
            key: Certificate $key (ID).
            domain: Domain name (exact match against ``domain``, then
                ``domainname`` as a fallback).
            fields: List of fields to return. When given, ``include_keys``
                is ignored.
            include_keys: Include sensitive key material.

        Returns:
            Certificate object.

        Raises:
            NotFoundError: If certificate not found.
            ValueError: If neither key nor domain provided.
        """
        if key is None and domain is None:
            raise ValueError("Either key or domain must be provided")

        field_list = self._select_fields(fields, include_keys)

        if key is not None:
            params: dict[str, Any] = {"fields": ",".join(field_list)}
            response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
            if response is None:
                raise NotFoundError(f"Certificate with key {key} not found")
            if not isinstance(response, dict):
                raise NotFoundError(f"Certificate with key {key} returned invalid response")
            return self._to_model(response)

        # Domain lookup: escape single quotes so the value cannot terminate
        # the quoted filter literal.
        escaped_domain = str(domain).replace("'", "''")
        for column in ("domain", "domainname"):
            results = self.list(
                filter=f"{column} eq '{escaped_domain}'", fields=field_list, limit=1
            )
            if results:
                return results[0]
        raise NotFoundError(f"Certificate for domain '{domain}' not found")

    def create(  # type: ignore[override]
        self,
        domain: str,
        *,
        cert_type: str = "SelfSigned",
        domain_list: builtins.list[str] | str | None = None,
        description: str | None = None,
        public_key: str | None = None,
        private_key: str | None = None,
        chain: str | None = None,
        acme_server: str | None = None,
        eab_key_id: str | None = None,
        eab_hmac_key: str | None = None,
        key_type: str | None = None,
        rsa_key_size: int | None = None,
        contact_user_key: int | None = None,
        agree_tos: bool = False,
    ) -> Certificate:
        """Create a new SSL/TLS certificate.

        Args:
            domain: Primary domain name for the certificate.
            cert_type: Certificate type (Manual, LetsEncrypt, SelfSigned;
                matched case-insensitively). Default is SelfSigned.
            domain_list: Additional domains (SANs) as list or comma-separated string.
            description: Certificate description.
            public_key: Public certificate in PEM format (required for Manual).
            private_key: Private key in PEM format (required for Manual).
            chain: Certificate chain in PEM format (optional for Manual).
            acme_server: ACME server URL (for Let's Encrypt).
            eab_key_id: External Account Binding key ID (for some ACME providers).
            eab_hmac_key: External Account Binding HMAC key.
            key_type: Key type (ECDSA or RSA). Only sent for non-manual
                certificates; omitted from the request when not given.
            rsa_key_size: RSA key size. Only sent for non-manual
                certificates; omitted from the request when not given.
            contact_user_key: Contact user $key for Let's Encrypt.
            agree_tos: Accept Let's Encrypt Terms of Service (required for LE).

        Returns:
            Created Certificate object. When the response carries a ``$key``
            the certificate is re-fetched with ``get()``.

        Raises:
            ValueError: If required parameters are missing for the
                certificate type, or the create request returns no usable
                response.

        Note:
            API-level errors (e.g. validation or conflict errors) are
            presumably raised by the underlying client request; they are not
            raised explicitly here.
        """
        api_type = self._api_cert_type(cert_type)

        # Validate required parameters based on certificate type.
        if api_type == "manual":
            if not public_key:
                raise ValueError("public_key is required for Manual certificate type")
            if not private_key:
                raise ValueError("private_key is required for Manual certificate type")
        if api_type == "letsencrypt" and not agree_tos:
            raise ValueError("agree_tos must be True for Let's Encrypt certificates")

        body: dict[str, Any] = {
            "domainname": domain,
            "type": api_type,
        }
        if domain_list:
            body["domainlist"] = self._join_domains(domain_list)
        if description:
            body["description"] = description

        if api_type == "manual":
            body["public"] = public_key
            body["private"] = private_key
            if chain:
                body["chain"] = chain
        else:
            # Key parameters only apply to certificates that are generated
            # rather than uploaded (self-signed and Let's Encrypt).
            if key_type:
                body["key_type"] = KEY_TYPE_MAP.get(key_type, key_type.lower())
            if rsa_key_size is not None:
                body["rsa_key_size"] = str(rsa_key_size)

        if api_type == "letsencrypt":
            body["agree_tos"] = True
            if acme_server:
                body["acme_server"] = acme_server
            if eab_key_id:
                body["eab_kid"] = eab_key_id
            if eab_hmac_key:
                body["eab_hmac_key"] = eab_hmac_key
            if contact_user_key is not None:
                body["contact"] = contact_user_key

        response = self._client._request("POST", self._endpoint, json_data=body)
        if response is None:
            raise ValueError("No response from create operation")
        if not isinstance(response, dict):
            raise ValueError("Create operation returned invalid response")

        # Fetch full object since POST response may not include all fields.
        new_key = response.get("$key")
        if new_key is not None:
            return self.get(int(new_key))
        return self._to_model(response)

    def update(  # type: ignore[override]
        self,
        key: int,
        *,
        description: str | None = None,
        domain_list: builtins.list[str] | str | None = None,
        public_key: str | None = None,
        private_key: str | None = None,
        chain: str | None = None,
        acme_server: str | None = None,
        eab_key_id: str | None = None,
        eab_hmac_key: str | None = None,
        key_type: str | None = None,
        rsa_key_size: int | None = None,
        contact_user_key: int | None = None,
        agree_tos: bool | None = None,
    ) -> Certificate:
        """Update a certificate.

        Only arguments that are not None are sent. If nothing is supplied no
        update request is made and the current certificate is fetched.

        Args:
            key: Certificate $key (ID).
            description: New certificate description.
            domain_list: New additional domains (SANs).
            public_key: New public certificate (PEM format, manual certs only).
            private_key: New private key (PEM format, manual certs only).
            chain: New certificate chain (PEM format, manual certs only).
            acme_server: New ACME server URL (Let's Encrypt only).
            eab_key_id: New External Account Binding key ID.
            eab_hmac_key: New External Account Binding HMAC key.
            key_type: New key type (ECDSA or RSA).
            rsa_key_size: New RSA key size.
            contact_user_key: New contact user $key.
            agree_tos: Accept Terms of Service.

        Returns:
            Updated Certificate object: built from the PUT response when it
            is a dict, otherwise re-fetched with ``get()``.

        Raises:
            NotFoundError: If certificate not found.
        """
        body: dict[str, Any] = {}
        if description is not None:
            body["description"] = description
        if domain_list is not None:
            body["domainlist"] = self._join_domains(domain_list)

        # Values that map one-to-one onto an API field.
        direct_fields = (
            ("public", public_key),
            ("private", private_key),
            ("chain", chain),
            ("acme_server", acme_server),
            ("eab_kid", eab_key_id),
            ("eab_hmac_key", eab_hmac_key),
        )
        for api_field, value in direct_fields:
            if value is not None:
                body[api_field] = value

        if key_type is not None:
            body["key_type"] = KEY_TYPE_MAP.get(key_type, key_type.lower())
        if rsa_key_size is not None:
            body["rsa_key_size"] = str(rsa_key_size)
        if contact_user_key is not None:
            body["contact"] = contact_user_key
        if agree_tos is not None:
            body["agree_tos"] = agree_tos

        if not body:
            # No changes, just fetch current.
            return self.get(key)

        response = self._client._request("PUT", f"{self._endpoint}/{key}", json_data=body)
        if not isinstance(response, dict):
            return self.get(key)
        return self._to_model(response)

    def delete(self, key: int) -> None:
        """Delete a certificate.

        Args:
            key: Certificate $key (ID).
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def renew(self, key: int, *, force: bool = False) -> Certificate:
        """Renew or regenerate a certificate.

        For Let's Encrypt certificates, triggers ACME renewal.
        For self-signed certificates, regenerates with a new key pair.
        Manual certificates cannot be renewed (upload new keys instead).

        Unless ``force`` is set, a certificate with more than 30 days left is
        returned unchanged without contacting the renewal endpoint.

        Args:
            key: Certificate $key (ID).
            force: Force renewal even if not near expiration.

        Returns:
            Updated Certificate object after renewal.

        Raises:
            NotFoundError: If certificate not found.
            ValueError: If certificate type doesn't support renewal.
        """
        # Get current certificate to check type and expiry.
        cert = self.get(key)
        if cert.cert_type == "manual":
            raise ValueError(
                "Manual certificates cannot be renewed. Use update() to upload new keys instead."
            )

        remaining = cert.days_until_expiry
        if not force and remaining is not None and remaining > self._RENEWAL_WINDOW_DAYS:
            # Not near expiration - return the certificate as it is.
            return cert

        # Trigger renewal by setting renew=True, then fetch the result.
        self._client._request("PUT", f"{self._endpoint}/{key}", json_data={"renew": True})
        return self.get(key)

    def refresh(self, key: int) -> Certificate:
        """Alias for renew() with ``force=True``.

        Note that this triggers a renewal; it does not merely re-read the
        certificate.

        Args:
            key: Certificate $key (ID).

        Returns:
            Updated Certificate object after renewal.
        """
        return self.renew(key, force=True)