"""Test secret manager.

This package tests the rewritten secret manager system.

This is technically an integration test, as it requires the other subsystems to
run, but it uses the internal API rather than the exposed routes.
"""

import json
from collections.abc import AsyncIterator

import allure
import pytest
import pytest_asyncio
from sqlalchemy import create_engine

from sshecret_admin.auth.models import Base
from sshecret_admin.core.settings import AdminServerSettings
from sshecret_admin.services.models import SecretGroup
from sshecret_admin.services.secret_manager import (
    AsyncSecretContext,
    InvalidGroupNameError,
    InvalidSecretNameError,
    password_manager_context,
    setup_private_key,
)


@pytest_asyncio.fixture(autouse=True)
async def create_admin_db(admin_server_settings: AdminServerSettings) -> None:
    """Create the admin database tables and regenerate the private key.

    Declared ``autouse`` so it runs before every test in this module.
    """
    engine = create_engine(admin_server_settings.admin_db)
    Base.metadata.create_all(engine)
    setup_private_key(settings=admin_server_settings, regenerate=True)


@pytest_asyncio.fixture()
async def secrets_manager(
    admin_server_settings: AdminServerSettings,
) -> AsyncIterator[AsyncSecretContext]:
    """Yield a secret manager context that stays open for the test's duration."""
    async with password_manager_context(
        admin_server_settings, "TEST", "127.0.0.1"
    ) as manager:
        yield manager


# -------- Tests start here -------- #


@allure.title("Adding entries")
@pytest.mark.parametrize("name,secret", [("testentry", "testsecret")])
class TestSecretsAddEntry:
    """Tests for the add_entry method."""

    @pytest.mark.asyncio
    async def test_add_entry(
        self,
        secrets_manager: AsyncSecretContext,
        name: str,
        secret: str,
    ) -> None:
        """Test add entry.

        This tests add_entry and get_secret.
        """
        await secrets_manager.add_entry(name, secret)
        stored_secret = await secrets_manager.get_secret(name)
        assert stored_secret == secret

    @pytest.mark.asyncio
    async def test_add_entry_duplicate(
        self,
        secrets_manager: AsyncSecretContext,
        name: str,
        secret: str,
    ) -> None:
        """Test adding an entry twice."""
        await secrets_manager.add_entry(name, secret)
        stored_secret = await secrets_manager.get_secret(name)
        assert stored_secret == secret

        # Without overwrite=True a second add with the same name must fail.
        with pytest.raises(InvalidSecretNameError):
            await secrets_manager.add_entry(name, secret)

    @pytest.mark.asyncio
    async def test_add_entry_with_group(
        self,
        secrets_manager: AsyncSecretContext,
        name: str,
        secret: str,
    ) -> None:
        """Test adding a secret with a group."""
        group = "testgroup"
        await secrets_manager.add_group(group)
        await secrets_manager.add_entry(name, secret, group_path=group)
        result = await secrets_manager.get_entry_group(name)
        assert result == group

    @pytest.mark.asyncio
    async def test_add_entry_with_nonexisting_group(
        self,
        secrets_manager: AsyncSecretContext,
        name: str,
        secret: str,
    ) -> None:
        """Test adding a secret where the group does not exist."""
        group = "testgroup"
        with pytest.raises(InvalidGroupNameError):
            await secrets_manager.add_entry(name, secret, group_path=group)

    @pytest.mark.asyncio
    async def test_add_entry_with_deep_path(
        self,
        secrets_manager: AsyncSecretContext,
        name: str,
        secret: str,
    ) -> None:
        """Test adding a secret to a nested group with a path specification."""
        await secrets_manager.add_group("root")
        await secrets_manager.add_group("nested", parent_group="root")
        await secrets_manager.add_entry(name, secret, group_path="/root/nested")
        group = await secrets_manager.get_secret_group("/root/nested")
        assert group is not None
        assert name in group.entries

    @pytest.mark.asyncio
    async def test_overwrite_secret(
        self, secrets_manager: AsyncSecretContext, name: str, secret: str
    ) -> None:
        """Test overwriting a secret."""
        await secrets_manager.add_entry(name, secret)
        stored_secret = await secrets_manager.get_secret(name)
        assert stored_secret == secret

        new_secret = "newsecret"
        await secrets_manager.add_entry(name, new_secret, overwrite=True)
        stored_secret = await secrets_manager.get_secret(name)
        assert stored_secret == new_secret


