"""Permission resource manager for VergeOS permissions."""
from __future__ import annotations

import builtins
from typing import TYPE_CHECKING, Any
from typing import cast

from pyvergeos.exceptions import NotFoundError
from pyvergeos.filters import build_filter
from pyvergeos.resources.base import ResourceManager, ResourceObject

if TYPE_CHECKING:
    from pyvergeos.client import VergeClient
    from pyvergeos.resources.groups import Group
    from pyvergeos.resources.users import User
class Permission(ResourceObject):
    """Permission resource object.

    Represents a permission grant in VergeOS that controls access to
    resources for users or groups.

    Attributes:
        key: Permission primary key ($key).
        identity_key: Identity key (links to user or group identity).
        identity_name: Display name of the identity (user/group name).
        table: Resource table name (e.g., 'vms', 'vnets', '/').
        row_key: Specific row key (0 for table-level access).
        row_display: Display name of the row (if row-specific).
        is_table_level: Whether this is table-level (all rows) permission.
        can_list: Whether list/see permission is granted.
        can_read: Whether read permission is granted.
        can_create: Whether create permission is granted.
        can_modify: Whether modify permission is granted.
        can_delete: Whether delete permission is granted.
        has_full_control: Whether all five permissions above are granted.
    """

    @property
    def identity_key(self) -> int:
        """Get the identity key (0 when the field is missing or null)."""
        # ``or 0`` also covers a field that is present but null, which
        # ``int()`` would otherwise reject with a TypeError.
        return int(self.get("identity") or 0)

    @property
    def identity_name(self) -> str | None:
        """Get the identity display name (user/group name)."""
        return self.get("identity_display")

    @property
    def table(self) -> str:
        """Get the resource table name ('' when the field is missing)."""
        return str(self.get("table", ""))

    @property
    def row_key(self) -> int:
        """Get the row key (0 for table-level access, or when missing/null)."""
        # Same null-safe coercion as ``identity_key``.
        return int(self.get("row") or 0)

    @property
    def row_display(self) -> str | None:
        """Get the row display name (if row-specific)."""
        return self.get("rowdisplay")

    @property
    def is_table_level(self) -> bool:
        """Check if this is a table-level permission (applies to all rows)."""
        return self.row_key == 0

    @property
    def can_list(self) -> bool:
        """Check if list permission is granted."""
        return bool(self.get("list", False))

    @property
    def can_read(self) -> bool:
        """Check if read permission is granted."""
        return bool(self.get("read", False))

    @property
    def can_create(self) -> bool:
        """Check if create permission is granted."""
        return bool(self.get("create", False))

    @property
    def can_modify(self) -> bool:
        """Check if modify permission is granted."""
        return bool(self.get("modify", False))

    @property
    def can_delete(self) -> bool:
        """Check if delete permission is granted."""
        return bool(self.get("delete", False))

    @property
    def has_full_control(self) -> bool:
        """Check if all permissions are granted (full control)."""
        return (
            self.can_list
            and self.can_read
            and self.can_create
            and self.can_modify
            and self.can_delete
        )

    def revoke(self) -> None:
        """Revoke this permission.

        Delegates to the owning manager's ``revoke`` with this object's key.

        Raises:
            ValueError: If permission key is not available (presumably raised
                by the inherited ``key`` accessor; not visible in this module).
        """
        # ``_manager`` is typed as the generic base manager; narrow it for
        # type checkers only. ``cast`` has no runtime effect.
        manager = cast("PermissionManager", self._manager)
        manager.revoke(self.key)
