Files

68 lines
2.1 KiB
Python

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import httpx
from sshecret.backend import Client
from sshecret.crypto import generate_private_key, generate_public_key_string
from ..types import AdminServer
def make_test_key() -> str:
    """Return a freshly generated public key string for use in tests.

    A new private key is created on every call; only its public half,
    rendered through ``generate_public_key_string``, is returned.
    """
    public_half = generate_private_key().public_key()
    return generate_public_key_string(public_half)
class BaseAdminTests:
    """Shared helpers for tests that exercise the admin REST API."""

    @asynccontextmanager
    async def http_client(
        self, admin_server: AdminServer, authenticate: bool = True
    ) -> AsyncIterator[httpx.AsyncClient]:
        """Yield an ``httpx.AsyncClient`` pointed at the admin REST API.

        ``admin_server`` is unpacked as ``(url, (username, password))``.

        When ``authenticate`` is true, the credentials are first posted to
        ``api/v1/token`` using a short-lived client. The call asserts that
        the response status is 200 and that the JSON body carries an
        ``access_token``, which is then sent as a ``Bearer`` authorization
        header by the yielded client. When ``authenticate`` is false, the
        yielded client carries no extra headers.
        """
        base_url, (username, password) = admin_server
        auth_headers: dict[str, str] | None = None

        if authenticate:
            login_form = {"username": username, "password": password}
            # A separate, throwaway client is used only for the token request.
            async with httpx.AsyncClient(base_url=base_url) as login_client:
                token_response = await login_client.post(
                    "api/v1/token", data=login_form
                )
                assert token_response.status_code == 200
                token_payload = token_response.json()
                assert "access_token" in token_payload
                bearer = token_payload["access_token"]
                auth_headers = {"Authorization": f"Bearer {bearer}"}

        async with httpx.AsyncClient(
            base_url=base_url, headers=auth_headers
        ) as api_client:
            yield api_client

    async def create_client(
        self,
        admin_server: AdminServer,
        name: str,
        public_key: str | None = None,
    ) -> Client:
        """Create a client through the admin API and return it.

        If ``public_key`` is missing or empty, a key from ``make_test_key``
        is used instead. The client is registered with the fixed source
        network ``192.0.2.0/24`` by posting to ``api/v1/clients/`` over an
        authenticated connection. The call asserts a 200 response and
        returns the JSON body validated as a ``Client`` model.
        """
        payload = {
            "name": name,
            # Falsy keys (None or "") fall back to a generated test key.
            "public_key": public_key or make_test_key(),
            "sources": ["192.0.2.0/24"],
        }
        async with self.http_client(admin_server, True) as api:
            response = await api.post("api/v1/clients/", json=payload)
            assert response.status_code == 200
            created = Client.model_validate(response.json())
        return created