@allure.title("Creating groups")
class TestSecretGroupCreation:
    """Test secret groups."""

    @pytest.mark.parametrize(
        "group_name", ["testgroup", "long group name with spaces", "blåbærgrød"]
    )
    @allure.title("Add a group name {group_name}")
    @pytest.mark.asyncio
    async def test_add_group(
        self,
        secrets_manager: AsyncSecretContext,
        group_name: str,
    ) -> None:
        """Test adding a group."""
        await secrets_manager.add_group(group_name)
        groups = await secrets_manager.get_secret_groups()
        assert len(groups) == 1
        assert groups[0].name == group_name

    @pytest.mark.asyncio
    async def test_add_group_with_parent(
        self, secrets_manager: AsyncSecretContext
    ) -> None:
        """Test adding a group with a parent group."""
        parent_name = "parent"
        child_name = "child"
        await secrets_manager.add_group(parent_name)
        await secrets_manager.add_group(child_name, parent_group=parent_name)

        parent_group = await secrets_manager.get_secret_group(f"/{parent_name}")
        assert parent_group is not None
        assert len(parent_group.children) == 1
        assert parent_group.children[0].name == child_name

        child_group = await secrets_manager.get_secret_group("/parent/child")
        assert child_group is not None
        assert child_group.name == child_name
        assert child_group.parent_group is not None
        assert child_group.parent_group.name == parent_name
        assert len(child_group.children) == 0

    @pytest.mark.asyncio
    async def test_add_group_as_path(self, secrets_manager: AsyncSecretContext) -> None:
        """Add a nested group with path annotation."""
        parent_name = "parent"
        child_path = "/parent/child"
        await secrets_manager.add_group(parent_name)
        await secrets_manager.add_group(child_path)

        parent_group = await secrets_manager.get_secret_group(f"/{parent_name}")
        assert parent_group is not None
        assert len(parent_group.children) == 1
        assert parent_group.children[0].name == "child"

        child_group = await secrets_manager.get_secret_group(child_path)
        assert child_group is not None

    @pytest.mark.asyncio
    async def test_overlapping_names(self, secrets_manager: AsyncSecretContext) -> None:
        """Test having overlapping names in different groups."""
        await secrets_manager.add_group("root")
        # "/root" addresses the group that was just created, so it is rejected.
        with pytest.raises(InvalidGroupNameError):
            await secrets_manager.add_group("/root")

        # The same name one level down is a different group and is accepted.
        await secrets_manager.add_group("/root/root")
        group = await secrets_manager.get_secret_group("/root/root")
        assert group is not None
        assert group.name == "root"

    @pytest.mark.asyncio
    async def test_add_group_with_nonexisting_parent(
        self, secrets_manager: AsyncSecretContext
    ) -> None:
        """Test adding a group with a nonexisting parent."""
        with pytest.raises(InvalidGroupNameError):
            await secrets_manager.add_group("orphan", parent_group="unknown")

    @pytest.mark.asyncio
    async def test_add_duplicate_group(
        self, secrets_manager: AsyncSecretContext
    ) -> None:
        """Test adding the same group twice."""
        await secrets_manager.add_group("snowflake")
        with pytest.raises(InvalidGroupNameError):
            await secrets_manager.add_group("snowflake")

    @pytest.mark.parametrize(
        "group_name,description", [("testgroup", "test description")]
    )
    @allure.title("Add a group name {group_name} with description {description}")
    @pytest.mark.asyncio
    async def test_add_group_with_description(
        self,
        secrets_manager: AsyncSecretContext,
        group_name: str,
        description: str,
    ) -> None:
        """Test adding a group with description."""
        await secrets_manager.add_group(group_name, description)
        result = await secrets_manager.get_secret_group(group_name)
        assert result is not None
        assert result.description == description


