"""Tests of the admin interface.""" from collections.abc import AsyncIterator from contextlib import asynccontextmanager import pytest import httpx from sshecret.backend import Client from sshecret.crypto import generate_private_key, generate_public_key_string from .types import AdminServer def make_test_key() -> str: """Generate a test key.""" private_key = generate_private_key() return generate_public_key_string(private_key.public_key()) class BaseAdminTests: """Base admin test class.""" @asynccontextmanager async def http_client( self, admin_server: AdminServer, authenticate: bool = True ) -> AsyncIterator[httpx.AsyncClient]: """Run a client towards the admin rest api.""" admin_url, credentials = admin_server username, password = credentials headers: dict[str, str] | None = None if authenticate: async with httpx.AsyncClient(base_url=admin_url) as client: response = await client.post( "api/v1/token", data={"username": username, "password": password} ) assert response.status_code == 200 data = response.json() assert "access_token" in data token = data["access_token"] headers = {"Authorization": f"Bearer {token}"} async with httpx.AsyncClient(base_url=admin_url, headers=headers) as client: yield client async def create_client( self, admin_server: AdminServer, name: str, public_key: str | None = None, ) -> Client: """Create a client.""" if not public_key: public_key = make_test_key() new_client = { "name": name, "public_key": public_key, "sources": ["192.0.2.0/24"], } async with self.http_client(admin_server, True) as http_client: response = await http_client.post("api/v1/clients/", json=new_client) assert response.status_code == 200 data = response.json() client = Client.model_validate(data) return client class TestAdminAPI(BaseAdminTests): """Tests of the Admin REST API.""" @pytest.mark.asyncio async def test_health_check( self, admin_server: tuple[str, tuple[str, str]] ) -> None: """Test admin login.""" async with self.http_client(admin_server, False) as client: resp = await client.get("/health") assert resp.status_code == 200 @pytest.mark.asyncio async def test_admin_login(self, admin_server: AdminServer) -> None: """Test admin login.""" async with self.http_client(admin_server, False) as client: resp = await client.get("api/v1/clients/") assert resp.status_code == 401 async with self.http_client(admin_server, True) as client: resp = await client.get("api/v1/clients/") assert resp.status_code == 200 class TestAdminApiClients(BaseAdminTests): """Test client routes.""" @pytest.mark.asyncio async def test_create_client(self, admin_server: AdminServer) -> None: """Test create_client.""" client = await self.create_client(admin_server, "testclient") assert client.id is not None assert client.name == "testclient" @pytest.mark.asyncio async def test_get_clients(self, admin_server: AdminServer) -> None: """Test get_clients.""" client_names = ["test-db", "test-app", "test-www"] for name in client_names: await self.create_client(admin_server, name) async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) assert len(data) == 3 for entry in data: assert isinstance(entry, dict) client_name = entry.get("name") assert client_name in client_names @pytest.mark.asyncio async def test_delete_client(self, admin_server: AdminServer) -> None: """Test delete_client.""" await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) assert len(data) == 1 assert data[0]["name"] == "testclient" resp = await http_client.delete("/api/v1/clients/testclient") assert resp.status_code == 200 resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) assert len(data) == 0 class TestAdminApiSecrets(BaseAdminTests): """Test secret management.""" @pytest.mark.asyncio async def test_add_secret(self, admin_server: AdminServer) -> None: """Test add_secret.""" await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: data = { "name": "testsecret", "clients": ["testclient"], "value": "secretstring", } resp = await http_client.post("api/v1/secrets/", json=data) assert resp.status_code == 200 @pytest.mark.asyncio async def test_get_secret(self, admin_server: AdminServer) -> None: """Test get_secret.""" await self.test_add_secret(admin_server) async with self.http_client(admin_server) as http_client: resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert isinstance(data, dict) assert data["name"] == "testsecret" assert data["secret"] == "secretstring" assert "testclient" in data["clients"] @pytest.mark.asyncio async def test_add_secret_auto(self, admin_server: AdminServer) -> None: """Test adding a secret with an auto-generated value.""" await self.create_client(admin_server, name="testclient") async with self.http_client(admin_server) as http_client: data = { "name": "testsecret", "clients": ["testclient"], "value": {"auto_generate": True, "length": 17}, } resp = await http_client.post("api/v1/secrets/", json=data) assert resp.status_code == 200 resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert isinstance(data, dict) assert data["name"] == "testsecret" assert len(data["secret"]) == 17 assert "testclient" in data["clients"] @pytest.mark.asyncio async def test_update_secret(self, admin_server: AdminServer) -> None: """Test updating secrets.""" await self.test_add_secret_auto(admin_server) async with self.http_client(admin_server) as http_client: resp = await http_client.put( "api/v1/secrets/testsecret", json={"value": "secret"}, ) assert resp.status_code == 200 resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert data["secret"] == "secret" resp = await http_client.put( "api/v1/secrets/testsecret", json={"value": {"auto_generate": True, "length": 16}}, ) assert resp.status_code == 200 resp = await http_client.get("api/v1/secrets/testsecret") assert resp.status_code == 200 data = resp.json() assert len(data["secret"]) == 16