Complete admin package restructuring

This commit is contained in:
2025-05-10 08:28:15 +02:00
parent 4f970a3f71
commit 0a427b6a91
80 changed files with 1282 additions and 843 deletions

View File

@ -0,0 +1,116 @@
"""Keepass password manager."""
import logging
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import cast
import pykeepass
from .master_password import decrypt_master_password
from sshecret_admin.core.settings import AdminServerSettings
LOG = logging.getLogger(__name__)
NO_USERNAME = "NO_USERNAME"
DEFAULT_LOCATION = "keepass.kdbx"
def create_password_db(location: Path, password: str) -> None:
"""Create the password database."""
LOG.info("Creating password database at %s", location)
pykeepass.create_database(str(location.absolute()), password=password)
class PasswordContext:
"""Password Context class."""
def __init__(self, keepass: pykeepass.PyKeePass) -> None:
"""Initialize password context."""
self.keepass: pykeepass.PyKeePass = keepass
def add_entry(self, entry_name: str, secret: str, overwrite: bool = False) -> None:
"""Add an entry.
Specify overwrite=True to overwrite the existing secret value, if it exists.
"""
entry = cast(
"pykeepass.entry.Entry | None",
self.keepass.find_entries(title=entry_name, first=True),
)
if entry and overwrite:
entry.password = secret
elif entry:
raise ValueError("Error: A secret with this name already exists.")
LOG.debug("Add secret entry to keepass: %s", entry_name)
entry = self.keepass.add_entry(
destination_group=self.keepass.root_group,
title=entry_name,
username=NO_USERNAME,
password=secret,
)
self.keepass.save()
def get_secret(self, entry_name: str) -> str | None:
"""Get the secret value."""
entry = cast(
"pykeepass.entry.Entry | None",
self.keepass.find_entries(title=entry_name, first=True),
)
if not entry:
return None
LOG.warning("Secret name %s accessed", entry_name)
if password := cast(str, entry.password):
return str(password)
raise RuntimeError(f"Cannot get password for entry {entry_name}")
def get_available_secrets(self) -> list[str]:
"""Get the names of all secrets in the database."""
entries = self.keepass.entries
if not entries:
return []
return [str(entry.title) for entry in entries]
def delete_entry(self, entry_name: str) -> None:
"""Delete entry."""
entry = cast(
"pykeepass.entry.Entry | None",
self.keepass.find_entries(title=entry_name, first=True),
)
if not entry:
return
entry.delete()
self.keepass.save()
@contextmanager
def _password_context(location: Path, password: str) -> Iterator[PasswordContext]:
"""Open the password context."""
database = pykeepass.PyKeePass(str(location.absolute()), password=password)
context = PasswordContext(database)
yield context
@contextmanager
def load_password_manager(
settings: AdminServerSettings,
encrypted_password: str,
location: str = DEFAULT_LOCATION,
) -> Iterator[PasswordContext]:
"""Load password manager.
This function decrypts the password, and creates the password database if it
has not yet been created.
"""
db_location = Path(location)
password = decrypt_master_password(settings=settings, encrypted=encrypted_password)
if not db_location.exists():
create_password_db(db_location, password)
with _password_context(db_location, password) as context:
yield context