"""Resource group management for VergeOS hardware device pools."""
from __future__ import annotations
import builtins
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Literal
from pyvergeos.exceptions import NotFoundError
from pyvergeos.resources.base import ResourceManager, ResourceObject
if TYPE_CHECKING:
from pyvergeos.client import VergeClient
logger = logging.getLogger(__name__)
# Device type mappings
DEVICE_TYPE_MAP = {
"node_pci_devices": "PCI",
"node_sriov_nic_devices": "SR-IOV NIC",
"node_usb_devices": "USB",
"node_host_gpu_devices": "Host GPU",
"node_nvidia_vgpu_devices": "NVIDIA vGPU",
}
DEVICE_TYPE_REVERSE_MAP = {v: k for k, v in DEVICE_TYPE_MAP.items()}
# Device class mappings
DEVICE_CLASS_MAP = {
"gpu": "GPU",
"vgpu": "vGPU",
"storage": "Storage",
"hid": "Human Input Device",
"usb": "Generic USB Device",
"network": "Network",
"media": "Media",
"audio": "Audio",
"fpga": "FPGA",
"pci": "Generic PCI",
"unknown": "Unknown",
}
DEVICE_CLASS_REVERSE_MAP = {v.lower(): k for k, v in DEVICE_CLASS_MAP.items()}
[docs]
class ResourceGroup(ResourceObject):
"""Represents a resource group in VergeOS.
Resource groups define collections of hardware devices (GPU, PCI, USB,
SR-IOV NIC, vGPU) that can be assigned to virtual machines for device
passthrough.
Resource groups are read-only - they are typically configured through
the VergeOS UI to associate physical devices with virtual machines.
Note:
Resource groups use UUID as their primary key, not integer IDs.
The `key` property returns the UUID string.
"""
@property
def key(self) -> str: # type: ignore[override]
"""Resource group key (UUID).
Resource groups use UUID as their primary identifier.
"""
uuid = self.get("uuid")
if uuid is None:
raise ValueError("Resource group has no UUID")
return str(uuid)
@property
def uuid(self) -> str:
"""Resource group UUID (same as key)."""
return str(self.get("uuid", ""))
@property
def name(self) -> str:
"""Resource group name."""
return str(self.get("name", ""))
@property
def description(self) -> str:
"""Resource group description."""
return str(self.get("description", ""))
@property
def device_type(self) -> str:
"""Device type (raw API value).
One of: node_pci_devices, node_sriov_nic_devices, node_usb_devices,
node_host_gpu_devices, node_nvidia_vgpu_devices
"""
return str(self.get("type", ""))
@property
def device_type_display(self) -> str:
"""Human-readable device type.
One of: PCI, SR-IOV NIC, USB, Host GPU, NVIDIA vGPU
"""
type_display = self.get("type_display")
if type_display:
return str(type_display)
return DEVICE_TYPE_MAP.get(self.device_type, self.device_type)
@property
def device_class(self) -> str:
"""Device class (raw API value).
One of: gpu, vgpu, storage, hid, usb, network, media, audio, fpga, pci, unknown
"""
return str(self.get("class", ""))
@property
def device_class_display(self) -> str:
"""Human-readable device class.
One of: GPU, vGPU, Storage, Human Input Device, Generic USB Device,
Network, Media, Audio, FPGA, Generic PCI, Unknown
"""
class_display = self.get("class_display")
if class_display:
return str(class_display)
return DEVICE_CLASS_MAP.get(self.device_class, self.device_class)
@property
def is_enabled(self) -> bool:
"""Whether the resource group is enabled."""
return bool(self.get("enabled", False))
@property
def resource_count(self) -> int:
"""Number of resources (devices) in this group."""
return int(self.get("resource_count", 0))
@property
def created_at(self) -> datetime | None:
"""Creation timestamp."""
ts = self.get("created")
if ts:
return datetime.fromtimestamp(int(ts), tz=timezone.utc)
return None
@property
def modified_at(self) -> datetime | None:
"""Last modification timestamp."""
ts = self.get("modified")
if ts:
return datetime.fromtimestamp(int(ts), tz=timezone.utc)
return None
@property
def rules(self) -> ResourceRuleManager:
"""Access resource rules for this resource group.
Returns:
ResourceRuleManager scoped to this resource group.
Example:
>>> # List rules for this group
>>> for rule in group.rules.list():
... print(f"{rule.name}: {rule.filter_expression}")
>>> # Create a rule
>>> rule = group.rules.create(
... name="Intel NICs",
... filter_expression="vendor ct 'Intel'",
... )
"""
from typing import cast
manager = cast("ResourceGroupManager", self._manager)
return ResourceRuleManager(manager._client, self.key)
[docs]
def refresh(self) -> ResourceGroup:
"""Refresh this resource group's data from the server."""
from typing import cast
manager = cast("ResourceGroupManager", self._manager)
return manager.get(self.key)
[docs]
def save(self, **kwargs: Any) -> ResourceGroup:
"""Update this resource group with the given values."""
from typing import cast
manager = cast("ResourceGroupManager", self._manager)
return manager.update(self.key, **kwargs)
[docs]
def delete(self) -> None:
"""Delete this resource group."""
from typing import cast
manager = cast("ResourceGroupManager", self._manager)
manager.delete(self.key)
def __repr__(self) -> str:
return (
f"<ResourceGroup uuid={self.uuid!r} name={self.name!r} "
f"type={self.device_type_display!r} class={self.device_class_display!r}>"
)
[docs]
class ResourceGroupManager(ResourceManager[ResourceGroup]):
"""Manages resource groups in VergeOS.
Resource groups define collections of hardware devices (GPU, PCI, USB,
SR-IOV NIC, vGPU) that can be assigned to VMs for device passthrough.
Device Types:
- PCI: General PCI passthrough (network cards, storage controllers, etc.)
- SR-IOV NIC: SR-IOV enabled NICs for direct network virtualization
- USB: USB device passthrough
- Host GPU: Full GPU passthrough to a single VM
- NVIDIA vGPU: NVIDIA vGPU for GPU sharing across multiple VMs
Device Classes:
GPU, vGPU, Storage, Human Input Device, Generic USB Device, Network,
Media, Audio, FPGA, Generic PCI, Unknown
Example:
>>> # List all resource groups
>>> for rg in client.resource_groups.list():
... print(f"{rg.name}: {rg.device_type_display} ({rg.device_class_display})")
>>> # Get a specific resource group
>>> gpu_group = client.resource_groups.get(name="GPU Pool")
>>> print(f"Devices: {gpu_group.resource_count}")
>>> # Filter by device type
>>> gpu_groups = client.resource_groups.list_by_type("Host GPU")
>>> pci_groups = client.resource_groups.list_by_type("PCI")
>>> # Filter by device class
>>> network_groups = client.resource_groups.list_by_class("Network")
>>> # List enabled groups only
>>> enabled_groups = client.resource_groups.list_enabled()
"""
_endpoint = "resource_groups"
# Default fields for list operations
_default_fields = [
"$key",
"uuid",
"name",
"description",
"type",
"display(type) as type_display",
"class",
"display(class) as class_display",
"enabled",
"count(resources) as resource_count",
"created",
"modified",
]
def _to_model(self, data: dict[str, Any]) -> ResourceGroup:
return ResourceGroup(data, self)
[docs]
def list( # noqa: A003
self,
filter: str | None = None, # noqa: A002
fields: builtins.list[str] | None = None,
limit: int | None = None,
offset: int | None = None,
**filter_kwargs: Any,
) -> builtins.list[ResourceGroup]:
"""List resource groups with optional filtering.
Args:
filter: OData filter string.
fields: List of fields to return.
limit: Maximum number of results.
offset: Skip this many results.
**filter_kwargs: Shorthand filter arguments.
Returns:
List of ResourceGroup objects.
Example:
>>> # List all resource groups
>>> groups = client.resource_groups.list()
>>> # Filter by enabled status
>>> enabled = client.resource_groups.list(enabled=True)
>>> # Filter with OData
>>> gpu_groups = client.resource_groups.list(
... filter="class eq 'gpu' and enabled eq true"
... )
"""
if fields is None:
fields = self._default_fields
return super().list(
filter=filter, fields=fields, limit=limit, offset=offset, **filter_kwargs
)
[docs]
def get(
self,
key: str | int | None = None,
*,
name: str | None = None,
uuid: str | None = None,
fields: builtins.list[str] | None = None,
) -> ResourceGroup:
"""Get a resource group by key (UUID), name, or UUID.
Args:
key: Resource group key (UUID string).
name: Resource group name.
uuid: Resource group UUID (alias for key).
fields: List of fields to return.
Returns:
ResourceGroup object.
Raises:
NotFoundError: If resource group not found.
ValueError: If no identifier provided.
Example:
>>> # Get by UUID key
>>> rg = client.resource_groups.get("12345678-1234-1234-1234-123456789abc")
>>> # Get by name
>>> rg = client.resource_groups.get(name="GPU Pool")
>>> # Get by uuid parameter (same as key)
>>> rg = client.resource_groups.get(uuid="12345678-1234-1234-1234-123456789abc")
"""
if fields is None:
fields = self._default_fields
# key parameter is actually UUID for resource groups
if key is not None:
key_str = str(key).lower()
results = self.list(filter=f"uuid eq '{key_str}'", fields=fields, limit=1)
if not results:
raise NotFoundError(f"Resource group with UUID '{key}' not found")
return results[0]
if name is not None:
# Search by name
escaped_name = name.replace("'", "''")
results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1)
if not results:
raise NotFoundError(f"Resource group with name '{name}' not found")
return results[0]
if uuid is not None:
# Search by UUID (alias for key)
uuid_lower = uuid.lower()
results = self.list(filter=f"uuid eq '{uuid_lower}'", fields=fields, limit=1)
if not results:
raise NotFoundError(f"Resource group with UUID '{uuid}' not found")
return results[0]
raise ValueError("Either key, name, or uuid must be provided")
[docs]
def list_enabled(
self,
fields: builtins.list[str] | None = None,
) -> builtins.list[ResourceGroup]:
"""List enabled resource groups.
Args:
fields: List of fields to return.
Returns:
List of enabled ResourceGroup objects.
Example:
>>> enabled_groups = client.resource_groups.list_enabled()
>>> for rg in enabled_groups:
... print(f"{rg.name}: {rg.device_type_display}")
"""
return self.list(filter="enabled eq true", fields=fields)
[docs]
def list_disabled(
self,
fields: builtins.list[str] | None = None,
) -> builtins.list[ResourceGroup]:
"""List disabled resource groups.
Args:
fields: List of fields to return.
Returns:
List of disabled ResourceGroup objects.
"""
return self.list(filter="enabled eq false", fields=fields)
[docs]
def list_by_type(
self,
device_type: str,
*,
enabled: bool | None = None,
fields: builtins.list[str] | None = None,
) -> builtins.list[ResourceGroup]:
"""List resource groups by device type.
Args:
device_type: Device type - either display name (PCI, SR-IOV NIC, USB,
Host GPU, NVIDIA vGPU) or API value (node_pci_devices, etc.)
enabled: Filter by enabled status (optional).
fields: List of fields to return.
Returns:
List of ResourceGroup objects matching the device type.
Example:
>>> # List all Host GPU resource groups
>>> gpu_groups = client.resource_groups.list_by_type("Host GPU")
>>> # List enabled PCI resource groups
>>> pci_groups = client.resource_groups.list_by_type("PCI", enabled=True)
>>> # Use API value
>>> usb_groups = client.resource_groups.list_by_type("node_usb_devices")
"""
# Convert display name to API value if needed
api_type = DEVICE_TYPE_REVERSE_MAP.get(device_type, device_type)
filters = [f"type eq '{api_type}'"]
if enabled is not None:
filters.append(f"enabled eq {str(enabled).lower()}")
return self.list(filter=" and ".join(filters), fields=fields)
[docs]
def list_by_class(
self,
device_class: str,
*,
enabled: bool | None = None,
fields: builtins.list[str] | None = None,
) -> builtins.list[ResourceGroup]:
"""List resource groups by device class.
Args:
device_class: Device class - either display name (GPU, vGPU, Storage,
Network, etc.) or API value (gpu, vgpu, storage, network, etc.)
enabled: Filter by enabled status (optional).
fields: List of fields to return.
Returns:
List of ResourceGroup objects matching the device class.
Example:
>>> # List all GPU class resource groups
>>> gpu_groups = client.resource_groups.list_by_class("GPU")
>>> # List enabled Network resource groups
>>> net_groups = client.resource_groups.list_by_class("Network", enabled=True)
>>> # List storage device groups
>>> storage_groups = client.resource_groups.list_by_class("Storage")
"""
# Convert display name to API value if needed
api_class = DEVICE_CLASS_REVERSE_MAP.get(device_class.lower(), device_class.lower())
filters = [f"class eq '{api_class}'"]
if enabled is not None:
filters.append(f"enabled eq {str(enabled).lower()}")
return self.list(filter=" and ".join(filters), fields=fields)
[docs]
def create( # type: ignore[override]
self,
name: str,
device_type: Literal[
"node_pci_devices",
"node_sriov_nic_devices",
"node_usb_devices",
"node_host_gpu_devices",
"node_nvidia_vgpu_devices",
],
*,
description: str = "",
device_class: str = "unknown",
enabled: bool = True,
settings: dict[str, Any] | None = None,
device_keys: builtins.list[int] | None = None,
) -> ResourceGroup:
"""Create a new resource group.
Args:
name: Resource group name.
device_type: Type of devices in this group.
description: Resource group description.
device_class: Device classification for UI icon.
enabled: Whether the resource group is enabled.
settings: Type-specific settings (see create_* methods for details).
device_keys: List of device keys to auto-create rules from.
Returns:
Created ResourceGroup object.
Example:
>>> # Create a PCI device group
>>> group = client.resource_groups.create(
... name="GPU Pool",
... device_type="node_host_gpu_devices",
... device_class="gpu",
... )
"""
body: dict[str, Any] = {
"name": name,
"type": device_type,
"enabled": enabled,
}
if description:
body["description"] = description
if device_class:
# Convert display name to API value if needed
api_class = DEVICE_CLASS_REVERSE_MAP.get(device_class.lower(), device_class.lower())
body["class"] = api_class
if settings:
body["settings_args"] = settings
if device_keys:
body["key_args"] = device_keys
response = self._client._request("POST", self._endpoint, json_data=body)
if response is None:
raise ValueError("No response from create operation")
if not isinstance(response, dict):
raise ValueError("Create operation returned invalid response")
# Fetch the full object by name (POST response doesn't include UUID)
return self.get(name=name)
[docs]
def create_nvidia_vgpu(
self,
name: str,
driver_file: int,
*,
nvidia_vgpu_profile: int | None = None,
make_guest_driver_iso: bool = False,
driver_iso: int | None = None,
scheduler_policy: Literal["1", "2", "3"] = "1",
strict_round_robin: Literal["", "0", "1"] = "",
frequency: int = 0,
averaging_factor: int = 33,
time_slice_length_ns: int = 0,
description: str = "",
enabled: bool = True,
device_keys: builtins.list[int] | None = None,
) -> ResourceGroup:
"""Create an NVIDIA vGPU resource group.
Args:
name: Resource group name.
driver_file: File key for the NVIDIA vGPU driver bundle.
nvidia_vgpu_profile: vGPU profile key to use.
make_guest_driver_iso: Auto-create guest driver ISO from bundle.
driver_iso: Pre-existing guest driver ISO file key.
scheduler_policy: vGPU scheduler policy (1=Best Effort, 2=Equal Share, 3=Fixed Share).
strict_round_robin: Enable strict round robin ("", "0", "1").
frequency: Scheduler frequency (0-960).
averaging_factor: Averaging factor (1-60, default 33).
time_slice_length_ns: Time slice length in nanoseconds (0-30000000).
description: Resource group description.
enabled: Whether the resource group is enabled.
device_keys: List of PCI device keys to auto-create rules from.
Returns:
Created ResourceGroup object.
Example:
>>> group = client.resource_groups.create_nvidia_vgpu(
... name="vGPU A100 Pool",
... driver_file=driver.key,
... nvidia_vgpu_profile=profile.key,
... make_guest_driver_iso=True,
... )
"""
settings: dict[str, Any] = {
"driver_file": driver_file,
"scheduler_policy": scheduler_policy,
"strict_round_robin": strict_round_robin,
"frequency": frequency,
"averaging_factor": averaging_factor,
"time_slice_length_ns": time_slice_length_ns,
}
if nvidia_vgpu_profile is not None:
settings["nvidia_vgpu_profile"] = nvidia_vgpu_profile
if make_guest_driver_iso:
settings["make_guest_driver_iso"] = make_guest_driver_iso
if driver_iso is not None:
settings["driver_iso"] = driver_iso
return self.create(
name=name,
device_type="node_nvidia_vgpu_devices",
description=description,
device_class="vgpu",
enabled=enabled,
settings=settings,
device_keys=device_keys,
)
[docs]
def create_usb(
self,
name: str,
*,
allow_guest_reset: bool = True,
allow_guest_reset_all: bool = False,
device_class: str = "usb",
description: str = "",
enabled: bool = True,
device_keys: builtins.list[int] | None = None,
) -> ResourceGroup:
"""Create a USB device resource group.
Args:
name: Resource group name.
allow_guest_reset: Allow VM to reset the USB device.
allow_guest_reset_all: Allow VM to reset the USB hub.
device_class: Device classification for UI icon.
description: Resource group description.
enabled: Whether the resource group is enabled.
device_keys: List of USB device keys to auto-create rules from.
Returns:
Created ResourceGroup object.
Example:
>>> group = client.resource_groups.create_usb(
... name="USB License Keys",
... device_class="hid",
... device_keys=[usb_device.key],
... )
"""
settings = {
"guest_reset": allow_guest_reset,
"guest_resets_all": allow_guest_reset_all,
}
return self.create(
name=name,
device_type="node_usb_devices",
description=description,
device_class=device_class,
enabled=enabled,
settings=settings,
device_keys=device_keys,
)
[docs]
def create_sriov_nic(
self,
name: str,
*,
vf_count: int = 1,
native_vlan: int = 0,
vlan_qos: int = 0,
vlan_protocol: Literal["802.1Q", "802.1ad"] = "802.1Q",
max_tx_rate: int = 0,
min_tx_rate: int = 0,
trust: Literal["default", "on", "off"] = "off",
spoof_checking: Literal["default", "on", "off"] = "on",
query_rss: Literal["default", "on", "off"] = "default",
virtual_link_state: Literal["default", "auto", "enable", "disable"] = "default",
mac_allow_override: bool = True,
vlan_allow_override: bool = False,
qos_allow_override: bool = False,
proto_allow_override: bool = False,
max_tx_rate_allow_override: bool = False,
min_tx_rate_allow_override: bool = False,
trust_allow_override: bool = False,
spoofchk_allow_override: bool = False,
query_rss_allow_override: bool = False,
state_allow_override: bool = False,
description: str = "",
enabled: bool = True,
device_keys: builtins.list[int] | None = None,
) -> ResourceGroup:
"""Create an SR-IOV NIC resource group.
Args:
name: Resource group name.
vf_count: Number of VFs to create per physical device.
native_vlan: Default VLAN tag (0 disables VLAN tagging).
vlan_qos: VLAN QOS priority (0-7).
vlan_protocol: VLAN protocol (802.1Q or 802.1ad for QinQ).
max_tx_rate: Maximum transmit bandwidth in Mbps (0 disables).
min_tx_rate: Minimum transmit bandwidth in Mbps (0 disables).
trust: Enable VF trust mode for special features.
spoof_checking: Enable MAC spoof checking.
query_rss: Allow querying RSS configuration.
virtual_link_state: Virtual link state (auto mirrors PF state).
mac_allow_override: Allow VMs to override MAC address.
vlan_allow_override: Allow VMs to override VLAN.
qos_allow_override: Allow VMs to override QOS.
proto_allow_override: Allow VMs to override protocol.
max_tx_rate_allow_override: Allow VMs to override max TX rate.
min_tx_rate_allow_override: Allow VMs to override min TX rate.
trust_allow_override: Allow VMs to override trust.
spoofchk_allow_override: Allow VMs to override spoof checking.
query_rss_allow_override: Allow VMs to override query RSS.
state_allow_override: Allow VMs to override link state.
description: Resource group description.
enabled: Whether the resource group is enabled.
device_keys: List of PCI device keys to auto-create rules from.
Returns:
Created ResourceGroup object.
Example:
>>> group = client.resource_groups.create_sriov_nic(
... name="SR-IOV NIC Pool",
... vf_count=8,
... native_vlan=100,
... device_keys=[pci_device.key],
... )
"""
settings = {
"numvfs": vf_count,
"vlan": native_vlan,
"qos": vlan_qos,
"proto": vlan_protocol,
"max_tx_rate": max_tx_rate,
"min_tx_rate": min_tx_rate,
"trust": trust,
"spoofchk": spoof_checking,
"query_rss": query_rss,
"state": virtual_link_state,
"macaddress_allow_override": mac_allow_override,
"vlan_allow_override": vlan_allow_override,
"qos_allow_override": qos_allow_override,
"proto_allow_override": proto_allow_override,
"max_tx_rate_allow_override": max_tx_rate_allow_override,
"min_tx_rate_allow_override": min_tx_rate_allow_override,
"trust_allow_override": trust_allow_override,
"spoofchk_allow_override": spoofchk_allow_override,
"query_rss_allow_override": query_rss_allow_override,
"state_allow_override": state_allow_override,
}
return self.create(
name=name,
device_type="node_sriov_nic_devices",
description=description,
device_class="network",
enabled=enabled,
settings=settings,
device_keys=device_keys,
)
[docs]
def create_pci(
self,
name: str,
*,
device_class: str = "pci",
description: str = "",
enabled: bool = True,
device_keys: builtins.list[int] | None = None,
) -> ResourceGroup:
"""Create a PCI passthrough resource group.
Args:
name: Resource group name.
device_class: Device classification for UI icon (gpu, storage, network, pci, etc.).
description: Resource group description.
enabled: Whether the resource group is enabled.
device_keys: List of PCI device keys to auto-create rules from.
Returns:
Created ResourceGroup object.
Example:
>>> group = client.resource_groups.create_pci(
... name="NVMe Controllers",
... device_class="storage",
... device_keys=[pci_device.key],
... )
"""
return self.create(
name=name,
device_type="node_pci_devices",
description=description,
device_class=device_class,
enabled=enabled,
device_keys=device_keys,
)
[docs]
def create_host_gpu(
self,
name: str,
*,
description: str = "",
enabled: bool = True,
device_keys: builtins.list[int] | None = None,
) -> ResourceGroup:
"""Create a host GPU passthrough resource group.
Args:
name: Resource group name.
description: Resource group description.
enabled: Whether the resource group is enabled.
device_keys: List of GPU device keys to auto-create rules from.
Returns:
Created ResourceGroup object.
Example:
>>> group = client.resource_groups.create_host_gpu(
... name="GPU Passthrough Pool",
... device_keys=[gpu_device.key],
... )
"""
return self.create(
name=name,
device_type="node_host_gpu_devices",
description=description,
device_class="gpu",
enabled=enabled,
device_keys=device_keys,
)
[docs]
def update(self, key: str, **kwargs: Any) -> ResourceGroup: # type: ignore[override]
"""Update a resource group.
Args:
key: Resource group UUID.
**kwargs: Fields to update. Common fields:
- name: Resource group name
- description: Description
- enabled: Whether enabled
- settings_args: Type-specific settings dict
Returns:
Updated ResourceGroup object.
Example:
>>> group = client.resource_groups.update(
... group.key,
... name="Renamed Pool",
... enabled=False,
... )
"""
response = self._client._request("PUT", f"{self._endpoint}/{key}", json_data=kwargs)
if response is None:
return self.get(key)
if not isinstance(response, dict):
return self.get(key)
return self._to_model(response)
[docs]
def delete(self, key: str) -> None: # type: ignore[override]
"""Delete a resource group.
Args:
key: Resource group UUID.
Raises:
ConflictError: If there are machine devices using this group.
Note:
All machine devices using this resource group must be removed first.
"""
self._client._request("DELETE", f"{self._endpoint}/{key}")
@property
def rules(self) -> ResourceRuleManager:
"""Access resource rules globally (unscoped).
Returns:
ResourceRuleManager for all resource rules.
Example:
>>> # List all resource rules
>>> rules = client.resource_groups.rules.list()
"""
return ResourceRuleManager(self._client)
# =============================================================================
# Resource Rules
# =============================================================================
[docs]
class ResourceRule(ResourceObject):
"""Resource rule that defines which devices belong to a resource group.
Resource rules use filter expressions to match devices by attributes
such as vendor, slot, serial number, etc.
"""
@property
def resource_group_key(self) -> int | None:
"""Parent resource group key."""
rg = self.get("resource_group")
return int(rg) if rg else None
@property
def resource_group_name(self) -> str:
"""Parent resource group name."""
return str(self.get("resource_group_display", ""))
@property
def name(self) -> str:
"""Rule name."""
return str(self.get("name", ""))
@property
def is_enabled(self) -> bool:
"""Whether the rule is enabled."""
return bool(self.get("enabled", True))
@property
def device_type(self) -> str:
"""Device type (raw API value)."""
return str(self.get("type", ""))
@property
def device_type_display(self) -> str:
"""Human-readable device type."""
type_display = self.get("type_display")
if type_display:
return str(type_display)
return DEVICE_TYPE_MAP.get(self.device_type, self.device_type)
@property
def node_key(self) -> int | None:
"""Node filter (None = all nodes)."""
node = self.get("node")
return int(node) if node else None
@property
def node_name(self) -> str:
"""Node name filter."""
return str(self.get("node_display", ""))
@property
def filter_expression(self) -> str:
"""OData-style filter expression for matching devices."""
return str(self.get("filter", ""))
@property
def filter_configuration(self) -> dict[str, Any]:
"""Structured filter configuration."""
config = self.get("filter_configuration")
return config if isinstance(config, dict) else {}
@property
def resource_count(self) -> int:
"""Number of devices matched by this rule."""
return int(self.get("resource_count", 0))
@property
def is_system_created(self) -> bool:
"""Whether this rule was auto-generated by the system."""
return bool(self.get("system_created", False))
@property
def modified_at(self) -> datetime | None:
"""Last modification timestamp."""
ts = self.get("modified")
if ts:
return datetime.fromtimestamp(int(ts), tz=timezone.utc)
return None
[docs]
def refresh(self) -> ResourceRule:
"""Refresh this rule's data from the server."""
from typing import cast
manager = cast("ResourceRuleManager", self._manager)
return manager.get(self.key)
[docs]
def save(self, **kwargs: Any) -> ResourceRule:
"""Update this rule with the given values."""
from typing import cast
manager = cast("ResourceRuleManager", self._manager)
return manager.update(self.key, **kwargs)
[docs]
def delete(self) -> None:
"""Delete this rule."""
from typing import cast
manager = cast("ResourceRuleManager", self._manager)
manager.delete(self.key)
def __repr__(self) -> str:
return (
f"<ResourceRule key={self.get('$key')} name={self.name!r} "
f"type={self.device_type_display!r} resources={self.resource_count}>"
)
[docs]
class ResourceRuleManager(ResourceManager[ResourceRule]):
"""Manages resource rules for device passthrough.
Resource rules define which physical devices belong to a resource group
using filter expressions that match device attributes.
Example:
>>> # List all rules for a resource group
>>> rules = resource_group.rules.list()
>>> # Create a rule matching devices by vendor
>>> rule = resource_group.rules.create(
... name="Intel NICs",
... filter="vendor ct 'Intel'",
... )
>>> # Create a rule for a specific PCI slot
>>> rule = resource_group.rules.create(
... name="GPU in slot 3",
... filter="slot eq '03:00.0'",
... node=node.key,
... )
"""
_endpoint = "resource_rules"
_default_fields = [
"$key",
"resource_group",
"display(resource_group) as resource_group_display",
"name",
"enabled",
"type",
"display(type) as type_display",
"node",
"display(node) as node_display",
"filter",
"filter_configuration",
"count(resources) as resource_count",
"system_created",
"modified",
]
[docs]
def __init__(self, client: VergeClient, resource_group_key: str | int | None = None) -> None:
super().__init__(client)
self._resource_group_key = resource_group_key
def _to_model(self, data: dict[str, Any]) -> ResourceRule:
return ResourceRule(data, self)
[docs]
def list( # noqa: A003
self,
filter: str | None = None, # noqa: A002
fields: builtins.list[str] | None = None,
limit: int | None = None,
offset: int | None = None,
*,
node_key: int | None = None,
enabled_only: bool = False,
**filter_kwargs: Any,
) -> builtins.list[ResourceRule]:
"""List resource rules.
Args:
filter: OData filter string.
fields: List of fields to return.
limit: Maximum number of results.
offset: Skip this many results.
node_key: Filter by node.
enabled_only: Only return enabled rules.
**filter_kwargs: Shorthand filter arguments.
Returns:
List of ResourceRule objects.
"""
if fields is None:
fields = self._default_fields
filters = []
if filter:
filters.append(filter)
if self._resource_group_key is not None:
# Resource groups use UUID strings, which need quotes in filters
rg_key = str(self._resource_group_key)
if "-" in rg_key: # UUID format needs quotes
filters.append(f"resource_group eq '{rg_key}'")
else: # Integer key (for backwards compatibility)
filters.append(f"resource_group eq {rg_key}")
if node_key is not None:
filters.append(f"node eq {node_key}")
if enabled_only:
filters.append("enabled eq true")
if filter_kwargs:
from pyvergeos.filters import build_filter
filters.append(build_filter(**filter_kwargs))
params: dict[str, Any] = {"fields": ",".join(fields)}
if filters:
params["filter"] = " and ".join(filters)
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
response = self._client._request("GET", self._endpoint, params=params)
if response is None:
return []
if isinstance(response, list):
return [self._to_model(item) for item in response]
return [self._to_model(response)]
[docs]
def get(
self,
key: int | None = None,
*,
name: str | None = None,
fields: builtins.list[str] | None = None,
) -> ResourceRule:
"""Get a resource rule by key or name.
Args:
key: Rule $key (ID).
name: Rule name.
fields: List of fields to return.
Returns:
ResourceRule object.
Raises:
NotFoundError: If rule not found.
ValueError: If no identifier provided.
"""
if fields is None:
fields = self._default_fields
if key is not None:
params: dict[str, Any] = {"fields": ",".join(fields)}
response = self._client._request("GET", f"{self._endpoint}/{key}", params=params)
if response is None:
raise NotFoundError(f"Resource rule with key {key} not found")
if not isinstance(response, dict):
raise NotFoundError(f"Resource rule with key {key} returned invalid response")
return self._to_model(response)
if name is not None:
escaped_name = name.replace("'", "''")
results = self.list(filter=f"name eq '{escaped_name}'", fields=fields, limit=1)
if not results:
raise NotFoundError(f"Resource rule with name '{name}' not found")
return results[0]
raise ValueError("Either key or name must be provided")
[docs]
def create( # type: ignore[override]
self,
name: str,
filter_expression: str,
*,
node: int | None = None,
enabled: bool = True,
auto_create_from_device: int | None = None,
) -> ResourceRule:
"""Create a new resource rule.
Args:
name: Rule name.
filter_expression: OData-style filter for matching devices.
Examples: "vendor ct 'Intel'", "slot eq '03:00.0'",
"vendor_device_hex eq '8086:8c31'"
node: Restrict to a specific node (None = all nodes).
enabled: Whether the rule is enabled.
auto_create_from_device: Device key to auto-generate filter from.
Returns:
Created ResourceRule object.
Raises:
ValueError: If not scoped to a resource group.
Example:
>>> rule = group.rules.create(
... name="Intel X710 NICs",
... filter_expression="vendor ct 'Intel' and device ct 'X710'",
... )
"""
if self._resource_group_key is None:
raise ValueError(
"Must use a scoped ResourceRuleManager to create rules. "
"Access via resource_group.rules.create()"
)
body: dict[str, Any] = {
"resource_group": self._resource_group_key,
"name": name,
"filter": filter_expression,
"enabled": enabled,
}
if node is not None:
body["node"] = node
if auto_create_from_device is not None:
body["auto_create_based_on"] = auto_create_from_device
response = self._client._request("POST", self._endpoint, json_data=body)
if response is None:
raise ValueError("No response from create operation")
if not isinstance(response, dict):
raise ValueError("Create operation returned invalid response")
rule = self._to_model(response)
return self.get(rule.key)
[docs]
def update(self, key: int, **kwargs: Any) -> ResourceRule:
"""Update a resource rule.
Args:
key: Rule $key (ID).
**kwargs: Fields to update. Common fields:
- name: Rule name
- filter: Filter expression
- node: Node filter (None = all nodes)
- enabled: Whether enabled
Returns:
Updated ResourceRule object.
"""
response = self._client._request("PUT", f"{self._endpoint}/{key}", json_data=kwargs)
if response is None:
return self.get(key)
if not isinstance(response, dict):
return self.get(key)
return self._to_model(response)
[docs]
def delete(self, key: int) -> None:
"""Delete a resource rule.
Args:
key: Rule $key (ID).
"""
self._client._request("DELETE", f"{self._endpoint}/{key}")