# Each case is a list of (name, parent name or None, full path) tuples.
@pytest.mark.parametrize(
    "groups",
    [
        [
            ("root", None, "root"),
            ("level1", "root", "/root/level1"),
            ("level2", "level1", "/root/level1/level2"),
        ],
        [("flat1", None, "flat1"), ("flat2", None, "flat2")],
        [
            ("stub", None, "stub"),
            ("root", None, "root"),
            ("nested", "root", "/root/nested"),
        ],
    ],
)
@allure.title("Listing groups")
class TestSecretGroupListing:
    """Tests for listing groups."""

    @pytest_asyncio.fixture(autouse=True)
    async def create_groups(
        self,
        secrets_manager: AsyncSecretContext,
        groups: list[tuple[str, str | None, str]],
    ) -> None:
        """Pre-create groups."""
        for name, parent_group, _path in groups:
            await secrets_manager.add_group(name, parent_group=parent_group)

    @pytest.mark.asyncio
    async def test_get_secret_groups_list(
        self,
        secrets_manager: AsyncSecretContext,
        groups: list[tuple[str, str | None, str]],
    ) -> None:
        """Test the flat get_secret_group_list."""
        # The groups themselves are created by the autouse create_groups fixture.
        group_list = await secrets_manager.get_secret_group_list()
        assert len(group_list) == len(groups)

    @pytest.mark.asyncio
    async def test_get_secret_groups(
        self,
        secrets_manager: AsyncSecretContext,
        groups: list[tuple[str, str | None, str]],
    ) -> None:
        """Test the tree-oriented get_secret_groups."""
        # Expected mapping of group name -> parent name (None for top level).
        group_map = {name: parent for name, parent, _path in groups}

        group_tree = await secrets_manager.get_secret_groups()
        root_groups = [key for key, value in group_map.items() if value is None]
        assert len(group_tree) == len(root_groups)

        def crawl_tree(item: SecretGroup) -> list[tuple[str, str | None]]:
            """Flatten a group and its descendants into (name, parent) pairs."""
            parent_group_name = None
            if item.parent_group:
                parent_group_name = item.parent_group.name
            items: list[tuple[str, str | None]] = [(item.name, parent_group_name)]
            for child in item.children:
                items.extend(crawl_tree(child))
            return items

        reconstructed_groups: list[tuple[str, str | None]] = []
        for item in group_tree:
            reconstructed_groups.extend(crawl_tree(item))

        # Dict equality ignores insertion order, so no sorting is required.
        assert dict(reconstructed_groups) == group_map

    @pytest.mark.asyncio
    async def test_get_secret_groups_with_secrets(
        self,
        secrets_manager: AsyncSecretContext,
        groups: list[tuple[str, str | None, str]],
    ) -> None:
        """Test fetching groups where there are secrets in all groups."""
        # We will create exactly two secrets in each group.
        for group_name, _parent, path in groups:
            await secrets_manager.add_entry(
                f"{group_name}_1", f"{group_name}_secret_1", group_path=path
            )
            await secrets_manager.add_entry(
                f"{group_name}_2", f"{group_name}_secret_2", group_path=path
            )

        group_list = await secrets_manager.get_secret_group_list()
        for group in group_list:
            assert len(group.entries) == 2
            assert group.entries[0] == f"{group.name}_1"
            assert group.entries[1] == f"{group.name}_2"


# Columns: search query, expected number of matches, groups to create as
# (name, parent name or None), expected child count of a single tree match.
@pytest.mark.parametrize(
    "query,expected,groups,children",
    [
        ("MATCH", 1, [("MATCH", None), ("SOMETHINGELSE", None)], 0),
        ("MATCH", 1, [("root", None), ("MATCH", "root"), ("SOMETHINGELSE", "root")], 0),
        ("MATCH", 3, [("MATCH1", None), ("MATCH2", None), ("MATCH3", None)], 0),
        ("MATCH", 1, [("root", None), ("MATCH", "root"), ("CHILD", "MATCH")], 1),
        (
            "NOMATCH",
            0,
            [("foo", None), ("bar", None), ("foobar", "foo"), ("barfoo", "bar")],
            0,
        ),
    ],
)
@allure.title("Searching in groups using patterns")
class TestGroupSearchPattern:
    """Tests for searching groups with a non-regex pattern."""

    @pytest.mark.asyncio
    async def test_group_list_pattern(
        self,
        secrets_manager: AsyncSecretContext,
        query: str,
        expected: int,
        groups: list[tuple[str, str | None]],
        children: int,
    ) -> None:
        """Test matching a pattern."""
        # `children` is unused here; it is accepted because the parametrization
        # is declared on the class and shared with test_group_tree_pattern.
        for name, parent_group in groups:
            await secrets_manager.add_group(name, parent_group=parent_group)

        result = await secrets_manager.get_secret_group_list(pattern=query, regex=False)
        assert len(result) == expected

    @pytest.mark.asyncio
    async def test_group_tree_pattern(
        self,
        secrets_manager: AsyncSecretContext,
        query: str,
        expected: int,
        groups: list[tuple[str, str | None]],
        children: int,
    ) -> None:
        """Test matching a pattern with a tree result."""
        for name, parent_group in groups:
            await secrets_manager.add_group(name, parent_group=parent_group)

        result = await secrets_manager.get_secret_groups(pattern=query, regex=False)
        assert len(result) == expected
        if expected == 1 and children > 0:
            assert len(result[0].children) == children


