"""API for working with the clients.

Since we have a frontend and a REST API, it makes sense to have a generic
library to work with the clients.
"""

import logging
from collections.abc import Iterator
from contextlib import contextmanager

from sshecret.backend import (
    AuditLog,
    AuditListResult,
    Client,
    ClientFilter,
    SshecretBackend,
    Operation,
    SubSystem,
)
from sshecret.backend.models import DetailedSecrets, Secret
from sshecret.backend.api import AuditAPI, KeySpec
from sshecret.crypto import encrypt_string, load_public_key
from .keepass import PasswordContext, load_password_manager
from sshecret_admin.core.settings import AdminServerSettings
from .models import (
    ClientSecretGroup,
    ClientSecretGroupList,
    SecretClientMapping,
    SecretGroup,
    SecretView,
)


class ClientManagementError(Exception):
    """Base exception for client management operations."""


class ClientNotFoundError(ClientManagementError):
    """Client not found."""


class SecretNotFoundError(ClientManagementError):
    """Secret not found."""


class BackendUnavailableError(ClientManagementError):
    """Backend unavailable."""


LOG = logging.getLogger(__name__)


def add_clients_to_secret_group(
    group: SecretGroup,
    client_secret_mapping: dict[str, DetailedSecrets],
    parent: ClientSecretGroup | None = None,
) -> ClientSecretGroup:
    """Add client information to a secret group.

    Builds a ClientSecretGroup mirroring ``group``: every entry becomes a
    SecretClientMapping (with clients filled in when the entry name is found
    in ``client_secret_mapping``), and every child group is converted
    recursively with the new group passed as its parent.

    Args:
        group: The secret group to convert.
        client_secret_mapping: Secret details keyed by secret name.
        parent: The already converted parent group, if any.

    Returns:
        The converted group including entries and children.
    """
    client_secret_group = ClientSecretGroup(
        group_name=group.name,
        path=group.path,
        description=group.description,
        parent_group=parent,
    )
    for entry in group.entries:
        secret_entries = SecretClientMapping(name=entry)
        if details := client_secret_mapping.get(entry):
            secret_entries.clients = details.clients
        client_secret_group.entries.append(secret_entries)

    for subgroup in group.children:
        client_secret_group.children.append(
            add_clients_to_secret_group(
                subgroup, client_secret_mapping, client_secret_group
            )
        )

    # We'll save a bit of memory and complexity by just adding the name of the
    # parent, if available.
    if not parent and group.parent_group:
        client_secret_group.parent_group = ClientSecretGroup(
            group_name=group.parent_group.name,
            path=group.parent_group.path,
        )

    return client_secret_group


