"""Tests where the sshd is the main consumer. This essentially also tests parts of the admin API. """ from contextlib import asynccontextmanager from typing import AsyncIterator import os import httpx import pytest from sshecret.crypto import decode_string from sshecret.backend.api import SshecretBackend from .clients import create_test_client, ClientData from .types import CommandRunner, ProcessRunner class TestSshd: """Class based tests. This allows us to create small helpers. """ @pytest.mark.asyncio async def test_get_secret( self, backend_api: SshecretBackend, ssh_command_runner: CommandRunner ) -> None: """Test get secret flow.""" test_client = create_test_client("testclient") await backend_api.create_client( "testclient", test_client.public_key, "A test client" ) await backend_api.create_client_secret("testclient", "testsecret", "bogus") response = await ssh_command_runner(test_client, "get_secret testsecret") assert response.exit_status == 0 assert response.stdout is not None assert isinstance(response.stdout, str) assert response.stdout.rstrip() == "bogus" @pytest.mark.asyncio async def test_register( self, backend_api: SshecretBackend, ssh_session: ProcessRunner ) -> None: """Test registration.""" await self.register_client("new_client", ssh_session) # Check that the client is created. clients = await backend_api.get_clients() assert len(clients) == 1 client = clients[0] assert client.name == "new_client" async def register_client( self, name: str, ssh_session: ProcessRunner ) -> ClientData: """Register client.""" test_client = create_test_client(name) async with ssh_session(test_client, "register") as session: maxlines = 10 linenum = 0 found = False while linenum < maxlines: line = await session.stdout.readline() if "Enter public key" in line: found = True break assert found is True session.stdin.write(test_client.public_key + "\n") result = await session.stdout.read() assert "Key is valid. Registering client." in result await session.wait() return test_client class TestSshdIntegration(TestSshd): """Integration tests.""" @pytest.mark.asyncio async def test_end_to_end( self, backend_api: SshecretBackend, admin_server: tuple[str, tuple[str, str]], ssh_session: ProcessRunner, ssh_command_runner: CommandRunner, ) -> None: """Test end to end.""" test_client = await self.register_client("myclient", ssh_session) url, credentials = admin_server username, password = credentials async with self.admin_client(url, username, password) as http_client: resp = await http_client.get("api/v1/clients/") assert resp.status_code == 200 clients = resp.json() assert len(clients) == 1 assert clients[0]["name"] == "myclient" create_model = { "name": "mysecret", "clients": ["myclient"], "value": "mypassword", } resp = await http_client.post("api/v1/secrets/", json=create_model) assert resp.status_code == 200 # Login via ssh to fetch the decrypted value. ssh_output = await ssh_command_runner(test_client, "get_secret mysecret") assert ssh_output.stdout is not None assert isinstance(ssh_output.stdout, str) encrypted = ssh_output.stdout.rstrip() decrypted = decode_string(encrypted, test_client.private_key) assert decrypted == "mypassword" async def login(self, url: str, username: str, password: str) -> str: """Login and get token.""" api_url = os.path.join(url, "api/v1", "token") client = httpx.AsyncClient() response = await client.post( api_url, data={"username": username, "password": password} ) assert response.status_code == 200 data = response.json() assert "access_token" in data assert isinstance(data["access_token"], str) return str(data["access_token"]) @asynccontextmanager async def admin_client( self, url: str, username: str, password: str ) -> AsyncIterator[httpx.AsyncClient]: """Create an admin client.""" token = await self.login(url, username, password) async with httpx.AsyncClient( base_url=url, headers={"Authorization": f"Bearer {token}"} ) as client: yield client