from collections.abc import AsyncIterator from contextlib import asynccontextmanager import httpx from sshecret.backend import Client from sshecret.crypto import generate_private_key, generate_public_key_string from ..types import AdminServer def make_test_key() -> str: """Generate a test key.""" private_key = generate_private_key() return generate_public_key_string(private_key.public_key()) class BaseAdminTests: """Base admin test class.""" @asynccontextmanager async def http_client( self, admin_server: AdminServer, authenticate: bool = True ) -> AsyncIterator[httpx.AsyncClient]: """Run a client towards the admin rest api.""" admin_url, credentials = admin_server username, password = credentials headers: dict[str, str] | None = None if authenticate: async with httpx.AsyncClient(base_url=admin_url) as client: response = await client.post( "api/v1/token", data={"username": username, "password": password} ) assert response.status_code == 200 data = response.json() assert "access_token" in data token = data["access_token"] headers = {"Authorization": f"Bearer {token}"} async with httpx.AsyncClient(base_url=admin_url, headers=headers) as client: yield client async def create_client( self, admin_server: AdminServer, name: str, public_key: str | None = None, ) -> Client: """Create a client.""" if not public_key: public_key = make_test_key() new_client = { "name": name, "public_key": public_key, "sources": ["192.0.2.0/24"], } async with self.http_client(admin_server, True) as http_client: response = await http_client.post("api/v1/clients/", json=new_client) assert response.status_code == 200 data = response.json() client = Client.model_validate(data) return client