"""Keepass password manager.""" import logging from collections.abc import Iterator from contextlib import contextmanager from pathlib import Path from typing import cast import pykeepass import pykeepass.exceptions from sshecret_admin.core.settings import AdminServerSettings from .models import SecretGroup from .master_password import decrypt_master_password LOG = logging.getLogger(__name__) NO_USERNAME = "NO_USERNAME" DEFAULT_LOCATION = "keepass.kdbx" class PasswordCredentialsError(Exception): pass def create_password_db(location: Path, password: str) -> None: """Create the password database.""" LOG.info("Creating password database at %s", location) pykeepass.create_database(str(location.absolute()), password=password) def _kp_group_to_secret_group( kp_group: pykeepass.group.Group, parent: SecretGroup | None = None, depth: int | None = None ) -> SecretGroup: """Convert keepass group to secret group dataclass.""" group_name = cast(str, kp_group.name) path = "/".join(cast(list[str], kp_group.path)) group = SecretGroup(name=group_name, path=path, description=kp_group.notes) for entry in kp_group.entries: group.entries.append(str(entry.title)) if parent: group.parent_group = parent current_depth = len(kp_group.path) if not parent and current_depth > 1: parent = _kp_group_to_secret_group(kp_group.parentgroup, depth=current_depth) parent.children.append(group) group.parent_group = parent if depth and depth == current_depth: return group for subgroup in kp_group.subgroups: group.children.append(_kp_group_to_secret_group(subgroup, group, depth=depth)) return group class PasswordContext: """Password Context class.""" def __init__(self, keepass: pykeepass.PyKeePass) -> None: """Initialize password context.""" self.keepass: pykeepass.PyKeePass = keepass @property def _root_group(self) -> pykeepass.group.Group: """Return the root group.""" return cast(pykeepass.group.Group, self.keepass.root_group) def _get_entry(self, name: str) -> pykeepass.entry.Entry | None: """Get entry.""" entry = cast( "pykeepass.entry.Entry | None", self.keepass.find_entries(title=name, first=True), ) return entry def _get_group(self, name: str) -> pykeepass.group.Group | None: """Find a group.""" group = cast( pykeepass.group.Group | None, self.keepass.find_groups(name=name, first=True), ) return group def add_entry( self, entry_name: str, secret: str, overwrite: bool = False, group_name: str | None = None, ) -> None: """Add an entry. Specify overwrite=True to overwrite the existing secret value, if it exists. This will not move the entry, if the group_name is different from the original group. """ entry = self._get_entry(entry_name) if entry and overwrite: entry.password = secret self.keepass.save() return if entry: raise ValueError("Error: A secret with this name already exists.") LOG.debug("Add secret entry to keepass: %s, group: %r", entry_name, group_name) if group_name: destination_group = self._get_group(group_name) else: destination_group = self._root_group entry = self.keepass.add_entry( destination_group=destination_group, title=entry_name, username=NO_USERNAME, password=secret, ) self.keepass.save() def get_secret(self, entry_name: str) -> str | None: """Get the secret value.""" entry = self._get_entry(entry_name) if not entry: return None LOG.warning("Secret name %s accessed", entry_name) if password := cast(str, entry.password): return str(password) raise RuntimeError(f"Cannot get password for entry {entry_name}") def get_secret_groups(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]: """Get secret groups. A regex pattern may be provided to filter groups. """ if pattern: groups = cast( list[pykeepass.group.Group], self.keepass.find_groups(name=pattern, regex=regex), ) else: all_groups = cast(list[pykeepass.group.Group], self.keepass.groups) # We skip the root group groups = [group for group in all_groups if not group.is_root_group] secret_groups = [_kp_group_to_secret_group(group) for group in groups] return secret_groups def add_group( self, name: str, description: str | None = None, parent_group: str | None = None ) -> None: """Add a group.""" kp_parent_group = self._root_group if parent_group: query = cast( pykeepass.group.Group | None, self.keepass.find_groups(name=parent_group, first=True), ) if not query: raise ValueError( f"Error: Cannot find a parent group named {parent_group}" ) kp_parent_group = query self.keepass.add_group(destination_group=kp_parent_group, group_name=name, notes=description) self.keepass.save() def set_group_description(self, name: str, description: str) -> None: """Set the description of a group.""" group = self._get_group(name) if not group: raise ValueError(f"Error: No such group {name}") group.notes = description self.keepass.save() def set_secret_group(self, entry_name: str, group_name: str | None) -> None: """Move a secret to a group. If group is None, the secret will be placed in the root group. """ entry = self._get_entry(entry_name) if not entry: raise ValueError( f"Cannot find secret entry named {entry_name} in secrets database" ) if group_name: group = self._get_group(group_name) if not group: raise ValueError(f"Cannot find a group named {group_name}") else: group = self._root_group self.keepass.move_entry(entry, group) self.keepass.save() def move_group(self, name: str, parent_group: str | None) -> None: """Move a group. If parent_group is None, it will be moved to the root. """ group = self._get_group(name) if not group: raise ValueError(f"Error: No such group {name}") if parent_group: parent = self._get_group(parent_group) if not parent: raise ValueError(f"Error: No such group {parent_group}") else: parent = self._root_group self.keepass.move_group(group, parent) self.keepass.save() def get_available_secrets(self, group_name: str | None = None) -> list[str]: """Get the names of all secrets in the database.""" if group_name: group = self._get_group(group_name) if not group: raise ValueError(f"Error: No such group {group_name}") entries = group.entries else: entries = cast(list[pykeepass.entry.Entry], self.keepass.entries) if not entries: return [] return [str(entry.title) for entry in entries] def delete_entry(self, entry_name: str) -> None: """Delete entry.""" entry = cast( "pykeepass.entry.Entry | None", self.keepass.find_entries(title=entry_name, first=True), ) if not entry: return entry.delete() self.keepass.save() def delete_group(self, name: str, keep_entries: bool = True) -> None: """Delete a group. If keep_entries is set to False, all entries in the group will be deleted. """ group = self._get_group(name) if not group: return if keep_entries: for entry in cast( list[pykeepass.entry.Entry], self.keepass.find_entries(recursive=True, group=group), ): # Move the entry to the root group. LOG.warning( "Moving orphaned secret entry %s to root group", entry.title ) self.keepass.move_entry(entry, self._root_group) self.keepass.delete_group(group) self.keepass.save() @contextmanager def _password_context(location: Path, password: str) -> Iterator[PasswordContext]: """Open the password context.""" try: database = pykeepass.PyKeePass(str(location.absolute()), password=password) except pykeepass.exceptions.CredentialsError as e: raise PasswordCredentialsError( "Could not open password database. Invalid credentials." ) from e context = PasswordContext(database) yield context @contextmanager def load_password_manager( settings: AdminServerSettings, encrypted_password: str, location: str = DEFAULT_LOCATION, ) -> Iterator[PasswordContext]: """Load password manager. This function decrypts the password, and creates the password database if it has not yet been created. """ db_location = Path(location) password = decrypt_master_password(settings=settings, encrypted=encrypted_password) if not db_location.exists(): create_password_db(db_location, password) with _password_context(db_location, password) as context: yield context