349 lines
11 KiB
Python
349 lines
11 KiB
Python
"""Keepass password manager."""
|
|
|
|
import logging
|
|
from collections.abc import Iterator
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import cast
|
|
|
|
import pykeepass
|
|
import pykeepass.exceptions
|
|
from sshecret_admin.core.settings import AdminServerSettings
|
|
|
|
from .models import SecretGroup
|
|
from .master_password import decrypt_master_password
|
|
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Value stored in the username field of entries created by this module.
NO_USERNAME = "NO_USERNAME"

# Database location used when the caller does not supply one.
DEFAULT_LOCATION = "keepass.kdbx"
|
|
|
|
|
|
class PasswordCredentialsError(Exception):
    """Raised when the password database cannot be opened with the given credentials."""
|
|
|
|
|
|
def create_password_db(location: Path, password: str) -> None:
    """Create a new KeePass database file.

    Args:
        location: Where the database file is created; resolved to an
            absolute path before being handed to pykeepass.
        password: Master password protecting the new database.
    """
    LOG.info("Creating password database at %s", location)
    database_path = str(location.absolute())
    pykeepass.create_database(database_path, password=password)
|
|
|
|
|
|
def _kp_group_to_secret_group(
    kp_group: pykeepass.group.Group,
    parent: SecretGroup | None = None,
    depth: int | None = None,
) -> SecretGroup:
    """Convert a KeePass group into a SecretGroup.

    Args:
        kp_group: The KeePass group to convert.
        parent: The already converted parent group, if there is one. When it
            is omitted and the group is nested, the KeePass parent group is
            converted as well and linked to the result.
        depth: Path length at which the conversion stops descending into
            subgroups. When unset, all subgroups are converted recursively.

    Returns:
        The converted group, with its entry titles, parent link and
        (depth permitting) converted children filled in.
    """
    name = cast(str, kp_group.name)
    path_elements = cast(list[str], kp_group.path)
    group = SecretGroup(
        name=name,
        path="/".join(path_elements),
        description=kp_group.notes,
    )
    group.entries.extend(str(entry.title) for entry in kp_group.entries)

    current_depth = len(path_elements)

    if parent:
        group.parent_group = parent
    elif current_depth > 1:
        # No parent was handed in, but the group is nested: convert the
        # KeePass parent too, limiting its descent to this group's level.
        # NOTE(review): the parent built here already converts its own
        # subgroups, so this group appears to end up in parent.children
        # twice (a shallow copy plus this object) - confirm this is intended.
        parent = _kp_group_to_secret_group(kp_group.parentgroup, depth=current_depth)
        parent.children.append(group)
        group.parent_group = parent

    # Depth limit reached: return without converting subgroups.
    if depth and current_depth == depth:
        return group

    group.children.extend(
        _kp_group_to_secret_group(subgroup, group, depth=depth)
        for subgroup in kp_group.subgroups
    )
    return group
