"""Tests of the admin interface.""" import os import allure from dataclasses import dataclass, field from httpx import Response import pytest from allure_commons.types import Severity from ..types import AdminServer from .base import BaseAdminTests from sshecret_admin.services.models import ClientSecretGroup, ClientSecretGroupList @allure.title("Admin API") class TestAdminAPI(BaseAdminTests): """Tests of the Admin REST API.""" @allure.title("Test health test endpoint") @allure.severity(Severity.TRIVIAL) @pytest.mark.asyncio async def test_health_check( self, admin_server: tuple[str, tuple[str, str]] ) -> None: """Test admin login.""" async with self.http_client(admin_server, False) as client: resp = await client.get("/health") assert resp.status_code == 200 @allure.title("Test login over API") @allure.severity(Severity.BLOCKER) @pytest.mark.asyncio async def test_admin_login(self, admin_server: AdminServer) -> None: """Test admin login.""" async with self.http_client(admin_server, False) as client: resp = await client.get("api/v1/clients/") assert resp.status_code == 401 async with self.http_client(admin_server, True) as client: resp = await client.get("api/v1/clients/") assert resp.status_code == 200 @allure.title("Admin API Client API") class TestAdminApiClients(BaseAdminTests): """Test client routes.""" @allure.title("Test creating a client") @allure.description("Ensure we can create a new client.") @pytest.mark.asyncio async def test_create_client(self, admin_server: AdminServer) -> None: """Test create_client.""" client = await self.create_client(admin_server, "testclient") assert client.id is not None assert client.name == "testclient" @allure.title("Test reading clients") @allure.description("Ensure we can retrieve a list of current clients.") @pytest.mark.asyncio async def test_get_clients(self, admin_server: AdminServer) -> None: """Test get_clients.""" client_names = ["test-db", "test-app", "test-www"] for name in client_names: await self.create_client(admin_server, name) async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) assert len(data) == 3 for entry in data: assert isinstance(entry, dict) client_name = entry.get("name") assert client_name in client_names @allure.title("Test client deletion") @allure.description("Ensure we can delete a client.") @pytest.mark.asyncio async def test_delete_client(self, admin_server: AdminServer) -> None: """Test delete_client.""" await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) assert len(data) == 1 assert data[0]["name"] == "testclient" resp = await http_client.delete("/api/v1/clients/testclient") assert resp.status_code == 200 resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) assert len(data) == 0 @allure.title("Test secret management") class TestAdminApiSecrets(BaseAdminTests): """Test secret management.""" @allure.title("Test adding a secret") @allure.description("Ensure that we can add a secret to a client.") @pytest.mark.asyncio async def test_add_secret(self, admin_server: AdminServer) -> None: """Test add_secret.""" await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: data = { "name": "testsecret", "clients": ["testclient"], "value": "secretstring", } resp = await http_client.post("api/v1/secrets/", json=data) assert resp.status_code == 200 @allure.title("Test read a secret") @allure.description("Ensure that we can retrieve a secret we have stored.") @pytest.mark.asyncio async def test_get_secret(self, admin_server: AdminServer) -> None: """Test get_secret.""" await self.test_add_secret(admin_server) async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert isinstance(data, dict) assert data["name"] == "testsecret" assert data["secret"] == "secretstring" client_names = [cl["name"] for cl in data["clients"]] assert "testclient" in client_names @allure.title("Test adding a secret with automatic value") @allure.description( "Test that we can add a secret where we let the system come up with the value of a given length." ) @pytest.mark.asyncio async def test_add_secret_auto(self, admin_server: AdminServer) -> None: """Test adding a secret with an auto-generated value.""" await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: data = { "name": "testsecret", "clients": ["testclient"], "value": {"auto_generate": True, "length": 17}, } resp = await http_client.post("api/v1/secrets/", json=data) assert resp.status_code == 200 resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert isinstance(data, dict) assert data["name"] == "testsecret" assert len(data["secret"]) == 17 assert "testclient" in [cl["name"] for cl in data["clients"]] @allure.title("Test adding a secret with client ID") @allure.description("Ensure that we can refer to clients with their IDs") @pytest.mark.asyncio async def test_add_secret_with_clientid(self, admin_server: AdminServer) -> None: """Test add secret with client ID as distinguisher.""" client = await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: data = { "name": "testsecret", "clients": [str(client.id)], "value": "secret", "client_distinguisher": "id", } resp = await http_client.post("api/v1/secrets/", json=data) assert resp.status_code == 200 async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() client_names = [cl["name"] for cl in data["clients"]] assert "testclient" in client_names @allure.title("Test updating a secret") @allure.description("Test that we can update the value of a stored secret.") @pytest.mark.asyncio async def test_update_secret(self, admin_server: AdminServer) -> None: """Test updating secrets.""" await self.test_add_secret_auto(admin_server) async with self.http_client(admin_server) as http_client: resp = await http_client.put( "api/v1/secrets/testsecret", json={"value": "secret"}, ) assert resp.status_code == 200 resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert data["secret"] == "secret" resp = await http_client.put( "api/v1/secrets/testsecret", json={"value": {"auto_generate": True, "length": 16}}, ) assert resp.status_code == 200 resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert len(data["secret"]) == 16 @dataclass(kw_only=True) class GroupHier: """Group hierarchy for testing.""" name: str secrets: list[str] path: list[str] = field(default_factory=list) class TestSecretGroupApi(BaseAdminTests): """Test secret group api.""" async def add_group( self, admin_server: AdminServer, group_name: str, parent: str | None = None, description: str | None = None, ) -> None: """Add a group.""" path = "api/v1/secrets/groups/" async with self.http_client(admin_server) as http_client: data = {"name": group_name, "parent_group": parent} if description: data["description"] = description resp = await http_client.post(path, json=data) assert resp.status_code == 200 async def get_group(self, admin_server: AdminServer, groups: list[str]) -> Response: """Get group.""" group_name = "/".join(groups) path = f"api/v1/secrets/groups/{group_name}/" async with self.http_client(admin_server) as http_client: resp = await http_client.get(path) return resp async def get_groups(self, admin_server: AdminServer) -> Response: """Get groups.""" path = "api/v1/secrets/groups/" async with self.http_client(admin_server) as http_client: resp = await http_client.get(path) return resp async def add_secret( self, admin_server: AdminServer, secret_name: str, group: str | None = None ) -> Response: """Add a secret.""" async with self.http_client(admin_server) as http_client: data = { "name": secret_name, "value": "secretstring", } if group: data["group"] = group resp = await http_client.post("api/v1/secrets/", json=data) return resp async def add_secret_to_group( self, admin_server: AdminServer, secret_name: str, groups: list[str] | None ) -> Response: """Add a secret to a group. Secret should be created in advance. """ path = f"api/v1/secrets/set-group" if not groups: groups = [] groups.insert(0, "") group_path = "/".join(groups) async with self.http_client(admin_server) as http_client: resp = await http_client.post( path, json={"secret_name": secret_name, "group_path": group_path} ) return resp async def delete_secret_group( self, admin_server: AdminServer, group_path: str ) -> Response: """Delete secret group.""" if group_path.startswith("/"): group_path = group_path[1:] path = os.path.join(f"/api/v1/secrets/groups", group_path) + "/" async with self.http_client(admin_server) as http_client: resp = await http_client.delete(path) return resp async def move_secret_group( self, admin_server: AdminServer, group_path: str, new_path: str ) -> Response: """Move a secret group.""" if group_path.startswith("/"): group_path = group_path[1:] path = f"/api/v1/secrets/move-group/{group_path}" async with self.http_client(admin_server) as http_client: resp = await http_client.post(path, json={"path": new_path}) return resp async def update_secret_group( self, admin_server: AdminServer, name: str, group_path: str, *, description: str | None = None, parent: str | None = None, ) -> Response: """Update secret group.""" if group_path.startswith("/"): group_path = group_path[1:] data = { "name": name, "description": description, "parent_group": parent, } path = f"/api/v1/secrets/groups/{group_path}/" async with self.http_client(admin_server) as http_client: resp = await http_client.put(path, json=data) return resp @pytest.mark.parametrize("group_name", ["test", "test with spaces", "blåbærgrød"]) @pytest.mark.asyncio async def test_add_group(self, admin_server: AdminServer, group_name: str) -> None: """Test adding a group, then getting it.""" await self.add_group(admin_server, group_name) response = await self.get_group(admin_server, [group_name]) assert response.status_code == 200 # We might as well try to deserialize the group. group = ClientSecretGroup.model_validate(response.json()) assert group.group_name == group_name @pytest.mark.parametrize("parent,child", [("parent", "child")]) @pytest.mark.asyncio async def test_add_nested_group( self, admin_server: AdminServer, parent: str, child: str ) -> None: """Test adding a group with a parent group.""" await self.add_group(admin_server, parent) await self.add_group(admin_server, child, parent) response = await self.get_group(admin_server, [parent]) assert response.status_code == 200 parent_group = ClientSecretGroup.model_validate(response.json()) assert parent_group.group_name == parent assert len(parent_group.children) == 1 assert parent_group.children[0].group_name == child response = await self.get_group(admin_server, [parent, child]) assert response.status_code == 200 child_group = ClientSecretGroup.model_validate(response.json()) assert child_group.group_name == child assert child_group.parent_group is not None assert child_group.parent_group.group_name == parent assert child_group.path == "/parent/child" @pytest.mark.parametrize( "secret_name,group_name,parent_name", [("test", "group", None), ("test", "child", "parent")], ) @pytest.mark.asyncio async def test_add_secret_to_group( self, admin_server: AdminServer, secret_name: str, group_name: str, parent_name: str | None, ) -> None: """Test adding a secret to a group.""" resp = await self.add_secret(admin_server, secret_name) assert resp.status_code == 200 groups = [group_name] if parent_name: await self.add_group(admin_server, parent_name) groups = [parent_name, group_name] await self.add_group(admin_server, group_name, parent_name) resp = await self.add_secret_to_group(admin_server, secret_name, groups) assert resp.status_code == 200 resp = await self.get_group(admin_server, groups) assert resp.status_code == 200 group = ClientSecretGroup.model_validate(resp.json()) assert len(group.entries) == 1 assert group.entries[0].name == secret_name @pytest.mark.parametrize("groups", [["group1", "group2", "group3"]]) @pytest.mark.asyncio async def test_get_group_flat( self, admin_server: AdminServer, groups: list[str] ) -> None: """Test getting a list of groups with no recursion.""" for group in groups: await self.add_group(admin_server, group) response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == len(groups) @pytest.mark.asyncio async def test_get_group_tree(self, admin_server: AdminServer) -> None: """Test getting a list of groups where recursion exists.""" await self.add_group(admin_server, "root") await self.add_group(admin_server, "level1", "root") await self.add_group(admin_server, "level2", "level1") await self.add_secret(admin_server, "secret1") await self.add_secret(admin_server, "secret2", "/root/level1") await self.add_secret(admin_server, "secret3", "/root/level1") response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) # we expect this to be a tree now assert len(group_list.ungrouped) == 1 assert len(group_list.groups) == 1 assert group_list.groups[0].group_name == "root" assert len(group_list.groups[0].children) == 1 assert len(group_list.groups[0].children[0].children) == 1 @pytest.mark.asyncio async def test_move_secret_to_root(self, admin_server: AdminServer) -> None: """Test moving a secret to the root.""" await self.add_group(admin_server, "secretgroup") await self.add_secret(admin_server, "secret1", "/secretgroup") response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.ungrouped) == 0 await self.add_secret_to_group(admin_server, "secret1", None) response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.ungrouped) == 1 @pytest.mark.asyncio async def test_delete_secret_group(self, admin_server: AdminServer) -> None: """Test deleting a secret group.""" await self.add_group(admin_server, "secretgroup") await self.add_group(admin_server, "othergroup") await self.add_secret(admin_server, "secret1", "/secretgroup") await self.add_secret(admin_server, "secret2", "/secretgroup") await self.add_secret(admin_server, "secret3", "/secretgroup") response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 2 response = await self.delete_secret_group(admin_server, "/secretgroup") assert response.status_code == 200 response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 1 assert len(group_list.ungrouped) == 3 @pytest.mark.asyncio async def test_nest_group(self, admin_server: AdminServer) -> None: """Test moving a group below another group.""" await self.add_group(admin_server, "secretgroup") await self.add_group(admin_server, "othergroup") await self.add_group(admin_server, "nested", "/othergroup") await self.add_secret(admin_server, "testsecret", "/secretgroup") response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 2 response = await self.move_secret_group( admin_server, "/secretgroup", "/othergroup/nested" ) assert response.status_code == 200 response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 1 assert group_list.groups[0].group_name == "othergroup" assert len(group_list.groups[0].children) == 1 assert len(group_list.groups[0].children[0].children) == 1 assert group_list.groups[0].children[0].children[0].group_name == "secretgroup" assert len(group_list.groups[0].children[0].children[0].entries) == 1 @pytest.mark.asyncio async def test_add_nested_group_by_path(self, admin_server: AdminServer) -> None: """Test adding a group directly by path""" await self.add_group(admin_server, "/secretgroup") await self.add_group(admin_server, "/secretgroup/othergroup") await self.add_group(admin_server, "/secretgroup/othergroup/nestedgroup") response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 1 assert len(group_list.groups[0].children) == 1 assert len(group_list.groups[0].children[0].children) == 1 @pytest.mark.asyncio async def test_unnest_group(self, admin_server: AdminServer) -> None: """Test moving a deeply nested group back to the root.""" await self.add_group(admin_server, "/secretgroup") await self.add_group(admin_server, "/secretgroup/othergroup") await self.add_group(admin_server, "/secretgroup/othergroup/nestedgroup") await self.add_secret( admin_server, "secret1", "/secretgroup/othergroup/nestedgroup" ) await self.add_secret( admin_server, "secret2", "/secretgroup/othergroup/nestedgroup" ) response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 1 move_resp = await self.move_secret_group( admin_server, "/secretgroup/othergroup/nestedgroup", "/" ) assert move_resp.status_code == 200 response = await self.get_groups(admin_server) assert response.status_code == 200 group_list = ClientSecretGroupList.model_validate(response.json()) assert len(group_list.groups) == 2 target_group = next( filter(lambda x: x.group_name == "nestedgroup", group_list.groups) ) assert len(target_group.entries) == 2 assert target_group.path == "/nestedgroup" @pytest.mark.parametrize( "group_name,description,parent", [ (("test", "test"), ("before", "after"), (None, None)), (("test", "newname"), ("descr", "descr"), ("parent", "parent")), (("test", "test"), ("descr", "descr"), ("oldparent", "newparent")), (("test", "test"), ("descr", "descr"), ("oldparent", None)), (("oldname", "newname"), ("before", "after"), ("oldparent", "newparent")), ], ) @pytest.mark.asyncio async def test_group_update( self, admin_server: AdminServer, group_name: tuple[str, str], description: tuple[str, str], parent: tuple[str | None, str | None], ) -> None: """Test updating a group""" name_b, name_a = group_name descr_b, descr_a = description parent_b, parent_a = parent if parent_b: await self.add_group(admin_server, parent_b, None) if parent_a and parent_a != parent_b: await self.add_group(admin_server, parent_a, None) elif not parent_a: parent_a = "/" await self.add_group(admin_server, name_b, parent_b, descr_b) group_path = name_b if parent_b: group_path = f"{parent_b}/{name_b}" resp = await self.update_secret_group( admin_server, name_a, group_path, description=descr_a, parent=parent_a ) assert resp.status_code == 200 group = ClientSecretGroup.model_validate(resp.json()) assert group.group_name == name_a assert group.description == descr_a if parent_a and parent_a != "/": assert group.parent_group is not None assert group.parent_group.group_name == parent_a else: assert group.parent_group is None