From d3d99775d9e707dc074f2b5e5d2592c7ccfbdcde Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Sun, 11 May 2025 11:22:00 +0200 Subject: [PATCH] Centralize testing --- packages/sshecret-sshd/pytest.ini | 2 - .../sshecret-sshd/tests => tests}/__init__.py | 0 tests/integration/__init__.py | 0 tests/integration/clients.py | 27 +++ tests/integration/conftest.py | 209 ++++++++++++++++++ tests/integration/helpers.py | 41 ++++ tests/integration/test_admin_api.py | 55 +++++ tests/integration/test_backend.py | 58 +++++ tests/integration/test_sshd.py | 139 ++++++++++++ tests/integration/types.py | 29 +++ tests/packages/__init__.py | 0 tests/packages/backend/__init__.py | 0 .../packages/backend}/test_backend.py | 0 tests/packages/sshd/__init__.py | 1 + .../tests => tests/packages/sshd}/conftest.py | 8 +- .../packages/sshd}/test_get_secret.py | 0 .../packages/sshd}/test_ping.py | 0 .../packages/sshd}/test_register.py | 0 .../tests => tests/packages/sshd}/types.py | 0 19 files changed, 565 insertions(+), 4 deletions(-) delete mode 100644 packages/sshecret-sshd/pytest.ini rename {packages/sshecret-sshd/tests => tests}/__init__.py (100%) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/clients.py create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/helpers.py create mode 100644 tests/integration/test_admin_api.py create mode 100644 tests/integration/test_backend.py create mode 100644 tests/integration/test_sshd.py create mode 100644 tests/integration/types.py create mode 100644 tests/packages/__init__.py create mode 100644 tests/packages/backend/__init__.py rename {packages/sshecret-backend/tests => tests/packages/backend}/test_backend.py (100%) create mode 100644 tests/packages/sshd/__init__.py rename {packages/sshecret-sshd/tests => tests/packages/sshd}/conftest.py (96%) rename {packages/sshecret-sshd/tests => tests/packages/sshd}/test_get_secret.py (100%) rename {packages/sshecret-sshd/tests => tests/packages/sshd}/test_ping.py (100%) rename {packages/sshecret-sshd/tests => tests/packages/sshd}/test_register.py (100%) rename {packages/sshecret-sshd/tests => tests/packages/sshd}/types.py (100%) diff --git a/packages/sshecret-sshd/pytest.ini b/packages/sshecret-sshd/pytest.ini deleted file mode 100644 index 2f4c80e..0000000 --- a/packages/sshecret-sshd/pytest.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pytest] -asyncio_mode = auto diff --git a/packages/sshecret-sshd/tests/__init__.py b/tests/__init__.py similarity index 100% rename from packages/sshecret-sshd/tests/__init__.py rename to tests/__init__.py diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/clients.py b/tests/integration/clients.py new file mode 100644 index 0000000..2623b1b --- /dev/null +++ b/tests/integration/clients.py @@ -0,0 +1,27 @@ +"""Client helpers.""" + +from dataclasses import dataclass +from cryptography.hazmat.primitives.asymmetric import rsa + +from sshecret.crypto import generate_private_key, generate_public_key_string + + +@dataclass +class ClientData: + """Test client.""" + + name: str + private_key: rsa.RSAPrivateKey + + @property + def public_key(self) -> str: + """Return public key as string.""" + return generate_public_key_string(self.private_key.public_key()) + + +def create_test_client(name: str) -> ClientData: + """Create test client.""" + return ClientData( + name=name, + private_key=generate_private_key() + ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..33f5ed7 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,209 @@ +"""Test library. + +Strategy: + +We start by spawning the backend server, and create two test keys. + +Then we spawn the sshd and the admin api. +""" + +import asyncio +import asyncssh +import secrets +import tempfile +from contextlib import asynccontextmanager +from pathlib import Path + +import httpx +import pytest +import pytest_asyncio + +import uvicorn +from sshecret.backend import SshecretBackend +from sshecret.crypto import ( + generate_private_key, + generate_public_key_string, + write_private_key, +) +from sshecret_admin.core.app import create_admin_app +from sshecret_admin.core.settings import AdminServerSettings +from sshecret_backend.app import create_backend_app +from sshecret_backend.settings import BackendSettings +from sshecret_backend.testing import create_test_token +from sshecret_sshd.settings import ServerSettings +from sshecret_sshd.ssh_server import start_sshecret_sshd + +from .clients import ClientData +from .helpers import create_sshd_server_key, create_test_admin_user, in_tempdir +from .types import PortFactory, TestPorts + +TEST_SCOPE = "function" +LOOP_SCOPE = "function" + + +def make_test_key() -> str: + """Generate a test key.""" + private_key = generate_private_key() + return generate_public_key_string(private_key.public_key()) + + +@pytest.fixture(name="test_ports", scope="session") +def generate_test_ports(unused_tcp_port_factory: PortFactory) -> TestPorts: + """Generate the test ports.""" + test_ports = TestPorts( + backend=unused_tcp_port_factory(), + admin=unused_tcp_port_factory(), + sshd=unused_tcp_port_factory(), + ) + print(f"{test_ports=!r}") + return test_ports + + +@pytest_asyncio.fixture(scope=TEST_SCOPE, name="backend_server", loop_scope=LOOP_SCOPE) +async def run_backend_server(test_ports: TestPorts): + """Run the backend server.""" + port = test_ports.backend + with tempfile.TemporaryDirectory() as tmp_dir: + backend_work_path = Path(tmp_dir) + db_file = backend_work_path / "backend.db" + backend_settings = BackendSettings(database=str(db_file.absolute())) + backend_app = create_backend_app(backend_settings) + token = create_test_token(backend_settings) + config = uvicorn.Config(app=backend_app, port=port, loop="asyncio") + server = uvicorn.Server(config=config) + server_task = asyncio.create_task(server.serve()) + await asyncio.sleep(0.1) + backend_url = f"http://127.0.0.1:{port}" + yield (backend_url, token) + server.should_exit = True + await server_task + + +@pytest_asyncio.fixture(scope=TEST_SCOPE, name="admin_server", loop_scope=LOOP_SCOPE) +async def run_admin_server(test_ports: TestPorts, backend_server: tuple[str, str]): + """Run admin server.""" + backend_url, backend_token = backend_server + secret_key = secrets.token_urlsafe(32) + port = test_ports.admin + with in_tempdir() as admin_work_path: + admin_db = admin_work_path / "ssh_admin.db" + admin_settings = AdminServerSettings.model_validate( + { + "sshecret_backend_url": backend_url, + "backend_token": backend_token, + "secret_key": secret_key, + "listen_address": "127.0.0.1", + "port": port, + "database": str(admin_db.absolute()), + "password_manager_directory": str(admin_work_path.absolute()), + } + ) + admin_app = create_admin_app(admin_settings) + config = uvicorn.Config(app=admin_app, port=port, loop="asyncio") + server = uvicorn.Server(config=config) + server_task = asyncio.create_task(server.serve()) + await asyncio.sleep(0.1) + admin_url = f"http://127.0.0.1:{port}" + admin_password = secrets.token_urlsafe(10) + create_test_admin_user(admin_settings, "test", admin_password) + await asyncio.sleep(0.1) + yield (admin_url, ("test", admin_password)) + server.should_exit = True + await server_task + + +@pytest_asyncio.fixture(scope=TEST_SCOPE, name="ssh_server", loop_scope=LOOP_SCOPE) +async def start_ssh_server(test_ports: TestPorts, backend_server: tuple[str, str]): + """Run ssh server.""" + backend_url, backend_token = backend_server + port = test_ports.sshd + with in_tempdir() as ssh_workdir: + create_sshd_server_key(ssh_workdir) + sshd_server_settings = ServerSettings.model_validate( + { + "sshecret_backend_url": backend_url, + "backend_token": backend_token, + "listen_address": "", + "port": port, + "registration": {"enabled": True, "allow_from": "0.0.0.0/0"}, + "enable_ping_command": True, + } + ) + + ssh_server = await start_sshecret_sshd(sshd_server_settings) + await asyncio.sleep(0.1) + print(f"Started sshd on port {port}") + yield port + + ssh_server.close() + await ssh_server.wait_closed() + + +@pytest_asyncio.fixture(scope=TEST_SCOPE, name="backend_client", loop_scope=LOOP_SCOPE) +async def create_backend_http_client(backend_server: tuple[str, str]): + """Create a test client.""" + backend_url, backend_token = backend_server + print(f"Creating backend client towards {backend_url}") + async with httpx.AsyncClient( + base_url=backend_url, headers={"X-API-Token": backend_token} + ) as client: + yield client + + +@pytest_asyncio.fixture(name="backend_api") +async def get_test_backend_api(backend_server: tuple[str, str]) -> SshecretBackend: + """Get the backend API.""" + backend_url, backend_token = backend_server + return SshecretBackend(backend_url, backend_token) + + +@pytest.fixture(scope=TEST_SCOPE) +def ssh_command_runner(ssh_server: int, tmp_path: Path): + """Run a single command on the ssh server.""" + port = ssh_server + + async def run_command_as(test_client: ClientData, command: str): + private_key_file = tmp_path / f"id_{test_client.name}" + write_private_key(test_client.private_key, private_key_file) + + conn = await asyncssh.connect( + "127.0.0.1", + port=port, + username=test_client.name, + client_keys=[str(private_key_file)], + known_hosts=None, + ) + try: + result = await conn.run(command) + return result + finally: + conn.close() + await conn.wait_closed() + + return run_command_as + + +@pytest.fixture(name="ssh_session", scope=TEST_SCOPE) +def create_ssh_session(ssh_server: int, tmp_path: Path): + """Create a ssh Session.""" + port = ssh_server + + @asynccontextmanager + async def run_process(test_client: ClientData, command: str): + private_key_file = tmp_path / f"id_{test_client.name}" + write_private_key(test_client.private_key, private_key_file) + conn = await asyncssh.connect( + "127.0.0.1", + port=port, + username=test_client.name, + client_keys=[str(private_key_file)], + known_hosts=None, + ) + try: + async with conn.create_process(command) as process: + yield process + finally: + conn.close() + await conn.wait_closed() + + return run_process diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py new file mode 100644 index 0000000..68b0477 --- /dev/null +++ b/tests/integration/helpers.py @@ -0,0 +1,41 @@ +"""Helper functions.""" + +import os +import tempfile +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path +from sqlmodel import Session, create_engine +from sshecret.crypto import generate_private_key, write_private_key +from sshecret_admin.auth.authentication import hash_password +from sshecret_admin.auth.models import User, init_db +from sshecret_admin.core.settings import AdminServerSettings + +def create_test_admin_user(settings: AdminServerSettings, username: str, password: str) -> None: + """Create a test admin user.""" + hashed_password = hash_password(password) + engine = create_engine(settings.admin_db) + init_db(engine) + with Session(engine) as session: + user = User(username=username, hashed_password=hashed_password) + session.add(user) + session.commit() + + +def create_sshd_server_key(sshd_path: Path) -> Path: + """Create a ssh key at a general""" + server_file = sshd_path / "ssh_host_key" + private_key = generate_private_key() + write_private_key(private_key, server_file) + return server_file + + +@contextmanager +def in_tempdir() -> Iterator[Path]: + """Run in a temporary directory.""" + curdir = os.getcwd() + with tempfile.TemporaryDirectory() as temp_directory: + temp_path = Path(temp_directory) + os.chdir(temp_directory) + yield temp_path + os.chdir(curdir) diff --git a/tests/integration/test_admin_api.py b/tests/integration/test_admin_api.py new file mode 100644 index 0000000..5227cfd --- /dev/null +++ b/tests/integration/test_admin_api.py @@ -0,0 +1,55 @@ +"""Tests of the admin interface.""" + +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +import pytest + +import httpx + + +class TestAdminAPI: + """Tests of the Admin REST API.""" + + @pytest.mark.asyncio + async def test_health_check( + self, admin_server: tuple[str, tuple[str, str]] + ) -> None: + """Test admin login.""" + async with self.http_client(admin_server, False) as client: + resp = await client.get("/health") + assert resp.status_code == 200 + + @pytest.mark.asyncio + async def test_admin_login(self, admin_server: tuple[str, tuple[str, str]]) -> None: + """Test admin login.""" + + async with self.http_client(admin_server, False) as client: + resp = await client.get("api/v1/clients/") + assert resp.status_code == 401 + + async with self.http_client(admin_server, True) as client: + resp = await client.get("api/v1/clients/") + assert resp.status_code == 200 + + @asynccontextmanager + async def http_client( + self, admin_server: tuple[str, tuple[str, str]], authenticate: bool = True + ) -> AsyncIterator[httpx.AsyncClient]: + """Run a client towards the admin rest api.""" + admin_url, credentials = admin_server + username, password = credentials + headers: dict[str, str] | None = None + if authenticate: + async with httpx.AsyncClient(base_url=admin_url) as client: + + response = await client.post( + "api/v1/token", data={"username": username, "password": password} + ) + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + token = data["access_token"] + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient(base_url=admin_url, headers=headers) as client: + yield client diff --git a/tests/integration/test_backend.py b/tests/integration/test_backend.py new file mode 100644 index 0000000..b942991 --- /dev/null +++ b/tests/integration/test_backend.py @@ -0,0 +1,58 @@ +"""Test backend. + +These tests just ensure that the backend works well enough for us to run the +rest of the tests. + +""" +import pytest +import httpx +from sshecret.backend import SshecretBackend +from .clients import create_test_client + + +@pytest.mark.asyncio +async def test_healthcheck(backend_client: httpx.AsyncClient) -> None: + """Test healthcheck command.""" + resp = await backend_client.get("/health") + assert resp.status_code == 200 + assert resp.json() == {"status": "LIVE"} + +@pytest.mark.asyncio +async def test_create_client(backend_api: SshecretBackend) -> None: + """Test creating a client.""" + test_client = create_test_client("test") + await backend_api.create_client("test", test_client.public_key, "A test client") + + # fetch the list of clients. + + clients = await backend_api.get_clients() + assert clients is not None + + assert len(clients) == 1 + + assert clients[0].name == "test" + + assert clients[0].public_key == test_client.public_key + +async def test_create_secret(backend_api: SshecretBackend) -> None: + """Test creating secrets.""" + test_client = create_test_client("test") + await backend_api.create_client("test", test_client.public_key, "A test client") + + await backend_api.create_client_secret("test", "mysecret", "encrypted_secret") + + secrets = await backend_api.get_secrets() + assert len(secrets) == 1 + assert secrets[0].name == "mysecret" + + + secret_to_client = await backend_api.get_secret("mysecret") + assert secret_to_client is not None + + assert secret_to_client.name == "mysecret" + assert "test" in secret_to_client.clients + + secret = await backend_api.get_client_secret("test", "mysecret") + + assert secret is not None + assert secret == "encrypted_secret" diff --git a/tests/integration/test_sshd.py b/tests/integration/test_sshd.py new file mode 100644 index 0000000..c4628c4 --- /dev/null +++ b/tests/integration/test_sshd.py @@ -0,0 +1,139 @@ +"""Tests where the sshd is the main consumer. + +This essentially also tests parts of the admin API. +""" + +from contextlib import asynccontextmanager +from typing import AsyncIterator +import os +import httpx + +import pytest +from sshecret.crypto import decode_string +from sshecret.backend.api import SshecretBackend + +from .clients import create_test_client, ClientData + +from .types import CommandRunner, ProcessRunner + + +class TestSshd: + """Class based tests. + + This allows us to create small helpers. + """ + + @pytest.mark.asyncio + async def test_get_secret( + self, backend_api: SshecretBackend, ssh_command_runner: CommandRunner + ) -> None: + """Test get secret flow.""" + test_client = create_test_client("testclient") + await backend_api.create_client( + "testclient", test_client.public_key, "A test client" + ) + await backend_api.create_client_secret("testclient", "testsecret", "bogus") + response = await ssh_command_runner(test_client, "get_secret testsecret") + assert response.exit_status == 0 + assert response.stdout is not None + assert isinstance(response.stdout, str) + assert response.stdout.rstrip() == "bogus" + + @pytest.mark.asyncio + async def test_register( + self, backend_api: SshecretBackend, ssh_session: ProcessRunner + ) -> None: + """Test registration.""" + await self.register_client("new_client", ssh_session) + # Check that the client is created. + clients = await backend_api.get_clients() + assert len(clients) == 1 + + client = clients[0] + assert client.name == "new_client" + + async def register_client( + self, name: str, ssh_session: ProcessRunner + ) -> ClientData: + """Register client.""" + test_client = create_test_client(name) + async with ssh_session(test_client, "register") as session: + maxlines = 10 + linenum = 0 + found = False + while linenum < maxlines: + line = await session.stdout.readline() + if "Enter public key" in line: + found = True + break + assert found is True + session.stdin.write(test_client.public_key + "\n") + + result = await session.stdout.readline() + assert "OK" in result + await session.wait() + return test_client + + +class TestSshdIntegration(TestSshd): + """Integration tests.""" + + @pytest.mark.asyncio + async def test_end_to_end( + self, + backend_api: SshecretBackend, + admin_server: tuple[str, tuple[str, str]], + ssh_session: ProcessRunner, + ssh_command_runner: CommandRunner, + ) -> None: + """Test end to end.""" + test_client = await self.register_client("myclient", ssh_session) + url, credentials = admin_server + username, password = credentials + async with self.admin_client(url, username, password) as http_client: + resp = await http_client.get("api/v1/clients/") + assert resp.status_code == 200 + clients = resp.json() + assert len(clients) == 1 + assert clients[0]["name"] == "myclient" + + create_model = { + "name": "mysecret", + "clients": ["myclient"], + "value": "mypassword", + } + resp = await http_client.post("api/v1/secrets/", json=create_model) + assert resp.status_code == 200 + + # Login via ssh to fetch the decrypted value. + ssh_output = await ssh_command_runner(test_client, "get_secret mysecret") + assert ssh_output.stdout is not None + assert isinstance(ssh_output.stdout, str) + encrypted = ssh_output.stdout.rstrip() + decrypted = decode_string(encrypted, test_client.private_key) + assert decrypted == "mypassword" + + async def login(self, url: str, username: str, password: str) -> str: + """Login and get token.""" + api_url = os.path.join(url, "api/v1", "token") + client = httpx.AsyncClient() + + response = await client.post( + api_url, data={"username": username, "password": password} + ) + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert isinstance(data["access_token"], str) + return str(data["access_token"]) + + @asynccontextmanager + async def admin_client( + self, url: str, username: str, password: str + ) -> AsyncIterator[httpx.AsyncClient]: + """Create an admin client.""" + token = await self.login(url, username, password) + async with httpx.AsyncClient( + base_url=url, headers={"Authorization": f"Bearer {token}"} + ) as client: + yield client diff --git a/tests/integration/types.py b/tests/integration/types.py new file mode 100644 index 0000000..96998f6 --- /dev/null +++ b/tests/integration/types.py @@ -0,0 +1,29 @@ +"""Typings.""" +import asyncssh + +from typing import Any, AsyncContextManager, Protocol +from dataclasses import dataclass +from collections.abc import Callable, Awaitable + +from .clients import ClientData + + +PortFactory = Callable[[], int] + + +@dataclass +class TestPorts: + """Test port dataclass.""" + + backend: int + admin: int + sshd: int + + +CommandRunner = Callable[[ClientData, str], Awaitable[asyncssh.SSHCompletedProcess]] + +class ProcessRunner(Protocol): + """Process runner typing.""" + + def __call__(self, test_client: ClientData, command: str) -> AsyncContextManager[asyncssh.SSHClientProcess[Any]]: + ... diff --git a/tests/packages/__init__.py b/tests/packages/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/packages/backend/__init__.py b/tests/packages/backend/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/sshecret-backend/tests/test_backend.py b/tests/packages/backend/test_backend.py similarity index 100% rename from packages/sshecret-backend/tests/test_backend.py rename to tests/packages/backend/test_backend.py diff --git a/tests/packages/sshd/__init__.py b/tests/packages/sshd/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/packages/sshd/__init__.py @@ -0,0 +1 @@ + diff --git a/packages/sshecret-sshd/tests/conftest.py b/tests/packages/sshd/conftest.py similarity index 96% rename from packages/sshecret-sshd/tests/conftest.py rename to tests/packages/sshd/conftest.py index daeec14..c27580e 100644 --- a/packages/sshecret-sshd/tests/conftest.py +++ b/tests/packages/sshd/conftest.py @@ -1,4 +1,5 @@ import asyncio +from typing import Any import pytest import uuid import asyncssh @@ -79,13 +80,16 @@ async def mock_backend(client_registry: ClientRegistry) -> MagicMock: continue secrets_data[(name, secret_name)] = secret + async def write_audit(*args, **kwargs): + """Write audit mock.""" + return None + backend.get_client = AsyncMock(side_effect=get_client) backend.get_client_secret = AsyncMock(side_effect=get_client_secret) backend.create_client = AsyncMock(side_effect=create_client) - # Make sure backend.audit(...) returns the audit mock audit = MagicMock() - audit.write = MagicMock() + audit.write_async = AsyncMock(side_effect=write_audit) backend.audit = MagicMock(return_value=audit) return backend diff --git a/packages/sshecret-sshd/tests/test_get_secret.py b/tests/packages/sshd/test_get_secret.py similarity index 100% rename from packages/sshecret-sshd/tests/test_get_secret.py rename to tests/packages/sshd/test_get_secret.py diff --git a/packages/sshecret-sshd/tests/test_ping.py b/tests/packages/sshd/test_ping.py similarity index 100% rename from packages/sshecret-sshd/tests/test_ping.py rename to tests/packages/sshd/test_ping.py diff --git a/packages/sshecret-sshd/tests/test_register.py b/tests/packages/sshd/test_register.py similarity index 100% rename from packages/sshecret-sshd/tests/test_register.py rename to tests/packages/sshd/test_register.py diff --git a/packages/sshecret-sshd/tests/types.py b/tests/packages/sshd/types.py similarity index 100% rename from packages/sshecret-sshd/tests/types.py rename to tests/packages/sshd/types.py