|
|
|
|
|
|
class PasswordContext:
    """Secret store operations on top of an open KeePass database.

    Secrets are stored as KeePass entries (looked up by title) and may be
    organised in KeePass groups (looked up by name). Methods that modify the
    database call ``save()`` on it before returning.
    """

    def __init__(self, keepass: pykeepass.PyKeePass) -> None:
        """Initialize password context.

        Args:
            keepass: The opened pykeepass database to operate on.
        """
        self.keepass: pykeepass.PyKeePass = keepass

    @property
    def _root_group(self) -> pykeepass.group.Group:
        """Return the root group."""
        return cast(pykeepass.group.Group, self.keepass.root_group)

    def _get_entry(self, name: str) -> pykeepass.entry.Entry | None:
        """Return the first entry whose title is *name*, or None."""
        return cast(
            "pykeepass.entry.Entry | None",
            self.keepass.find_entries(title=name, first=True),
        )

    def _get_group(self, name: str) -> pykeepass.group.Group | None:
        """Return the first group named *name*, or None."""
        return cast(
            "pykeepass.group.Group | None",
            self.keepass.find_groups(name=name, first=True),
        )

    def add_entry(
        self,
        entry_name: str,
        secret: str,
        overwrite: bool = False,
        group_name: str | None = None,
    ) -> None:
        """Add an entry.

        Specify overwrite=True to overwrite the existing secret value, if it exists.
        This will not move the entry, if the group_name is different from the original group.

        Args:
            entry_name: Title of the entry.
            secret: The secret value, stored as the entry password.
            overwrite: Replace the value of an existing entry instead of failing.
            group_name: Group to create the entry in; the root group when omitted.

        Raises:
            ValueError: If the entry exists and overwrite is False, or if
                group_name does not match any group.
        """
        entry = self._get_entry(entry_name)
        if entry:
            if not overwrite:
                raise ValueError("Error: A secret with this name already exists.")
            entry.password = secret
            self.keepass.save()
            return

        LOG.debug("Add secret entry to keepass: %s, group: %r", entry_name, group_name)
        if group_name:
            destination_group = self._get_group(group_name)
            if not destination_group:
                # Fail explicitly instead of handing None to pykeepass.
                raise ValueError(f"Cannot find a group named {group_name}")
        else:
            destination_group = self._root_group

        self.keepass.add_entry(
            destination_group=destination_group,
            title=entry_name,
            username=NO_USERNAME,
            password=secret,
        )
        self.keepass.save()

    def get_secret(self, entry_name: str) -> str | None:
        """Get the secret value.

        Returns:
            The secret, or None if no entry with this name exists.

        Raises:
            RuntimeError: If the entry exists but has no password value.
        """
        entry = self._get_entry(entry_name)
        if not entry:
            return None

        LOG.warning("Secret name %s accessed", entry_name)
        password = cast("str | None", entry.password)
        if password:
            return str(password)

        raise RuntimeError(f"Cannot get password for entry {entry_name}")

    def get_entry_group(self, entry_name: str) -> str | None:
        """Get the group for an entry.

        Returns:
            The name of the group holding the entry, or None if the entry
            does not exist or is located in the root group.
        """
        entry = self._get_entry(entry_name)
        if not entry:
            return None
        if entry.group.is_root_group:
            return None
        return str(entry.group.name)

    def get_secret_groups(
        self, pattern: str | None = None, regex: bool = True
    ) -> list[SecretGroup]:
        """Get secret groups.

        A regex pattern may be provided to filter groups.

        Args:
            pattern: Group name to search for. When omitted, the direct
                subgroups of the root group are returned.
            regex: Passed on to the pykeepass group search together with pattern.
        """
        if pattern:
            groups = cast(
                list[pykeepass.group.Group],
                self.keepass.find_groups(name=pattern, regex=regex),
            )
        else:
            groups = self._root_group.subgroups

        return [_kp_group_to_secret_group(group) for group in groups]

    def get_secret_group_list(
        self, pattern: str | None = None, regex: bool = True
    ) -> list[SecretGroup]:
        """Get a flat list of groups.

        With a pattern this delegates to get_secret_groups; otherwise every
        group in the database except the root group is returned.
        """
        if pattern:
            return self.get_secret_groups(pattern, regex)

        return [
            _kp_group_to_secret_group(group)
            for group in self.keepass.groups
            if not group.is_root_group
        ]

    def get_secret_group(self, path: str) -> SecretGroup | None:
        """Get a secret group by path.

        Args:
            path: Group names separated by "/", starting below the root group.

        Returns:
            The group at the given path, or None if any element of the path
            cannot be found.
        """
        current = self._root_group
        # Walk down from the root, one path element at a time.
        for group_name in path.split("/"):
            current = next(
                (
                    subgroup
                    for subgroup in current.subgroups
                    if subgroup.name == group_name
                ),
                None,
            )
            if current is None:
                return None
        return _kp_group_to_secret_group(current)

    def get_ungrouped_secrets(self) -> list[str]:
        """Get secrets without groups, i.e. the titles of the root group entries."""
        return [str(entry.title) for entry in self._root_group.entries]

    def add_group(
        self, name: str, description: str | None = None, parent_group: str | None = None
    ) -> None:
        """Add a group.

        Args:
            name: Name of the new group.
            description: Stored as the notes of the group.
            parent_group: Name of the group to create it under; the root
                group when omitted.

        Raises:
            ValueError: If parent_group is given but cannot be found.
        """
        kp_parent_group = self._root_group
        if parent_group:
            found = self._get_group(parent_group)
            if not found:
                raise ValueError(
                    f"Error: Cannot find a parent group named {parent_group}"
                )
            kp_parent_group = found
        self.keepass.add_group(
            destination_group=kp_parent_group, group_name=name, notes=description
        )
        self.keepass.save()

    def set_group_description(self, name: str, description: str) -> None:
        """Set the description of a group.

        Raises:
            ValueError: If no group with this name exists.
        """
        group = self._get_group(name)
        if not group:
            raise ValueError(f"Error: No such group {name}")

        group.notes = description
        self.keepass.save()

    def set_secret_group(self, entry_name: str, group_name: str | None) -> None:
        """Move a secret to a group.

        If group is None, the secret will be placed in the root group.

        Raises:
            ValueError: If the entry or the group cannot be found.
        """
        entry = self._get_entry(entry_name)
        if not entry:
            raise ValueError(
                f"Cannot find secret entry named {entry_name} in secrets database"
            )
        if group_name:
            group = self._get_group(group_name)
            if not group:
                raise ValueError(f"Cannot find a group named {group_name}")
        else:
            group = self._root_group

        self.keepass.move_entry(entry, group)
        self.keepass.save()

    def move_group(self, name: str, parent_group: str | None) -> None:
        """Move a group.

        If parent_group is None, it will be moved to the root.

        Raises:
            ValueError: If the group or the parent group cannot be found.
        """
        group = self._get_group(name)
        if not group:
            raise ValueError(f"Error: No such group {name}")
        if parent_group:
            parent = self._get_group(parent_group)
            if not parent:
                raise ValueError(f"Error: No such group {parent_group}")
        else:
            parent = self._root_group

        self.keepass.move_group(group, parent)
        self.keepass.save()

    def get_available_secrets(self, group_name: str | None = None) -> list[str]:
        """Get the names of all secrets in the database.

        Args:
            group_name: Restrict the result to the entries of this group.

        Raises:
            ValueError: If group_name is given but cannot be found.
        """
        if group_name:
            group = self._get_group(group_name)
            if not group:
                raise ValueError(f"Error: No such group {group_name}")
            entries = group.entries
        else:
            entries = cast(list[pykeepass.entry.Entry], self.keepass.entries)
        if not entries:
            return []
        return [str(entry.title) for entry in entries]

    def delete_entry(self, entry_name: str) -> None:
        """Delete entry.

        Does nothing if no entry with this name exists.
        """
        entry = self._get_entry(entry_name)
        if not entry:
            return
        entry.delete()
        self.keepass.save()

    def delete_group(self, name: str, keep_entries: bool = True) -> None:
        """Delete a group.

        If keep_entries is set to False, all entries in the group will be deleted.
        Otherwise the entries found in the group (searched recursively) are
        moved to the root group first. Does nothing if the group does not exist.
        """
        group = self._get_group(name)
        if not group:
            return
        if keep_entries:
            orphans = cast(
                list[pykeepass.entry.Entry],
                self.keepass.find_entries(recursive=True, group=group),
            )
            for entry in orphans:
                # Move the entry to the root group.
                LOG.warning(
                    "Moving orphaned secret entry %s to root group", entry.title
                )
                self.keepass.move_entry(entry, self._root_group)

        self.keepass.delete_group(group)
        self.keepass.save()
