Files
sshecret/tests/integration/test_sshd.py
Allan Eising dcf0b4274c Refactor command handling
This now supports usage/help texts
2025-05-18 17:56:53 +02:00

140 lines
4.7 KiB
Python

"""Tests where the sshd is the main consumer.
This essentially also tests parts of the admin API.
"""
from contextlib import asynccontextmanager
from typing import AsyncIterator
import os
import httpx
import pytest
from sshecret.crypto import decode_string
from sshecret.backend.api import SshecretBackend
from .clients import create_test_client, ClientData
from .types import CommandRunner, ProcessRunner
class TestSshd:
    """Class based tests.

    This allows us to create small helpers.
    """

    @pytest.mark.asyncio
    async def test_get_secret(
        self, backend_api: SshecretBackend, ssh_command_runner: CommandRunner
    ) -> None:
        """Test get secret flow.

        Creates a client and a secret for it through the backend API, then
        runs ``get_secret`` over SSH and checks the command's output.
        """
        test_client = create_test_client("testclient")
        await backend_api.create_client(
            "testclient", test_client.public_key, "A test client"
        )
        await backend_api.create_client_secret("testclient", "testsecret", "bogus")

        response = await ssh_command_runner(test_client, "get_secret testsecret")

        assert response.exit_status == 0
        assert response.stdout is not None
        assert isinstance(response.stdout, str)
        assert response.stdout.rstrip() == "bogus"

    @pytest.mark.asyncio
    async def test_register(
        self, backend_api: SshecretBackend, ssh_session: ProcessRunner
    ) -> None:
        """Test registration.

        Registers a client over SSH and verifies that it is the only client
        reported by the backend afterwards.
        """
        await self.register_client("new_client", ssh_session)

        # Check that the client is created.
        clients = await backend_api.get_clients()
        assert len(clients) == 1
        client = clients[0]
        assert client.name == "new_client"

    async def register_client(
        self, name: str, ssh_session: ProcessRunner
    ) -> ClientData:
        """Register a client through the interactive ``register`` command.

        Args:
            name: Name of the client to create and register.
            ssh_session: Factory returning an async context manager for an
                interactive SSH session running the given command.

        Returns:
            The generated test client data for the registered client.

        Raises:
            AssertionError: If the public key prompt is not seen within the
                first lines of output, or if the server does not confirm
                the registration.
        """
        test_client = create_test_client(name)
        async with ssh_session(test_client, "register") as session:
            max_lines = 10
            found = False
            # Bounded scan for the prompt. The loop must terminate even if
            # the prompt never shows up, so the test fails instead of hanging.
            for _ in range(max_lines):
                line = await session.stdout.readline()
                if "Enter public key" in line:
                    found = True
                    break
            assert found, (
                f"'Enter public key' prompt not seen in first {max_lines} lines"
            )

            session.stdin.write(test_client.public_key + "\n")

            result = await session.stdout.read()
            assert "Key is valid. Registering client." in result
            await session.wait()
        return test_client
class TestSshdIntegration(TestSshd):
    """Integration tests."""

    @pytest.mark.asyncio
    async def test_end_to_end(
        self,
        backend_api: SshecretBackend,
        admin_server: tuple[str, tuple[str, str]],
        ssh_session: ProcessRunner,
        ssh_command_runner: CommandRunner,
    ) -> None:
        """Test end to end.

        Registers a client over SSH, creates a secret for it through the
        admin HTTP API, then fetches the secret over SSH and decodes it with
        the client's private key.
        """
        test_client = await self.register_client("myclient", ssh_session)
        url, credentials = admin_server
        username, password = credentials

        async with self.admin_client(url, username, password) as http_client:
            resp = await http_client.get("api/v1/clients/")
            assert resp.status_code == 200
            clients = resp.json()
            assert len(clients) == 1
            assert clients[0]["name"] == "myclient"

            create_model = {
                "name": "mysecret",
                "clients": ["myclient"],
                "value": "mypassword",
            }
            resp = await http_client.post("api/v1/secrets/", json=create_model)
            assert resp.status_code == 200

        # Login via ssh to fetch the decrypted value.
        ssh_output = await ssh_command_runner(test_client, "get_secret mysecret")
        assert ssh_output.stdout is not None
        assert isinstance(ssh_output.stdout, str)
        encrypted = ssh_output.stdout.rstrip()
        decrypted = decode_string(encrypted, test_client.private_key)
        assert decrypted == "mypassword"

    async def login(self, url: str, username: str, password: str) -> str:
        """Login and get token.

        Args:
            url: Base URL of the admin server.
            username: Admin username.
            password: Admin password.

        Returns:
            The ``access_token`` value from the token endpoint's response.

        Raises:
            AssertionError: If the endpoint does not answer 200 or the
                response carries no string ``access_token``.
        """
        # URLs always use "/" as separator; os.path.join is a filesystem
        # helper and would produce backslashes on Windows.
        api_url = f"{url.rstrip('/')}/api/v1/token"
        # Use the client as a context manager so its connections are closed
        # once the request is done.
        async with httpx.AsyncClient() as client:
            response = await client.post(
                api_url, data={"username": username, "password": password}
            )
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert isinstance(data["access_token"], str)
        return str(data["access_token"])

    @asynccontextmanager
    async def admin_client(
        self, url: str, username: str, password: str
    ) -> AsyncIterator[httpx.AsyncClient]:
        """Create an admin client.

        Logs in with the given credentials and yields an HTTP client bound
        to ``url`` that sends the obtained token as a bearer
        ``Authorization`` header. The client is closed on exit.
        """
        token = await self.login(url, username, password)
        async with httpx.AsyncClient(
            base_url=url, headers={"Authorization": f"Bearer {token}"}
        ) as client:
            yield client