class AdminBackend:
    """Admin backend API.

    Public methods wrap a private ``_``-prefixed implementation and translate
    any exception that is not a ClientManagementError into
    BackendUnavailableError (chained to the original exception).
    """

    def __init__(self, settings: AdminServerSettings, keepass_password: str) -> None:
        """Create client management API.

        Args:
            settings: Admin server settings; ``backend_url`` and
                ``backend_token`` are used to construct the backend client.
            keepass_password: Password handed to ``load_password_manager``.
        """
        self.settings: AdminServerSettings = settings
        self.backend: SshecretBackend = SshecretBackend(
            str(settings.backend_url), settings.backend_token
        )
        self.keepass_password: str = keepass_password

    @contextmanager
    def password_manager(self) -> Iterator[PasswordContext]:
        """Open the password manager.

        Yields:
            The context produced by ``load_password_manager``.
        """
        with load_password_manager(self.settings, self.keepass_password) as kp:
            yield kp

    async def _get_clients(self, filter: ClientFilter | None = None) -> list[Client]:
        """Get clients from backend."""
        return await self.backend.get_clients(filter)

    async def get_clients(self, filter: ClientFilter | None = None) -> list[Client]:
        """Get clients from backend.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._get_clients(filter)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _get_client(self, idname: KeySpec) -> Client | None:
        """Get a client from the backend."""
        return await self.backend.get_client(idname)

    async def _verify_client_exists(self, idname: KeySpec) -> None:
        """Check that a client exists.

        Raises:
            ClientNotFoundError: If the backend returns no client.
        """
        client = await self.backend.get_client(idname)
        if not client:
            raise ClientNotFoundError()

    async def verify_client_exists(self, name: str) -> None:
        """Check that a client exists.

        Raises:
            ClientNotFoundError: If the backend returns no client.
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            await self._verify_client_exists(name)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def get_client(self, name: KeySpec) -> Client | None:
        """Get a client from the backend.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._get_client(name)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _create_client(
        self,
        name: str,
        public_key: str,
        description: str | None = None,
        sources: list[str] | None = None,
    ) -> Client:
        """Create client.

        Creates the client, optionally sets its sources, then reads it back.

        Raises:
            ClientNotFoundError: If the client cannot be read back.
        """
        await self.backend.create_client(name, public_key, description)
        if sources:
            await self.backend.update_client_sources(name, sources)

        client = await self.get_client(name)
        if not client:
            raise ClientNotFoundError()
        return client

    async def create_client(
        self,
        name: str,
        public_key: str,
        description: str | None = None,
        sources: list[str] | None = None,
    ) -> Client:
        """Create client.

        Raises:
            ClientNotFoundError: If the client cannot be read back.
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._create_client(name, public_key, description, sources)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _update_client_public_key(
        self,
        name: KeySpec,
        new_key: str,
        password_manager: PasswordContext,
    ) -> list[str]:
        """Update client public key.

        Stores the new key in the backend and re-encrypts every secret the
        client references, using the plaintext from the password manager.

        Args:
            name: Client identifier.
            new_key: The new public key.
            password_manager: An open password manager context.

        Returns:
            Names of the secrets that were re-encrypted.

        Raises:
            ClientNotFoundError: If the client does not exist.
        """
        LOG.info(
            "Updating client %s public key. This will invalidate all existing secrets.",
            name,
        )
        client = await self.get_client(name)
        if not client:
            raise ClientNotFoundError()

        await self.backend.update_client_key(name, new_key)

        updated_secrets: list[str] = []
        if not client.secrets:
            return updated_secrets

        # `client` was fetched before the key update, so its public_key may
        # still be the old key. Encrypt with the key that was just installed.
        rsa_public_key = load_public_key(new_key.encode())
        for secret in client.secrets:
            LOG.debug("Re-encrypting secret %s for client %s", secret, name)
            secret_value = password_manager.get_secret(secret)
            if not secret_value:
                # Log the secret's name; the looked-up value is empty here.
                LOG.warning("Referenced secret %s does not exist! Skipping.", secret)
                continue
            encrypted = encrypt_string(secret_value, rsa_public_key)
            LOG.debug("Sending new encrypted value to backend.")
            await self.backend.create_client_secret(name, secret, encrypted)
            updated_secrets.append(secret)

        return updated_secrets

    async def update_client_public_key(self, name: KeySpec, new_key: str) -> list[str]:
        """Update client public key.

        Returns:
            Names of the secrets that were re-encrypted.

        Raises:
            ClientNotFoundError: If the client does not exist.
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            with self.password_manager() as password_manager:
                return await self._update_client_public_key(
                    name, new_key, password_manager
                )
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _update_client(self, new_client: Client) -> Client:
        """Update a client object.

        If the public key differs from the stored one, the client's secrets
        are re-encrypted via ``update_client_public_key``.

        Raises:
            ClientNotFoundError: If the client is missing before or after the
                update.
        """
        existing_client = await self.get_client(new_client.name)
        if not existing_client:
            raise ClientNotFoundError()

        await self.backend.update_client(new_client)
        if new_client.public_key != existing_client.public_key:
            await self.update_client_public_key(new_client.name, new_client.public_key)

        updated_client = await self.get_client(new_client.name)
        if not updated_client:
            raise ClientNotFoundError()
        return updated_client

    async def update_client(self, new_client: Client) -> Client:
        """Update a client object.

        Raises:
            ClientNotFoundError: If the client is missing before or after the
                update.
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._update_client(new_client)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def update_client_sources(self, name: KeySpec, sources: list[str]) -> None:
        """Update client sources.

        Raises:
            BackendUnavailableError: On any exception from the backend call.
        """
        try:
            await self.backend.update_client_sources(name, sources)
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _delete_client(self, name: KeySpec) -> None:
        """Delete client."""
        await self.backend.delete_client(name)

    async def delete_client(self, name: KeySpec) -> None:
        """Delete client.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            await self._delete_client(name)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def delete_client_secret(
        self, client_name: KeySpec, secret_name: KeySpec
    ) -> None:
        """Delete a secret from a client.

        Raises:
            BackendUnavailableError: On any exception from the backend call.
        """
        try:
            await self.backend.delete_client_secret(client_name, secret_name)
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _get_secrets(self) -> list[Secret]:
        """Get secrets.

        This fetches the secret to client mapping from backend, and adds
        secrets from the password manager that the backend does not know,
        each with an empty client list.
        """
        with self.password_manager() as password_manager:
            all_secrets = password_manager.get_available_secrets()

        secrets = await self.backend.get_secrets()
        # Set for O(1) membership tests; built once from the backend result.
        backend_secret_names = {secret.name for secret in secrets}
        for secret in all_secrets:
            if secret not in backend_secret_names:
                secrets.append(Secret(name=secret, clients=[]))

        return secrets

    async def get_secrets(self) -> list[Secret]:
        """Get secrets from backend.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._get_secrets()
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _get_detailed_secrets(self) -> list[DetailedSecrets]:
        """Get detailed secrets.

        This fetches the secret to client mapping from backend, and adds
        secrets from the password manager that the backend does not know,
        each with empty id and client lists.
        """
        with self.password_manager() as password_manager:
            all_secrets = password_manager.get_available_secrets()

        secrets = await self.backend.get_detailed_secrets()
        # Set for O(1) membership tests; built once from the backend result.
        backend_secret_names = {secret.name for secret in secrets}
        for secret in all_secrets:
            if secret not in backend_secret_names:
                secrets.append(DetailedSecrets(name=secret, ids=[], clients=[]))

        return secrets

    async def get_detailed_secrets(self) -> list[DetailedSecrets]:
        """Get detailed secrets from backend.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._get_detailed_secrets()
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def add_secret_group(
        self,
        group_name: str,
        description: str | None = None,
        parent_group: str | None = None,
    ) -> None:
        """Add secret group."""
        with self.password_manager() as password_manager:
            password_manager.add_group(group_name, description, parent_group)

    async def set_secret_group(self, secret_name: str, group_name: str | None) -> None:
        """Assign a group to a secret."""
        with self.password_manager() as password_manager:
            password_manager.set_secret_group(secret_name, group_name)

    async def move_secret_group(
        self, group_name: str, parent_group: str | None
    ) -> None:
        """Move a group.

        If parent_group is None, it will be moved to the root.
        """
        with self.password_manager() as password_manager:
            password_manager.move_group(group_name, parent_group)

    async def set_group_description(self, group_name: str, description: str) -> None:
        """Set a group description."""
        with self.password_manager() as password_manager:
            password_manager.set_group_description(group_name, description)

    async def delete_secret_group(
        self, group_name: str, keep_entries: bool = True
    ) -> None:
        """Delete a group.

        If keep_entries is set to False, all entries in the group will be
        deleted.
        """
        with self.password_manager() as password_manager:
            password_manager.delete_group(group_name, keep_entries)

    async def get_secret_groups(
        self,
        group_filter: str | None = None,
        regex: bool = True,
        flat: bool = False,
    ) -> ClientSecretGroupList:
        """Get secret groups.

        The starting group can be filtered with the group_filter argument,
        which may be a regular expression.

        Groups are returned in a tree, unless flat is True. Ungrouped secrets
        are only included in the result when no group_filter is given.
        """
        all_secrets = await self.backend.get_detailed_secrets()
        secrets_mapping = {secret.name: secret for secret in all_secrets}

        with self.password_manager() as password_manager:
            if flat:
                all_groups = password_manager.get_secret_group_list(
                    group_filter, regex=regex
                )
            else:
                all_groups = password_manager.get_secret_groups(
                    group_filter, regex=regex
                )
            ungrouped = password_manager.get_ungrouped_secrets()

        # Each group is converted recursively, children included.
        group_result: list[ClientSecretGroup] = [
            add_clients_to_secret_group(group, secrets_mapping)
            for group in all_groups
        ]

        result = ClientSecretGroupList(groups=group_result)
        if group_filter:
            return result

        ungrouped_clients: list[SecretClientMapping] = []
        for name in ungrouped:
            mapping = SecretClientMapping(name=name)
            if client_mapping := secrets_mapping.get(name):
                mapping.clients = client_mapping.clients
            ungrouped_clients.append(mapping)

        result.ungrouped = ungrouped_clients
        return result

    async def get_secret_group(self, name: str) -> ClientSecretGroup | None:
        """Get a single secret group by name.

        Returns:
            The first group matching ``name`` (non-regex match), or None.
        """
        matches = await self.get_secret_groups(group_filter=name, regex=False)
        if matches.groups:
            return matches.groups[0]
        return None

    async def get_secret(self, name: str) -> SecretView | None:
        """Get a secret, including its value and clients.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._get_secret(name)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _get_secret(
        self, name: str, secret_id: str | None = None
    ) -> SecretView | None:
        """Get a secret, including the actual unencrypted value and clients.

        Args:
            name: Secret name, used for the password manager lookup.
            secret_id: When given, the backend lookup uses ``("id", secret_id)``
                instead of the name.

        Returns:
            The secret view, or None if the password manager has no value.
        """
        with self.password_manager() as password_manager:
            secret = password_manager.get_secret(name)
            secret_group = password_manager.get_entry_group(name)

        if not secret:
            return None

        secret_view = SecretView(name=name, secret=secret, group=secret_group)
        idname: KeySpec = name
        if secret_id:
            idname = ("id", secret_id)

        secret_mapping = await self.backend.get_secret(idname)
        if secret_mapping:
            secret_view.clients = [ref.name for ref in secret_mapping.clients]

        return secret_view

    async def delete_secret(self, name: str) -> None:
        """Delete a secret.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            return await self._delete_secret(name)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _delete_secret(self, name: str) -> None:
        """Delete a secret.

        Removes the entry from the password manager, then removes the secret
        from every client the backend maps it to.
        """
        with self.password_manager() as password_manager:
            password_manager.delete_entry(name)

        secret_mapping = await self.backend.get_secret(name)
        if not secret_mapping:
            return

        for client in secret_mapping.clients:
            LOG.info("Deleting secret %s from client %s", name, client)
            await self.backend.delete_client_secret(("id", client.id), name)

    async def _add_secret(
        self,
        name: str,
        value: str,
        clients: list[str] | None,
        update: bool = False,
        group: str | None = None,
    ) -> None:
        """Add a secret.

        Stores the value in the password manager and writes an encrypted copy
        to the backend for each client.

        Args:
            name: Secret name.
            value: Plaintext secret value.
            clients: Client names to give the secret to. When ``update`` is
                True and the backend already maps the secret, the mapped
                clients replace this list.
            update: Whether an existing entry is being updated.
            group: Optional password manager group for the entry.

        Raises:
            ClientNotFoundError: If ``update`` is True and a client is missing.
        """
        with self.password_manager() as password_manager:
            password_manager.add_entry(name, value, update, group_name=group)

        if update:
            secret_map = await self.backend.get_secret(name)
            if secret_map:
                clients = [ref.name for ref in secret_map.clients]

        if not clients:
            return

        for client_name in clients:
            client = await self.get_client(client_name)
            if not client:
                if update:
                    raise ClientNotFoundError()
                LOG.warning("Requested client %s not found!", client_name)
                continue
            public_key = load_public_key(client.public_key.encode())
            encrypted = encrypt_string(value, public_key)
            await self.backend.create_client_secret(client_name, name, encrypted)
            # Log only after the backend write has completed.
            LOG.info("Wrote encrypted secret for client %s", client_name)

    async def add_secret(
        self,
        name: str,
        value: str,
        clients: list[str] | None = None,
        group: str | None = None,
    ) -> None:
        """Add a secret.

        Raises:
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            await self._add_secret(name=name, value=value, clients=clients, group=group)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def update_secret(self, name: str, value: str) -> None:
        """Update a secret's value for all clients mapped to it.

        Raises:
            ClientNotFoundError: If a mapped client is missing.
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            await self._add_secret(name, value, None, True)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    async def _create_client_secret(self, client_name: str, secret_name: str) -> None:
        """Create client secret.

        Encrypts the stored secret value with the client's public key and
        writes it to the backend.

        Raises:
            ClientNotFoundError: If the client does not exist.
            SecretNotFoundError: If the password manager has no value.
        """
        client = await self.get_client(client_name)
        if not client:
            raise ClientNotFoundError()

        with self.password_manager() as password_manager:
            secret = password_manager.get_secret(secret_name)

        if not secret:
            raise SecretNotFoundError()

        public_key = load_public_key(client.public_key.encode())
        encrypted = encrypt_string(secret, public_key)
        await self.backend.create_client_secret(client_name, secret_name, encrypted)

    async def create_client_secret(self, client_name: str, secret_name: str) -> None:
        """Create client secret.

        Raises:
            ClientNotFoundError: If the client does not exist.
            SecretNotFoundError: If the password manager has no value.
            BackendUnavailableError: On any unexpected exception.
        """
        try:
            await self._create_client_secret(client_name, secret_name)
        except ClientManagementError:
            raise
        except Exception as e:
            raise BackendUnavailableError() from e

    @property
    def audit(self) -> AuditAPI:
        """Resolve audit API for the ADMIN subsystem."""
        return self.backend.audit(SubSystem.ADMIN)

    async def get_audit_log(
        self,
        offset: int = 0,
        limit: int = 100,
        **kwargs: str,
    ) -> list[AuditLog]:
        """Get audit log from backend.

        Keyword Arguments:
            operation: str | None
            subsystem: str | None
            client_id: str | None
            client_name: str | None
            secret_id: str | None
            secret_name: str | None
            origin: str | None
        """
        return await self.audit.get(offset, limit, **kwargs)

    async def get_audit_log_detailed(
        self,
        offset: int = 0,
        limit: int = 100,
        **kwargs: str,
    ) -> AuditListResult:
        """Get audit log from backend.

        Keyword Arguments:
            operation: str | None
            subsystem: str | None
            client_id: str | None
            client_name: str | None
            secret_id: str | None
            secret_name: str | None
            origin: str | None
        """
        return await self.audit.get_detailed(offset, limit, **kwargs)

    async def write_audit_message(
        self,
        operation: Operation,
        message: str,
        origin: str,
        client: Client | None = None,
        secret_name: str | None = None,
        **data: str,
    ) -> None:
        """Write an audit message.

        The secret object is always passed as None; only ``secret_name`` is
        forwarded.
        """
        await self.audit.write_async(
            operation=operation,
            message=message,
            origin=origin,
            client=client,
            secret=None,
            secret_name=secret_name,
            **data,
        )

    async def write_audit_log(self, entry: AuditLog) -> None:
        """Write to the audit log.

        Sets the entry's subsystem to ADMIN when it has none.
        """
        if not entry.subsystem:
            entry.subsystem = SubSystem.ADMIN
        await self.audit.write_model_async(entry)

    async def get_audit_log_count(self) -> int:
        """Get audit log count."""
        return await self.audit.count()