|
|
|
|
|
|
@contextmanager
def _password_context(location: Path, password: str) -> Iterator[PasswordContext]:
    """Open the password database and yield a context wrapping it.

    Args:
        location: Path of the KeePass database file.
        password: Master password of the database.

    Raises:
        PasswordCredentialsError: If pykeepass rejects the credentials.
    """
    database_path = str(location.absolute())
    try:
        keepass_db = pykeepass.PyKeePass(database_path, password=password)
    except pykeepass.exceptions.CredentialsError as error:
        raise PasswordCredentialsError(
            "Could not open password database. Invalid credentials."
        ) from error
    yield PasswordContext(keepass_db)
|
|
|
|
|
|
@contextmanager
def load_password_manager(
    settings: AdminServerSettings,
    encrypted_password: str,
    location: str = DEFAULT_LOCATION,
) -> Iterator[PasswordContext]:
    """Load the password manager.

    The master password is decrypted first; if no file exists at the given
    location, a new password database is created there before it is opened.

    Args:
        settings: Settings handed to the master password decryption.
        encrypted_password: The encrypted master password.
        location: Path of the KeePass database file.

    Yields:
        A PasswordContext for the opened database.
    """
    database_path = Path(location)
    master_password = decrypt_master_password(
        settings=settings, encrypted=encrypted_password
    )

    # First use: the database does not exist yet, so create it.
    if not database_path.exists():
        create_password_db(database_path, master_password)

    with _password_context(database_path, master_password) as password_context:
        yield password_context
|