@allure.title("Modifying groups")
class TestGroupModification:
    """Test modifying groups."""

    @pytest.mark.parametrize(
        "group_name,parent,description",
        [("test", None, "test description"), ("test", "root", "test_description")],
    )
    @pytest.mark.asyncio
    async def test_set_group_description(
        self,
        secrets_manager: AsyncSecretContext,
        group_name: str,
        parent: str | None,
        description: str,
    ) -> None:
        """Test setting a description on a group."""
        if parent:
            await secrets_manager.add_group(parent)
        await secrets_manager.add_group(group_name, parent_group=parent)

        path = f"/{parent}/{group_name}" if parent else group_name

        group = await secrets_manager.get_secret_group(path)
        assert group is not None
        assert group.description is None

        await secrets_manager.set_group_description(path, description)
        group = await secrets_manager.get_secret_group(path)
        assert group is not None
        assert group.description == description

    # Columns: groups to create as (name, parent), the (group, new parent)
    # arguments passed to move_group, and the path the group must end up at.
    @pytest.mark.parametrize(
        "groups,target_group,expected_path",
        [
            (
                [("root", None), ("test", None)],
                ("test", "root"),
                "/root/test",
            ),
            (
                [("root", None), ("test", "root")],
                ("/root/test", None),
                "test",
            ),
            ([("test", None)], ("test", None), "test"),
        ],
    )
    @pytest.mark.asyncio
    async def test_move_group(
        self,
        secrets_manager: AsyncSecretContext,
        groups: list[tuple[str, str | None]],
        target_group: tuple[str, str | None],
        expected_path: str,
    ) -> None:
        """Test moving groups around."""
        for group_name, parent_name in groups:
            await secrets_manager.add_group(group_name, parent_group=parent_name)

        group_name, target = target_group
        await secrets_manager.move_group(group_name, target)
        group = await secrets_manager.get_secret_group(expected_path)
        assert group is not None


@allure.title("Deleting items")
class TestSecretManagerDeletions:
    """Test secret manager deletions."""

    @pytest_asyncio.fixture(autouse=True)
    async def create_test_data(self, secrets_manager: AsyncSecretContext) -> None:
        """Create two ungrouped secrets and two secrets in each nested group."""
        groups = [
            ("root", None, "root"),
            ("level1", "root", "/root/level1"),
            ("level2", "level1", "/root/level1/level2"),
        ]
        for n in range(2):
            await secrets_manager.add_entry(f"ungrouped_{n}", "secret")

        for group_name, parent_name, path in groups:
            await secrets_manager.add_group(group_name, parent_group=parent_name)
            for n in range(2):
                await secrets_manager.add_entry(
                    f"{group_name}_{n}", "secret", group_path=path
                )

    @pytest.mark.parametrize(
        "name,group_name",
        [("root_1", "root"), ("level1_0", "/root/level1"), ("ungrouped_1", None)],
    )
    @allure.title("Delete secret {name} in group {group_name}")
    @pytest.mark.asyncio
    async def test_secret_deletion(
        self, secrets_manager: AsyncSecretContext, name: str, group_name: str | None
    ) -> None:
        """Test secret deletion."""
        if group_name:
            group = await secrets_manager.get_secret_group(group_name)
            assert group is not None
            assert name in group.entries

        secret = await secrets_manager.get_secret(name)
        assert secret is not None

        await secrets_manager.delete_entry(name)

        secret = await secrets_manager.get_secret(name)
        assert secret is None

        if group_name:
            # The group itself must survive; only the entry disappears from it.
            group = await secrets_manager.get_secret_group(group_name)
            assert group is not None
            assert name not in group.entries

    @pytest.mark.parametrize("name", ["NONEXISTING"])
    @allure.title("Deleting non-existing entry {name}")
    @pytest.mark.asyncio
    async def test_nonexisting_entry(
        self, secrets_manager: AsyncSecretContext, name: str
    ) -> None:
        """Test deleting something that doesn't exist."""
        secret = await secrets_manager.get_secret(name)
        assert secret is None
        # Deleting a missing entry must not raise; the test passes if it returns.
        await secrets_manager.delete_entry(name)

    @pytest.mark.parametrize("path", ["/root/level1"])
    @allure.title("Deleting group {path}")
    @pytest.mark.asyncio
    async def test_group_delete(
        self, secrets_manager: AsyncSecretContext, path: str
    ) -> None:
        """Test deleting a group."""
        group = await secrets_manager.get_secret_group(path)
        assert group is not None
        # Copy the entry names before the group is removed.
        entries = list(group.entries)

        await secrets_manager.delete_group(path)
        group = await secrets_manager.get_secret_group(path)
        assert group is None

        # Entries that belonged to the deleted group end up without a group.
        for name in entries:
            new_grouping = await secrets_manager.get_entry_group(name)
            assert new_grouping is None