class PermissionManager(ResourceManager[Permission]):
    """Manager for VergeOS permission operations.

    Provides operations to list, grant, and revoke permissions for users
    and groups on resources.

    Example:
        >>> # List all permissions for a user
        >>> perms = client.permissions.list(user=user.key)

        >>> # List permissions for a group on VMs
        >>> perms = client.permissions.list(group=group.key, table="vms")

        >>> # Grant read-only access to VMs
        >>> client.permissions.grant(
        ...     user=user.key,
        ...     table="vms",
        ...     can_list=True,
        ...     can_read=True
        ... )

        >>> # Grant full control to a group
        >>> client.permissions.grant(
        ...     group=group.key,
        ...     table="vms",
        ...     full_control=True
        ... )

        >>> # Revoke a permission
        >>> client.permissions.revoke(permission.key)
    """

    _endpoint = "permissions"

    # Default fields for list operations (matches PowerShell module)
    _default_fields = [
        "$key",
        "identity",
        "identity#owner#$display as identity_display",
        "table",
        "rowdisplay",
        "row",
        "list",
        "read",
        "create",
        "modify",
        "delete",
    ]

    def __init__(self, client: VergeClient) -> None:
        """Create a permission manager bound to ``client``."""
        super().__init__(client)

    def _to_model(self, data: dict[str, Any]) -> Permission:
        """Convert API response to Permission object."""
        return Permission(data, self)

    def _fields_param(self, fields: builtins.list[str] | None) -> str:
        """Build the comma-separated ``fields`` query value.

        Falls back to ``_default_fields`` when ``fields`` is None or empty.
        """
        return ",".join(fields or self._default_fields)

    def _resolve_identity(
        self,
        user: int | User | None = None,
        group: int | Group | None = None,
        identity_key: int | None = None,
    ) -> int | None:
        """Resolve user/group to identity key.

        Precedence is ``identity_key``, then ``user``, then ``group``; the
        first one that is not None decides the result.

        Args:
            user: User key or User object.
            group: Group key or Group object.
            identity_key: Direct identity key.

        Returns:
            Identity key or None if not resolved. None is also possible when
            the resolved user/group object's ``identity`` attribute is None.
        """
        if identity_key is not None:
            return identity_key

        if user is not None:
            # A User object carries its identity; a bare key needs a lookup.
            if hasattr(user, "identity"):
                return user.identity
            return self._client.users.get(int(user)).identity

        if group is not None:
            # A Group object carries its identity; a bare key needs a lookup.
            if hasattr(group, "identity"):
                return group.identity
            return self._client.groups.get(int(group)).identity

        return None

    def list(
        self,
        filter: str | None = None,
        fields: builtins.list[str] | None = None,
        limit: int | None = None,
        offset: int | None = None,
        user: int | User | None = None,
        group: int | Group | None = None,
        identity_key: int | None = None,
        table: str | None = None,
        **filter_kwargs: Any,
    ) -> builtins.list[Permission]:
        """List permissions with optional filtering.

        All supplied filters are combined with ``and``.

        Note:
            If ``user``/``group`` is supplied but its identity resolves to
            None, no identity filter is applied and the result is not
            restricted to that user/group.

        Args:
            filter: OData filter string.
            fields: List of fields to return (uses defaults if not specified).
            limit: Maximum number of results.
            offset: Skip this many results.
            user: User key or User object to filter by.
            group: Group key or Group object to filter by.
            identity_key: Identity key to filter by directly.
            table: Resource table name to filter by (e.g., 'vms', 'vnets').
            **filter_kwargs: Shorthand filter arguments.

        Returns:
            List of Permission objects.

        Example:
            >>> # List all permissions
            >>> perms = client.permissions.list()

            >>> # List permissions for a user
            >>> perms = client.permissions.list(user=user.key)

            >>> # List permissions for a group on VMs
            >>> perms = client.permissions.list(group=group.key, table="vms")

            >>> # List permissions by identity key directly
            >>> perms = client.permissions.list(identity_key=123)
        """
        clauses: builtins.list[str] = []
        if filter:
            clauses.append(filter)
        if filter_kwargs:
            clauses.append(build_filter(**filter_kwargs))

        resolved_identity = self._resolve_identity(user, group, identity_key)
        if resolved_identity is not None:
            clauses.append(f"identity eq {resolved_identity}")

        if table is not None:
            # Double embedded single quotes so the table name cannot
            # terminate the quoted filter literal.
            escaped_table = table.replace("'", "''")
            clauses.append(f"table eq '{escaped_table}'")

        params: dict[str, Any] = {}
        if clauses:
            params["filter"] = " and ".join(clauses)
        params["fields"] = self._fields_param(fields)
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset

        response = self._client._request("GET", self._endpoint, params=params)
        if response is None:
            return []
        # A non-list response is treated as a single item.
        if not isinstance(response, list):
            response = [response]
        return [self._to_model(item) for item in response if item]

    def get(  # type: ignore[override]
        self,
        key: int,
        *,
        fields: builtins.list[str] | None = None,
    ) -> Permission:
        """Get a single permission by key.

        Args:
            key: Permission $key (ID).
            fields: List of fields to return (uses defaults if not specified).

        Returns:
            Permission object.

        Raises:
            NotFoundError: If the request returns nothing or a non-dict
                response.

        Example:
            >>> perm = client.permissions.get(123)
        """
        params: dict[str, Any] = {"fields": self._fields_param(fields)}
        response = self._client._request(
            "GET", f"{self._endpoint}/{key}", params=params
        )
        if response is None:
            raise NotFoundError(f"Permission with key {key} not found")
        if not isinstance(response, dict):
            raise NotFoundError(
                f"Permission with key {key} returned invalid response"
            )
        return self._to_model(response)

    def grant(
        self,
        table: str,
        *,
        user: int | User | None = None,
        group: int | Group | None = None,
        identity_key: int | None = None,
        row_key: int = 0,
        can_list: bool = True,
        can_read: bool = False,
        can_create: bool = False,
        can_modify: bool = False,
        can_delete: bool = False,
        full_control: bool = False,
    ) -> Permission:
        """Grant permission to a user or group.

        Args:
            table: Resource table to grant access to (e.g., 'vms', 'vnets', '/').
            user: User key or User object to grant to.
            group: Group key or Group object to grant to.
            identity_key: Identity key to grant to directly.
            row_key: Specific row key (0 for table-level access to all rows).
            can_list: Grant permission to list/see items. Default True.
            can_read: Grant permission to read item details. Default False.
            can_create: Grant permission to create new items. Default False.
            can_modify: Grant permission to modify existing items. Default False.
            can_delete: Grant permission to delete items. Default False.
            full_control: Grant all permissions (overrides individual flags).

        Returns:
            Created Permission object.

        Raises:
            ValueError: If no user, group, or identity_key provided (or the
                identity could not be resolved).
            NotFoundError: If the created permission cannot be read back.
            ConflictError: If permission already exists (raised by the client
                layer, not in this module).

        Example:
            >>> # Grant read-only access to VMs for a user
            >>> perm = client.permissions.grant(
            ...     table="vms",
            ...     user=user.key,
            ...     can_list=True,
            ...     can_read=True
            ... )

            >>> # Grant full control to a group
            >>> perm = client.permissions.grant(
            ...     table="vms",
            ...     group=group.key,
            ...     full_control=True
            ... )

            >>> # Grant root access (all resources)
            >>> perm = client.permissions.grant(
            ...     table="/",
            ...     user=user.key,
            ...     can_list=True,
            ...     can_read=True
            ... )

            >>> # Grant access to a specific VM only
            >>> perm = client.permissions.grant(
            ...     table="vms",
            ...     user=user.key,
            ...     row_key=vm.key,
            ...     full_control=True
            ... )
        """
        resolved_identity = self._resolve_identity(user, group, identity_key)
        if resolved_identity is None:
            raise ValueError("Either user, group, or identity_key must be provided")

        requested_flags = {
            "list": can_list,
            "read": can_read,
            "create": can_create,
            "modify": can_modify,
            "delete": can_delete,
        }
        body: dict[str, Any] = {
            "identity": resolved_identity,
            "table": table,
            "row": row_key,
        }
        # full_control wins over the individual flags.
        for flag_name, flag_value in requested_flags.items():
            body[flag_name] = True if full_control else flag_value

        response = self._client._request("POST", self._endpoint, json_data=body)

        # Preferred path: re-read the permission by the key the API returned.
        if response and isinstance(response, dict):
            perm_key = response.get("$key")
            if perm_key:
                return self.get(key=int(perm_key))

        # Fallback: search by identity and table.
        perms = self.list(identity_key=resolved_identity, table=table)
        if perms:
            # Best effort: prefer the exact row match, otherwise hand back
            # the first permission found for this identity and table.
            return next((p for p in perms if p.row_key == row_key), perms[0])

        raise NotFoundError("Failed to retrieve created permission")

    def revoke(self, key: int) -> None:
        """Revoke (delete) a permission.

        Args:
            key: Permission $key (ID) to revoke.

        Example:
            >>> client.permissions.revoke(perm.key)
        """
        self._client._request("DELETE", f"{self._endpoint}/{key}")

    def _revoke_for_identity(
        self,
        identity: int | None,
        kind: str,
        table: str | None,
    ) -> int:
        """Revoke every permission held by ``identity``; return the count.

        Refuses to run without an identity: ``list`` would then apply no
        identity filter and the loop would delete permissions belonging to
        everyone.
        """
        if identity is None:
            raise ValueError(f"Could not resolve an identity for the given {kind}")
        perms = self.list(identity_key=identity, table=table)
        for perm in perms:
            self.revoke(perm.key)
        return len(perms)

    def revoke_for_user(
        self,
        user: int | User,
        table: str | None = None,
    ) -> int:
        """Revoke all permissions for a user, optionally filtered by table.

        Args:
            user: User key or User object.
            table: Optional table name to filter by.

        Returns:
            Number of permissions revoked.

        Raises:
            ValueError: If the user's identity cannot be resolved.

        Example:
            >>> # Revoke all permissions for user
            >>> count = client.permissions.revoke_for_user(user.key)

            >>> # Revoke only VM permissions for user
            >>> count = client.permissions.revoke_for_user(user.key, table="vms")
        """
        identity = self._resolve_identity(user=user)
        return self._revoke_for_identity(identity, "user", table)

    def revoke_for_group(
        self,
        group: int | Group,
        table: str | None = None,
    ) -> int:
        """Revoke all permissions for a group, optionally filtered by table.

        Args:
            group: Group key or Group object.
            table: Optional table name to filter by.

        Returns:
            Number of permissions revoked.

        Raises:
            ValueError: If the group's identity cannot be resolved.

        Example:
            >>> # Revoke all permissions for group
            >>> count = client.permissions.revoke_for_group(group.key)

            >>> # Revoke only network permissions for group
            >>> count = client.permissions.revoke_for_group(group.key, table="vnets")
        """
        identity = self._resolve_identity(group=group)
        return self._revoke_for_identity(identity, "group", table)