From 289352d87218a0591569c278920b734ea084097c Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Sat, 31 May 2025 10:41:58 +0200 Subject: [PATCH] Add support for groups of secrets --- .../src/sshecret_admin/services/keepass.py | 184 +++++++++++-- .../src/sshecret_admin/services/models.py | 9 + .../tests/test_password_context.py | 242 ++++++++++++++++++ 3 files changed, 420 insertions(+), 15 deletions(-) create mode 100644 packages/sshecret-admin/tests/test_password_context.py diff --git a/packages/sshecret-admin/src/sshecret_admin/services/keepass.py b/packages/sshecret-admin/src/sshecret_admin/services/keepass.py index 7eeb7e1..0367774 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/keepass.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/keepass.py @@ -7,15 +7,16 @@ from pathlib import Path from typing import cast import pykeepass -from .master_password import decrypt_master_password from sshecret_admin.core.settings import AdminServerSettings +from .models import SecretGroup +from .master_password import decrypt_master_password + LOG = logging.getLogger(__name__) NO_USERNAME = "NO_USERNAME" - DEFAULT_LOCATION = "keepass.kdbx" @@ -25,6 +26,20 @@ def create_password_db(location: Path, password: str) -> None: pykeepass.create_database(str(location.absolute()), password=password) +def _kp_group_to_secret_group( + kp_group: pykeepass.group.Group, parent: SecretGroup | None = None +) -> SecretGroup: + """Convert keepass group to secret group dataclass.""" + group_name = cast(str, kp_group.name) + group = SecretGroup(name=group_name, description=kp_group.notes) + if parent: + group.parent_group = parent + for subgroup in kp_group.subgroups: + group.children.append(_kp_group_to_secret_group(subgroup, group)) + + return group + + class PasswordContext: """Password Context class.""" @@ -32,15 +47,41 @@ class PasswordContext: """Initialize password context.""" self.keepass: pykeepass.PyKeePass = keepass - def add_entry(self, entry_name: str, secret: str, overwrite: bool = False) -> None: + @property + def _root_group(self) -> pykeepass.group.Group: + """Return the root group.""" + return cast(pykeepass.group.Group, self.keepass.root_group) + + def _get_entry(self, name: str) -> pykeepass.entry.Entry | None: + """Get entry.""" + entry = cast( + "pykeepass.entry.Entry | None", + self.keepass.find_entries(title=name, first=True), + ) + return entry + + def _get_group(self, name: str) -> pykeepass.group.Group | None: + """Find a group.""" + group = cast( + pykeepass.group.Group | None, + self.keepass.find_groups(name=name, first=True), + ) + return group + + def add_entry( + self, + entry_name: str, + secret: str, + overwrite: bool = False, + group_name: str | None = None, + ) -> None: """Add an entry. Specify overwrite=True to overwrite the existing secret value, if it exists. + This will not move the entry, if the group_name is different from the original group. + """ - entry = cast( - "pykeepass.entry.Entry | None", - self.keepass.find_entries(title=entry_name, first=True), - ) + entry = self._get_entry(entry_name) if entry and overwrite: entry.password = secret self.keepass.save() @@ -48,9 +89,14 @@ class PasswordContext: if entry: raise ValueError("Error: A secret with this name already exists.") - LOG.debug("Add secret entry to keepass: %s", entry_name) + LOG.debug("Add secret entry to keepass: %s, group: %r", entry_name, group_name) + if group_name: + destination_group = self._get_group(group_name) + else: + destination_group = self._root_group + entry = self.keepass.add_entry( - destination_group=self.keepass.root_group, + destination_group=destination_group, title=entry_name, username=NO_USERNAME, password=secret, @@ -59,10 +105,7 @@ class PasswordContext: def get_secret(self, entry_name: str) -> str | None: """Get the secret value.""" - entry = cast( - "pykeepass.entry.Entry | None", - self.keepass.find_entries(title=entry_name, first=True), - ) + entry = self._get_entry(entry_name) if not entry: return None @@ -72,9 +115,98 @@ class PasswordContext: raise RuntimeError(f"Cannot get password for entry {entry_name}") - def get_available_secrets(self) -> list[str]: + def get_secret_groups(self, pattern: str | None = None) -> list[SecretGroup]: + """Get secret groups. + + A regex pattern may be provided to filter groups. + """ + if pattern: + groups = cast( + list[pykeepass.group.Group], + self.keepass.find_groups(name=pattern, regex=True), + ) + else: + all_groups = cast(list[pykeepass.group.Group], self.keepass.groups) + # We skip the root group + groups = [group for group in all_groups if not group.is_root_group] + + secret_groups = [_kp_group_to_secret_group(group) for group in groups] + return secret_groups + + def add_group( + self, name: str, description: str | None = None, parent_group: str | None = None + ) -> None: + """Add a group.""" + kp_parent_group = self._root_group + if parent_group: + query = cast( + pykeepass.group.Group | None, + self.keepass.find_groups(name=parent_group, first=True), + ) + if not query: + raise ValueError( + f"Error: Cannot find a parent group named {parent_group}" + ) + kp_parent_group = query + self.keepass.add_group(kp_parent_group, name, notes=description) + self.keepass.save() + + def set_group_description(self, name: str, description: str) -> None: + """Set the description of a group.""" + group = self._get_group(name) + if not group: + raise ValueError(f"Error: No such group {name}") + + group.notes = description + self.keepass.save() + + def set_secret_group(self, entry_name: str, group_name: str | None) -> None: + """Move a secret to a group. + + If group is None, the secret will be placed in the root group. + """ + entry = self._get_entry(entry_name) + if not entry: + raise ValueError( + f"Cannot find secret entry named {entry_name} in secrets database" + ) + if group_name: + group = self._get_group(group_name) + if not group: + raise ValueError(f"Cannot find a group named {group_name}") + else: + group = self._root_group + + self.keepass.move_entry(entry, group) + self.keepass.save() + + def move_group(self, name: str, parent_group: str | None) -> None: + """Move a group. + + If parent_group is None, it will be moved to the root. + """ + group = self._get_group(name) + if not group: + raise ValueError(f"Error: No such group {name}") + if parent_group: + parent = self._get_group(parent_group) + if not parent: + raise ValueError(f"Error: No such group {parent_group}") + else: + parent = self._root_group + + self.keepass.move_group(group, parent) + self.keepass.save() + + def get_available_secrets(self, group_name: str | None = None) -> list[str]: """Get the names of all secrets in the database.""" - entries = self.keepass.entries + if group_name: + group = self._get_group(group_name) + if not group: + raise ValueError(f"Error: No such group {group_name}") + entries = group.entries + else: + entries = cast(list[pykeepass.entry.Entry], self.keepass.entries) if not entries: return [] return [str(entry.title) for entry in entries] @@ -90,6 +222,28 @@ class PasswordContext: entry.delete() self.keepass.save() + def delete_group(self, name: str, keep_entries: bool = True) -> None: + """Delete a group. + + If keep_entries is set to False, all entries in the group will be deleted. + """ + group = self._get_group(name) + if not group: + return + if keep_entries: + for entry in cast( + list[pykeepass.entry.Entry], + self.keepass.find_entries(recursive=True, group=group), + ): + # Move the entry to the root group. + LOG.warning( + "Moving orphaned secret entry %s to root group", entry.title + ) + self.keepass.move_entry(entry, self._root_group) + + self.keepass.delete_group(group) + self.keepass.save() + @contextmanager def _password_context(location: Path, password: str) -> Iterator[PasswordContext]: diff --git a/packages/sshecret-admin/src/sshecret_admin/services/models.py b/packages/sshecret-admin/src/sshecret_admin/services/models.py index 6455198..d9327ce 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/models.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/models.py @@ -112,3 +112,12 @@ class SecretCreate(SecretUpdate): ] } ) + + +class SecretGroup(BaseModel): + """A secret group.""" + + name: str + description: str | None = None + parent_group: "SecretGroup | None" = None + children: list["SecretGroup"] = Field(default_factory=list) diff --git a/packages/sshecret-admin/tests/test_password_context.py b/packages/sshecret-admin/tests/test_password_context.py new file mode 100644 index 0000000..7b834bf --- /dev/null +++ b/packages/sshecret-admin/tests/test_password_context.py @@ -0,0 +1,242 @@ +"""Unit tests for the password context class. + +We are primarily testing whether the actions of the PasswordContext matches +those in the low level API. + +""" + +import random +import string +from pathlib import Path +from typing import cast +import pytest +import pykeepass + +from sshecret_admin.services.keepass import PasswordContext + + +def random_string(length: int = 5) -> str: + """Generate random string.""" + chars = string.ascii_lowercase + return "".join(random.choice(chars) for _ in range(length)) + + +def create_random_entries( + password_database: pykeepass.PyKeePass, + amount: int = 10, + prefix: str = "secret", + group: pykeepass.group.Group | None = None, +) -> None: + """Create some random entries.""" + if not group: + group = cast(pykeepass.group.Group, password_database.root_group) + for n in range(amount): + name = f"{prefix}-{n}" + username = "NONE" + password = random_string(12) + password_database.add_entry( + destination_group=group, + title=name, + username=username, + password=password, + ) + password_database.save() + + +@pytest.fixture(name="password_database") +def password_db_fixture(tmp_path: Path): + """Create a password database.""" + filename = tmp_path / "kpdb.kdbx" + yield pykeepass.create_database(str(filename), password="test") + + +def test_add_entry(password_database: pykeepass.PyKeePass) -> None: + """Test add entry.""" + context = PasswordContext(password_database) + context.add_entry("testentry", "testsecret") + + entry = password_database.find_entries(title="testentry", first=True) + assert entry is not None + assert isinstance(entry, pykeepass.entry.Entry) + assert entry.password == "testsecret" + + +def test_get_secret(password_database: pykeepass.PyKeePass) -> None: + """Test get secret.""" + + context = PasswordContext(password_database) + context.add_entry("testentry", "testsecret") + + secret = context.get_secret("testentry") + assert secret is not None + assert secret == "testsecret" + + +def test_get_available_secrets(password_database: pykeepass.PyKeePass) -> None: + """Test get_available_secrets.""" + create_random_entries(password_database, 10) + context = PasswordContext(password_database) + available_secrets = context.get_available_secrets() + assert len(available_secrets) == 10 + for n in range(10): + assert f"secret-{n}" in available_secrets + + +def test_delete_entry(password_database: pykeepass.PyKeePass) -> None: + """Test deletion of entry.""" + create_random_entries(password_database, 3) + context = PasswordContext(password_database) + available_secrets = context.get_available_secrets() + assert len(available_secrets) == 3 + context.delete_entry("secret-2") + entry = password_database.find_entries(title="secret-2", first=True) + assert entry is None + available_secrets = context.get_available_secrets() + assert len(available_secrets) == 2 + + +def test_get_secret_groups(password_database: pykeepass.PyKeePass) -> None: + """Test get secret groups.""" + # We create a hierarchy of groups to test how that parses. + root_group = password_database.root_group + first_group = password_database.add_group( + root_group, "level_one", notes="A group in the root" + ) + password_database.add_group( + first_group, "level_two", notes="A group one level down" + ) + # Another group at the root, without a note + password_database.add_group(root_group, "free_group") + password_database.save() + + context = PasswordContext(password_database) + groups = context.get_secret_groups() + assert len(groups) == 3 + + +def test_get_secret_groups_regex(password_database: pykeepass.PyKeePass) -> None: + """Get secret groups matching a regex.""" + # create some groups matching a pattern + for n in range(4): + password_database.add_group(password_database.root_group, f"foo-{n}") + + for n in range(3): + parent_group = password_database.find_groups(name="foo-1", first=True) + password_database.add_group(parent_group, f"bar-{n}") + + password_database.save() + + context = PasswordContext(password_database) + foo_groups = context.get_secret_groups("foo-.*") + assert len(foo_groups) == 4 + bar_groups = context.get_secret_groups("bar-.*") + assert len(bar_groups) == 3 + + +def test_add_group(password_database: pykeepass.PyKeePass) -> None: + """Test add_group.""" + context = PasswordContext(password_database) + context.add_group("test_group", "Test Group") + assert password_database.find_groups(name="test_group", first=True) is not None + + # add a nested group below the first one + context.add_group("nested_group", "Nested test group", "test_group") + group = password_database.find_groups(name="nested_group", first=True) + assert group is not None + assert isinstance(group, pykeepass.group.Group) + parent_group = group.parentgroup + assert parent_group.name == "test_group" + + +def test_set_group_description(password_database: pykeepass.PyKeePass) -> None: + """Test setting the group description.""" + context = PasswordContext(password_database) + context.add_group("test_group", "Test Group") + + kp_group = password_database.find_groups(name="test_group", first=True) + assert isinstance(kp_group, pykeepass.group.Group) + assert kp_group.notes == "Test Group" + + context.set_group_description("test_group", "New Description") + + kp_group = password_database.find_groups(name="test_group", first=True) + assert isinstance(kp_group, pykeepass.group.Group) + assert kp_group.notes == "New Description" + + +def test_add_entry_with_group(password_database: pykeepass.PyKeePass) -> None: + """Test adding an entry with a group.""" + context = PasswordContext(password_database) + context.add_group("test_group", "A test group") + context.add_entry("test_entry", "test_secret", group_name="test_group") + + entry = password_database.find_entries(title="test_entry", first=True) + assert entry is not None + + assert isinstance(entry, pykeepass.entry.Entry) + assert entry.group.name == "test_group" + + +def test_move_entry(password_database: pykeepass.PyKeePass) -> None: + """Test moving entries between groups.""" + context = PasswordContext(password_database) + + context.add_group("test_group", "A test group") + context.add_group("test_group_2", "Another test group") + + context.add_entry("test_entry", "test_secret") + entry = password_database.find_entries(title="test_entry", first=True) + assert isinstance(entry, pykeepass.entry.Entry) + assert entry.group.is_root_group is True + + context.set_secret_group("test_entry", "test_group") + + entry = password_database.find_entries(title="test_entry", first=True) + assert isinstance(entry, pykeepass.entry.Entry) + + assert entry.group.is_root_group is False + assert entry.group.name == "test_group" + + context.set_secret_group("test_entry", "test_group_2") + + entry = password_database.find_entries(title="test_entry", first=True) + assert isinstance(entry, pykeepass.entry.Entry) + + assert entry.group.is_root_group is False + assert entry.group.name == "test_group_2" + + context.set_secret_group("test_entry", None) + + entry = password_database.find_entries(title="test_entry", first=True) + assert isinstance(entry, pykeepass.entry.Entry) + assert entry.group.is_root_group is True + + +def test_move_group(password_database: pykeepass.PyKeePass) -> None: + """Test moving a group.""" + context = PasswordContext(password_database) + context.add_group("test_group", "A test group") + context.add_group("parent_group", "A parent group") + context.move_group("test_group", "parent_group") + + kp_group = password_database.find_groups(name="test_group", first=True) + assert isinstance(kp_group, pykeepass.group.Group) + assert kp_group.parentgroup.name == "parent_group" + + +def test_delete_group(password_database: pykeepass.PyKeePass) -> None: + """Test group deletion.""" + context = PasswordContext(password_database) + context.add_group("test_group", "A test group") + # Add some entries to this group. + kp_group = password_database.find_groups(name="test_group", first=True) + assert isinstance(kp_group, pykeepass.group.Group) + create_random_entries(password_database, amount=10, group=kp_group) + + context.delete_group("test_group") + kp_group = password_database.find_groups(name="test_group", first=True) + assert kp_group is None + + # Check if the secrets are still there. + secrets = context.get_available_secrets() + assert len(secrets) == 10