@allure.title("Other tests")
class TestSecretManagerOther:
    """Uncategorized tests to standardize module."""

    @pytest.mark.asyncio
    async def test_get_secret_nonexisting(
        self, secrets_manager: AsyncSecretContext
    ) -> None:
        """Test get_secret with invalid name."""
        result = await secrets_manager.get_secret("NOMATCH")
        assert result is None

    @pytest.mark.parametrize(
        "group_name,num_grouped,num_ungrouped",
        [("GROUP", 3, 3), ("GROUP", 3, 0), ("GROUP", 0, 0)],
    )
    @pytest.mark.asyncio
    async def test_get_ungrouped_secrets(
        self,
        secrets_manager: AsyncSecretContext,
        group_name: str,
        num_grouped: int,
        num_ungrouped: int,
    ) -> None:
        """Test get_ungrouped_secrets."""
        await secrets_manager.add_group(group_name)
        for n in range(num_ungrouped):
            await secrets_manager.add_entry(f"ungrouped_{n}", "secret")

        for n in range(num_grouped):
            await secrets_manager.add_entry(
                f"grouped_{n}", "secret", group_path=group_name
            )

        ungrouped = await secrets_manager.get_ungrouped_secrets()
        assert len(ungrouped) == num_ungrouped
        matching = [entry for entry in ungrouped if entry.startswith("ungrouped_")]
        assert len(matching) == num_ungrouped

        grouped_secrets = await secrets_manager.get_available_secrets(
            group_path=group_name
        )
        assert len(grouped_secrets) == num_grouped

        all_secrets = await secrets_manager.get_available_secrets()
        assert len(all_secrets) == (num_ungrouped + num_grouped)

    @pytest.mark.parametrize(
        "entries", [[("test1", "secret1"), ("test2", "secret2")], []]
    )
    @pytest.mark.asyncio
    async def test_get_available_secrets(
        self, secrets_manager: AsyncSecretContext, entries: list[tuple[str, str]]
    ) -> None:
        """Test the get_available_secrets method."""
        for name, secret in entries:
            await secrets_manager.add_entry(name, secret)

        entry_names = [entry[0] for entry in entries]
        response = await secrets_manager.get_available_secrets()
        assert len(response) == len(entries)
        assert sorted(response) == sorted(entry_names)

    @pytest.mark.asyncio
    async def test_get_secret_groups_none(
        self, secrets_manager: AsyncSecretContext
    ) -> None:
        """Test get_secret_groups with no groups created."""
        result = await secrets_manager.get_secret_groups()
        assert len(result) == 0

        result_flat = await secrets_manager.get_secret_group_list()
        assert len(result_flat) == 0

    @allure.title("Search for a group using regular expression")
    @pytest.mark.asyncio
    async def test_group_regex_search(
        self, secrets_manager: AsyncSecretContext
    ) -> None:
        """Search for groups with regular expressions."""
        group_names = [
            "test1",
            "test2",
            "other",
            "somethingelse",
        ]
        for group_name in group_names:
            await secrets_manager.add_group(group_name)

        results = await secrets_manager.get_secret_group_list(
            pattern="^test", regex=True
        )
        assert len(results) == 2
        for matched_group in results:
            assert matched_group.name.startswith("test")