Add support for groups of secrets
This commit is contained in:
@ -7,15 +7,16 @@ from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
import pykeepass
|
||||
from .master_password import decrypt_master_password
|
||||
from sshecret_admin.core.settings import AdminServerSettings
|
||||
|
||||
from .models import SecretGroup
|
||||
from .master_password import decrypt_master_password
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
NO_USERNAME = "NO_USERNAME"
|
||||
|
||||
|
||||
DEFAULT_LOCATION = "keepass.kdbx"
|
||||
|
||||
|
||||
@ -25,6 +26,20 @@ def create_password_db(location: Path, password: str) -> None:
|
||||
pykeepass.create_database(str(location.absolute()), password=password)
|
||||
|
||||
|
||||
def _kp_group_to_secret_group(
|
||||
kp_group: pykeepass.group.Group, parent: SecretGroup | None = None
|
||||
) -> SecretGroup:
|
||||
"""Convert keepass group to secret group dataclass."""
|
||||
group_name = cast(str, kp_group.name)
|
||||
group = SecretGroup(name=group_name, description=kp_group.notes)
|
||||
if parent:
|
||||
group.parent_group = parent
|
||||
for subgroup in kp_group.subgroups:
|
||||
group.children.append(_kp_group_to_secret_group(subgroup, group))
|
||||
|
||||
return group
|
||||
|
||||
|
||||
class PasswordContext:
|
||||
"""Password Context class."""
|
||||
|
||||
@ -32,15 +47,41 @@ class PasswordContext:
|
||||
"""Initialize password context."""
|
||||
self.keepass: pykeepass.PyKeePass = keepass
|
||||
|
||||
def add_entry(self, entry_name: str, secret: str, overwrite: bool = False) -> None:
|
||||
@property
|
||||
def _root_group(self) -> pykeepass.group.Group:
|
||||
"""Return the root group."""
|
||||
return cast(pykeepass.group.Group, self.keepass.root_group)
|
||||
|
||||
def _get_entry(self, name: str) -> pykeepass.entry.Entry | None:
|
||||
"""Get entry."""
|
||||
entry = cast(
|
||||
"pykeepass.entry.Entry | None",
|
||||
self.keepass.find_entries(title=name, first=True),
|
||||
)
|
||||
return entry
|
||||
|
||||
def _get_group(self, name: str) -> pykeepass.group.Group | None:
|
||||
"""Find a group."""
|
||||
group = cast(
|
||||
pykeepass.group.Group | None,
|
||||
self.keepass.find_groups(name=name, first=True),
|
||||
)
|
||||
return group
|
||||
|
||||
def add_entry(
|
||||
self,
|
||||
entry_name: str,
|
||||
secret: str,
|
||||
overwrite: bool = False,
|
||||
group_name: str | None = None,
|
||||
) -> None:
|
||||
"""Add an entry.
|
||||
|
||||
Specify overwrite=True to overwrite the existing secret value, if it exists.
|
||||
This will not move the entry, if the group_name is different from the original group.
|
||||
|
||||
"""
|
||||
entry = cast(
|
||||
"pykeepass.entry.Entry | None",
|
||||
self.keepass.find_entries(title=entry_name, first=True),
|
||||
)
|
||||
entry = self._get_entry(entry_name)
|
||||
if entry and overwrite:
|
||||
entry.password = secret
|
||||
self.keepass.save()
|
||||
@ -48,9 +89,14 @@ class PasswordContext:
|
||||
|
||||
if entry:
|
||||
raise ValueError("Error: A secret with this name already exists.")
|
||||
LOG.debug("Add secret entry to keepass: %s", entry_name)
|
||||
LOG.debug("Add secret entry to keepass: %s, group: %r", entry_name, group_name)
|
||||
if group_name:
|
||||
destination_group = self._get_group(group_name)
|
||||
else:
|
||||
destination_group = self._root_group
|
||||
|
||||
entry = self.keepass.add_entry(
|
||||
destination_group=self.keepass.root_group,
|
||||
destination_group=destination_group,
|
||||
title=entry_name,
|
||||
username=NO_USERNAME,
|
||||
password=secret,
|
||||
@ -59,10 +105,7 @@ class PasswordContext:
|
||||
|
||||
def get_secret(self, entry_name: str) -> str | None:
|
||||
"""Get the secret value."""
|
||||
entry = cast(
|
||||
"pykeepass.entry.Entry | None",
|
||||
self.keepass.find_entries(title=entry_name, first=True),
|
||||
)
|
||||
entry = self._get_entry(entry_name)
|
||||
if not entry:
|
||||
return None
|
||||
|
||||
@ -72,9 +115,98 @@ class PasswordContext:
|
||||
|
||||
raise RuntimeError(f"Cannot get password for entry {entry_name}")
|
||||
|
||||
def get_available_secrets(self) -> list[str]:
|
||||
def get_secret_groups(self, pattern: str | None = None) -> list[SecretGroup]:
|
||||
"""Get secret groups.
|
||||
|
||||
A regex pattern may be provided to filter groups.
|
||||
"""
|
||||
if pattern:
|
||||
groups = cast(
|
||||
list[pykeepass.group.Group],
|
||||
self.keepass.find_groups(name=pattern, regex=True),
|
||||
)
|
||||
else:
|
||||
all_groups = cast(list[pykeepass.group.Group], self.keepass.groups)
|
||||
# We skip the root group
|
||||
groups = [group for group in all_groups if not group.is_root_group]
|
||||
|
||||
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
||||
return secret_groups
|
||||
|
||||
def add_group(
|
||||
self, name: str, description: str | None = None, parent_group: str | None = None
|
||||
) -> None:
|
||||
"""Add a group."""
|
||||
kp_parent_group = self._root_group
|
||||
if parent_group:
|
||||
query = cast(
|
||||
pykeepass.group.Group | None,
|
||||
self.keepass.find_groups(name=parent_group, first=True),
|
||||
)
|
||||
if not query:
|
||||
raise ValueError(
|
||||
f"Error: Cannot find a parent group named {parent_group}"
|
||||
)
|
||||
kp_parent_group = query
|
||||
self.keepass.add_group(kp_parent_group, name, notes=description)
|
||||
self.keepass.save()
|
||||
|
||||
def set_group_description(self, name: str, description: str) -> None:
|
||||
"""Set the description of a group."""
|
||||
group = self._get_group(name)
|
||||
if not group:
|
||||
raise ValueError(f"Error: No such group {name}")
|
||||
|
||||
group.notes = description
|
||||
self.keepass.save()
|
||||
|
||||
def set_secret_group(self, entry_name: str, group_name: str | None) -> None:
|
||||
"""Move a secret to a group.
|
||||
|
||||
If group is None, the secret will be placed in the root group.
|
||||
"""
|
||||
entry = self._get_entry(entry_name)
|
||||
if not entry:
|
||||
raise ValueError(
|
||||
f"Cannot find secret entry named {entry_name} in secrets database"
|
||||
)
|
||||
if group_name:
|
||||
group = self._get_group(group_name)
|
||||
if not group:
|
||||
raise ValueError(f"Cannot find a group named {group_name}")
|
||||
else:
|
||||
group = self._root_group
|
||||
|
||||
self.keepass.move_entry(entry, group)
|
||||
self.keepass.save()
|
||||
|
||||
def move_group(self, name: str, parent_group: str | None) -> None:
|
||||
"""Move a group.
|
||||
|
||||
If parent_group is None, it will be moved to the root.
|
||||
"""
|
||||
group = self._get_group(name)
|
||||
if not group:
|
||||
raise ValueError(f"Error: No such group {name}")
|
||||
if parent_group:
|
||||
parent = self._get_group(parent_group)
|
||||
if not parent:
|
||||
raise ValueError(f"Error: No such group {parent_group}")
|
||||
else:
|
||||
parent = self._root_group
|
||||
|
||||
self.keepass.move_group(group, parent)
|
||||
self.keepass.save()
|
||||
|
||||
def get_available_secrets(self, group_name: str | None = None) -> list[str]:
|
||||
"""Get the names of all secrets in the database."""
|
||||
entries = self.keepass.entries
|
||||
if group_name:
|
||||
group = self._get_group(group_name)
|
||||
if not group:
|
||||
raise ValueError(f"Error: No such group {group_name}")
|
||||
entries = group.entries
|
||||
else:
|
||||
entries = cast(list[pykeepass.entry.Entry], self.keepass.entries)
|
||||
if not entries:
|
||||
return []
|
||||
return [str(entry.title) for entry in entries]
|
||||
@ -90,6 +222,28 @@ class PasswordContext:
|
||||
entry.delete()
|
||||
self.keepass.save()
|
||||
|
||||
def delete_group(self, name: str, keep_entries: bool = True) -> None:
|
||||
"""Delete a group.
|
||||
|
||||
If keep_entries is set to False, all entries in the group will be deleted.
|
||||
"""
|
||||
group = self._get_group(name)
|
||||
if not group:
|
||||
return
|
||||
if keep_entries:
|
||||
for entry in cast(
|
||||
list[pykeepass.entry.Entry],
|
||||
self.keepass.find_entries(recursive=True, group=group),
|
||||
):
|
||||
# Move the entry to the root group.
|
||||
LOG.warning(
|
||||
"Moving orphaned secret entry %s to root group", entry.title
|
||||
)
|
||||
self.keepass.move_entry(entry, self._root_group)
|
||||
|
||||
self.keepass.delete_group(group)
|
||||
self.keepass.save()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _password_context(location: Path, password: str) -> Iterator[PasswordContext]:
|
||||
|
||||
@ -112,3 +112,12 @@ class SecretCreate(SecretUpdate):
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class SecretGroup(BaseModel):
|
||||
"""A secret group."""
|
||||
|
||||
name: str
|
||||
description: str | None = None
|
||||
parent_group: "SecretGroup | None" = None
|
||||
children: list["SecretGroup"] = Field(default_factory=list)
|
||||
|
||||
242
packages/sshecret-admin/tests/test_password_context.py
Normal file
242
packages/sshecret-admin/tests/test_password_context.py
Normal file
@ -0,0 +1,242 @@
|
||||
"""Unit tests for the password context class.
|
||||
|
||||
We are primarily testing whether the actions of the PasswordContext matches
|
||||
those in the low level API.
|
||||
|
||||
"""
|
||||
|
||||
import random
|
||||
import string
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
import pytest
|
||||
import pykeepass
|
||||
|
||||
from sshecret_admin.services.keepass import PasswordContext
|
||||
|
||||
|
||||
def random_string(length: int = 5) -> str:
|
||||
"""Generate random string."""
|
||||
chars = string.ascii_lowercase
|
||||
return "".join(random.choice(chars) for _ in range(length))
|
||||
|
||||
|
||||
def create_random_entries(
|
||||
password_database: pykeepass.PyKeePass,
|
||||
amount: int = 10,
|
||||
prefix: str = "secret",
|
||||
group: pykeepass.group.Group | None = None,
|
||||
) -> None:
|
||||
"""Create some random entries."""
|
||||
if not group:
|
||||
group = cast(pykeepass.group.Group, password_database.root_group)
|
||||
for n in range(amount):
|
||||
name = f"{prefix}-{n}"
|
||||
username = "NONE"
|
||||
password = random_string(12)
|
||||
password_database.add_entry(
|
||||
destination_group=group,
|
||||
title=name,
|
||||
username=username,
|
||||
password=password,
|
||||
)
|
||||
password_database.save()
|
||||
|
||||
|
||||
@pytest.fixture(name="password_database")
|
||||
def password_db_fixture(tmp_path: Path):
|
||||
"""Create a password database."""
|
||||
filename = tmp_path / "kpdb.kdbx"
|
||||
yield pykeepass.create_database(str(filename), password="test")
|
||||
|
||||
|
||||
def test_add_entry(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test add entry."""
|
||||
context = PasswordContext(password_database)
|
||||
context.add_entry("testentry", "testsecret")
|
||||
|
||||
entry = password_database.find_entries(title="testentry", first=True)
|
||||
assert entry is not None
|
||||
assert isinstance(entry, pykeepass.entry.Entry)
|
||||
assert entry.password == "testsecret"
|
||||
|
||||
|
||||
def test_get_secret(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test get secret."""
|
||||
|
||||
context = PasswordContext(password_database)
|
||||
context.add_entry("testentry", "testsecret")
|
||||
|
||||
secret = context.get_secret("testentry")
|
||||
assert secret is not None
|
||||
assert secret == "testsecret"
|
||||
|
||||
|
||||
def test_get_available_secrets(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test get_available_secrets."""
|
||||
create_random_entries(password_database, 10)
|
||||
context = PasswordContext(password_database)
|
||||
available_secrets = context.get_available_secrets()
|
||||
assert len(available_secrets) == 10
|
||||
for n in range(10):
|
||||
assert f"secret-{n}" in available_secrets
|
||||
|
||||
|
||||
def test_delete_entry(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test deletion of entry."""
|
||||
create_random_entries(password_database, 3)
|
||||
context = PasswordContext(password_database)
|
||||
available_secrets = context.get_available_secrets()
|
||||
assert len(available_secrets) == 3
|
||||
context.delete_entry("secret-2")
|
||||
entry = password_database.find_entries(title="secret-2", first=True)
|
||||
assert entry is None
|
||||
available_secrets = context.get_available_secrets()
|
||||
assert len(available_secrets) == 2
|
||||
|
||||
|
||||
def test_get_secret_groups(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test get secret groups."""
|
||||
# We create a hierarchy of groups to test how that parses.
|
||||
root_group = password_database.root_group
|
||||
first_group = password_database.add_group(
|
||||
root_group, "level_one", notes="A group in the root"
|
||||
)
|
||||
password_database.add_group(
|
||||
first_group, "level_two", notes="A group one level down"
|
||||
)
|
||||
# Another group at the root, without a note
|
||||
password_database.add_group(root_group, "free_group")
|
||||
password_database.save()
|
||||
|
||||
context = PasswordContext(password_database)
|
||||
groups = context.get_secret_groups()
|
||||
assert len(groups) == 3
|
||||
|
||||
|
||||
def test_get_secret_groups_regex(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Get secret groups matching a regex."""
|
||||
# create some groups matching a pattern
|
||||
for n in range(4):
|
||||
password_database.add_group(password_database.root_group, f"foo-{n}")
|
||||
|
||||
for n in range(3):
|
||||
parent_group = password_database.find_groups(name="foo-1", first=True)
|
||||
password_database.add_group(parent_group, f"bar-{n}")
|
||||
|
||||
password_database.save()
|
||||
|
||||
context = PasswordContext(password_database)
|
||||
foo_groups = context.get_secret_groups("foo-.*")
|
||||
assert len(foo_groups) == 4
|
||||
bar_groups = context.get_secret_groups("bar-.*")
|
||||
assert len(bar_groups) == 3
|
||||
|
||||
|
||||
def test_add_group(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test add_group."""
|
||||
context = PasswordContext(password_database)
|
||||
context.add_group("test_group", "Test Group")
|
||||
assert password_database.find_groups(name="test_group", first=True) is not None
|
||||
|
||||
# add a nested group below the first one
|
||||
context.add_group("nested_group", "Nested test group", "test_group")
|
||||
group = password_database.find_groups(name="nested_group", first=True)
|
||||
assert group is not None
|
||||
assert isinstance(group, pykeepass.group.Group)
|
||||
parent_group = group.parentgroup
|
||||
assert parent_group.name == "test_group"
|
||||
|
||||
|
||||
def test_set_group_description(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test setting the group description."""
|
||||
context = PasswordContext(password_database)
|
||||
context.add_group("test_group", "Test Group")
|
||||
|
||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
||||
assert isinstance(kp_group, pykeepass.group.Group)
|
||||
assert kp_group.notes == "Test Group"
|
||||
|
||||
context.set_group_description("test_group", "New Description")
|
||||
|
||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
||||
assert isinstance(kp_group, pykeepass.group.Group)
|
||||
assert kp_group.notes == "New Description"
|
||||
|
||||
|
||||
def test_add_entry_with_group(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test adding an entry with a group."""
|
||||
context = PasswordContext(password_database)
|
||||
context.add_group("test_group", "A test group")
|
||||
context.add_entry("test_entry", "test_secret", group_name="test_group")
|
||||
|
||||
entry = password_database.find_entries(title="test_entry", first=True)
|
||||
assert entry is not None
|
||||
|
||||
assert isinstance(entry, pykeepass.entry.Entry)
|
||||
assert entry.group.name == "test_group"
|
||||
|
||||
|
||||
def test_move_entry(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test moving entries between groups."""
|
||||
context = PasswordContext(password_database)
|
||||
|
||||
context.add_group("test_group", "A test group")
|
||||
context.add_group("test_group_2", "Another test group")
|
||||
|
||||
context.add_entry("test_entry", "test_secret")
|
||||
entry = password_database.find_entries(title="test_entry", first=True)
|
||||
assert isinstance(entry, pykeepass.entry.Entry)
|
||||
assert entry.group.is_root_group is True
|
||||
|
||||
context.set_secret_group("test_entry", "test_group")
|
||||
|
||||
entry = password_database.find_entries(title="test_entry", first=True)
|
||||
assert isinstance(entry, pykeepass.entry.Entry)
|
||||
|
||||
assert entry.group.is_root_group is False
|
||||
assert entry.group.name == "test_group"
|
||||
|
||||
context.set_secret_group("test_entry", "test_group_2")
|
||||
|
||||
entry = password_database.find_entries(title="test_entry", first=True)
|
||||
assert isinstance(entry, pykeepass.entry.Entry)
|
||||
|
||||
assert entry.group.is_root_group is False
|
||||
assert entry.group.name == "test_group_2"
|
||||
|
||||
context.set_secret_group("test_entry", None)
|
||||
|
||||
entry = password_database.find_entries(title="test_entry", first=True)
|
||||
assert isinstance(entry, pykeepass.entry.Entry)
|
||||
assert entry.group.is_root_group is True
|
||||
|
||||
|
||||
def test_move_group(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test moving a group."""
|
||||
context = PasswordContext(password_database)
|
||||
context.add_group("test_group", "A test group")
|
||||
context.add_group("parent_group", "A parent group")
|
||||
context.move_group("test_group", "parent_group")
|
||||
|
||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
||||
assert isinstance(kp_group, pykeepass.group.Group)
|
||||
assert kp_group.parentgroup.name == "parent_group"
|
||||
|
||||
|
||||
def test_delete_group(password_database: pykeepass.PyKeePass) -> None:
|
||||
"""Test group deletion."""
|
||||
context = PasswordContext(password_database)
|
||||
context.add_group("test_group", "A test group")
|
||||
# Add some entries to this group.
|
||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
||||
assert isinstance(kp_group, pykeepass.group.Group)
|
||||
create_random_entries(password_database, amount=10, group=kp_group)
|
||||
|
||||
context.delete_group("test_group")
|
||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
||||
assert kp_group is None
|
||||
|
||||
# Check if the secrets are still there.
|
||||
secrets = context.get_available_secrets()
|
||||
assert len(secrets) == 10
|
||||
Reference in New Issue
Block a user