Files
sshecret/tests/packages/backend/test_backend.py
Allan Eising d86d9a9256 Adapt admin api to use new key format
Filter out deleted and previous versions in count

Remove todo comment

Allow explicit ID specification

Update tests
2025-06-09 08:57:59 +02:00

610 lines
19 KiB
Python

"""Tests of the backend api using pytest."""
import uuid
import logging
from pathlib import Path
from httpx import Response
import pytest
from fastapi.testclient import TestClient
from sshecret.crypto import generate_private_key, generate_public_key_string
from sshecret_backend.app import create_backend_app
from sshecret_backend.testing import create_test_token
from sshecret_backend.api.audit.schemas import AuditView
from sshecret_backend.settings import BackendSettings
# Send root-logger records to a stream handler using a timestamped format.
LOG = logging.getLogger()
formatter = logging.Formatter("'%(asctime)s - %(levelname)s - %(message)s'")
handler = logging.StreamHandler()
handler.setFormatter(formatter)
LOG.addHandler(handler)
# Enable the next line for verbose output while debugging the tests.
# LOG.setLevel(logging.DEBUG)
def make_test_key() -> str:
    """Return the public-key string derived from a freshly generated private key."""
    return generate_public_key_string(generate_private_key().public_key())
def create_client(
    test_client: TestClient,
    name: str,
    public_key: str | None = None,
    description: str | None = None,
) -> Response:
    """POST a new client to the backend and return the raw response.

    Args:
        test_client: client used to issue the request.
        name: name of the client to create.
        public_key: public key to register; when omitted or empty a fresh
            test key is generated via ``make_test_key``.
        description: optional description, only included when non-empty.

    Returns:
        The response of the ``POST /api/v1/clients`` request. The status
        code is not checked here; callers assert on it themselves.
    """
    payload = {
        "name": name,
        "public_key": public_key or make_test_key(),
    }
    if description:
        payload["description"] = description
    return test_client.post("/api/v1/clients", json=payload)
@pytest.fixture(name="test_client")
def create_client_fixture(tmp_path: Path):
    """Yield a TestClient for a backend app using a database file in tmp_path.

    The client is created with an ``X-API-Token`` default header holding a
    token produced by ``create_test_token`` for the same settings.
    """
    db_path = (tmp_path / "backend.db").absolute()
    print(f"DB File: {db_path}")
    settings = BackendSettings(database=str(db_path))
    backend_app = create_backend_app(settings)
    api_token = create_test_token(settings)
    yield TestClient(backend_app, headers={"X-API-Token": api_token})
def test_missing_token(test_client: TestClient) -> None:
    """Test that a request without any token header is answered with 422.

    The client's default headers (which carry the API token) are replaced
    with an empty mapping for the duration of the request.
    """
    saved_headers = test_client.headers
    test_client.headers = {}
    try:
        response = test_client.get("/api/v1/clients/", headers={})
        assert response.status_code == 422
    finally:
        # Restore the defaults even when the assertion fails, so the client
        # is never left without its token header.
        test_client.headers = saved_headers
def test_incorrect_token(test_client: TestClient) -> None:
    """Test that a request with an incorrect token is answered with 401."""
    wrong_auth = {"X-API-Token": "WRONG"}
    response = test_client.get("/api/v1/clients/", headers=wrong_auth)
    assert response.status_code == 401
def test_with_token(test_client: TestClient) -> None:
    """Test that listing clients with the fixture's token succeeds and is empty."""
    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
    assert listing.json()["total_results"] == 0
def test_create_client(test_client: TestClient) -> None:
    """Test that a created client shows up in the client listing.

    Also used by other tests as a setup step that leaves a client named
    "test" in the backend.
    """
    expected_name = "test"
    created = create_client(test_client, expected_name, make_test_key())
    assert created.status_code == 200

    listing = test_client.get("/api/v1/clients")
    assert listing.status_code == 200
    listed = listing.json()["clients"]
    assert isinstance(listed, list)

    first = listed[0]
    assert isinstance(first, dict)
    assert first.get("name") == expected_name
    assert first.get("created_at") is not None
def test_delete_client(test_client: TestClient) -> None:
    """Test deleting a client: it is gone afterwards, the listing still works."""
    created = create_client(test_client, "test")
    assert created.status_code == 200

    deleted = test_client.delete("/api/v1/clients/test")
    assert deleted.status_code == 200

    # The deleted client can no longer be fetched by name.
    lookup = test_client.get("/api/v1/clients/test")
    assert lookup.status_code == 404

    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
def test_add_secret(test_client: TestClient) -> None:
    """Test adding a secret to a client and reading it back.

    Also used by other tests as a setup step: it leaves a client "test"
    holding the secret "mysecret".
    """
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    payload = {"name": "mysecret", "secret": "shhhh", "description": "A test secret"}
    posted = test_client.post("/api/v1/clients/test/secrets/", json=payload)
    assert posted.status_code == 200

    # Read the secret back and compare it with what was sent.
    fetched = test_client.get("/api/v1/clients/test/secrets/mysecret")
    assert fetched.status_code == 200
    body = fetched.json()
    for field in ("name", "secret"):
        assert body[field] == payload[field]
def test_delete_secret(test_client: TestClient) -> None:
    """Test that a deleted secret can no longer be fetched."""
    # Reuse test_add_secret to set up the client "test" with "mysecret".
    test_add_secret(test_client)
    secret_url = "/api/v1/clients/test/secrets/mysecret"

    deleted = test_client.delete(secret_url)
    assert deleted.status_code == 200

    lookup = test_client.get(secret_url)
    assert lookup.status_code == 404
def test_put_add_secret(test_client: TestClient) -> None:
    """Test creating a secret through PUT on the secret's own URL."""
    # Reuse test_create_client to set up the client "test".
    test_create_client(test_client)

    value = "shhhh"
    put_response = test_client.put(
        "/api/v1/clients/test/secrets/mysecret",
        json={"value": value},
    )
    assert put_response.status_code == 200

    body = put_response.json()
    # pop() without a default raises KeyError, so both timestamps must be present.
    body.pop("created_at")
    body.pop("updated_at")

    expected = {"name": "mysecret", "secret": value, "description": None}
    for field, wanted in expected.items():
        assert body.get(field) == wanted
def test_put_update_secret(test_client: TestClient) -> None:
    """Test replacing the value of an existing secret through PUT."""
    # Reuse test_add_secret to set up the client "test" with "mysecret".
    test_add_secret(test_client)

    replacement = "itsasecret"
    updated = test_client.put(
        "/api/v1/clients/test/secrets/mysecret",
        json={"value": replacement},
    )
    assert updated.status_code == 200

    body = updated.json()
    assert body["name"] == "mysecret"
    assert body["secret"] == replacement
    # The response must carry an updated_at field.
    assert "updated_at" in body
def test_audit_logging(test_client: TestClient) -> None:
    """Test that client and secret operations leave parseable audit entries."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    for secret_name, secret_value in (
        ("secret1", "foo"),
        ("secret2", "bar"),
        ("secret3", "baz"),
    ):
        added = test_client.post(
            "/api/v1/clients/test/secrets/",
            json={"name": secret_name, "secret": secret_value},
        )
        assert added.status_code == 200

    # Read the whole client once more before inspecting the log.
    fetched = test_client.get("/api/v1/clients/test")
    assert fetched.status_code == 200

    audit_response = test_client.get("/api/v1/audit/")
    assert audit_response.status_code == 200
    audit_body = audit_response.json()
    assert isinstance(audit_body, dict)

    total = audit_body.get("total")
    assert total is not None
    assert total > 0

    assert "results" in audit_body
    for raw_entry in audit_body["results"]:
        # Every entry must validate against the AuditView schema.
        assert AuditView.model_validate(raw_entry) is not None
def test_secret_invalidation(test_client: TestClient) -> None:
    """Test that replacing a client's public key leaves it with no secrets."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    for secret_name, secret_value in (
        ("secret1", "foo"),
        ("secret2", "bar"),
        ("secret3", "baz"),
    ):
        added = test_client.post(
            "/api/v1/clients/test/secrets/",
            json={"name": secret_name, "secret": secret_value},
        )
        assert added.status_code == 200

    # Swap in a different public key; the existing secrets are expected to
    # be invalidated and detached from the client.
    rotated = test_client.post(
        "/api/v1/clients/test/public-key",
        json={"public_key": make_test_key()},
    )
    assert rotated.status_code == 200

    fetched = test_client.get("/api/v1/clients/test")
    assert fetched.status_code == 200
    remaining = fetched.json().get("secrets")
    # Missing, None and an empty list all count as "no secrets".
    assert not remaining
def test_client_default_policies(
    test_client: TestClient,
) -> None:
    """Test that a new client's policy allows all IPv4 and IPv6 sources."""
    # create_client generates a key itself when none is passed, so no key
    # needs to be prepared here.
    resp = create_client(test_client, "test")
    assert resp.status_code == 200

    # Without any explicit policy the sources are the two catch-all networks.
    resp = test_client.get("/api/v1/clients/test/policies/")
    assert resp.status_code == 200
    policies = resp.json()
    assert policies["sources"] == ["0.0.0.0/0", "::/0"]
def test_client_policy_update_one(test_client: TestClient) -> None:
    """Test setting a client policy that consists of a single source."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    policy_url = "/api/v1/clients/test/policies/"
    sources = ["192.0.2.1"]

    updated = test_client.put(policy_url, json={"sources": sources})
    assert updated.status_code == 200

    fetched = test_client.get(policy_url)
    assert fetched.status_code == 200
    assert fetched.json()["sources"] == sources
def test_client_policy_update_advanced(test_client: TestClient) -> None:
    """Test a multi-source policy and that an invalid update is rejected.

    Clearing the policy is covered by test_client_policy_update_unset.
    """
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    policy_url = "/api/v1/clients/test/policies/"
    valid_sources = ["192.0.2.1", "198.18.0.0/24"]

    updated = test_client.put(policy_url, json={"sources": valid_sources})
    assert updated.status_code == 200

    fetched = test_client.get(policy_url)
    assert fetched.status_code == 200
    current = fetched.json()["sources"]
    for source in valid_sources:
        assert source in current

    # A value that is not a valid source must be refused.
    rejected = test_client.put(policy_url, json={"sources": ["obviosly_wrong"]})
    assert rejected.status_code == 422

    # The previously stored sources must be unaffected by the rejected update.
    fetched = test_client.get(policy_url)
    assert fetched.status_code == 200
    current = fetched.json()["sources"]
    for source in valid_sources:
        assert source in current
def test_client_policy_update_unset(test_client: TestClient) -> None:
    """Test that an empty source list resets the policy to the catch-all sources."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    policy_url = "/api/v1/clients/test/policies/"
    initial_sources = ["192.0.2.1", "198.18.0.0/24"]

    updated = test_client.put(policy_url, json={"sources": initial_sources})
    assert updated.status_code == 200
    stored = updated.json()["sources"]
    for source in initial_sources:
        assert source in stored

    # Sending an empty list clears the policy.
    cleared = test_client.put(policy_url, json={"sources": []})
    assert cleared.status_code == 200
    assert cleared.json()["sources"] == ["0.0.0.0/0", "::/0"]
def test_client_update(test_client: TestClient) -> None:
    """Test updating a client's description through PUT on the client URL."""
    created = create_client(test_client, "test", make_test_key(), "PRE")
    assert created.status_code == 200

    client_url = "/api/v1/clients/test"

    before = test_client.get(client_url)
    assert before.status_code == 200
    original = before.json()
    assert original["description"] == "PRE"

    # Send the same name and key back with only the description changed.
    update_payload = {
        "name": "test",
        "description": "POST",
        "public_key": original["public_key"],
    }
    updated = test_client.put(client_url, json=update_payload)
    assert updated.status_code == 200
    assert updated.json()["description"] == "POST"

    # A fresh read must show the new description as well.
    after = test_client.get(client_url)
    assert after.status_code == 200
    assert after.json()["description"] == "POST"
def test_get_secret_list(test_client: TestClient) -> None:
    """Test the view that maps each secret name to the clients holding it."""
    # Four clients, each with one secret named after itself plus a shared one.
    for index in range(4):
        client_name = f"client-{index}"
        create_client(test_client, client_name, make_test_key())
        for secret_name in (client_name, "commonsecret"):
            stored = test_client.put(
                f"/api/v1/clients/{client_name}/secrets/{secret_name}",
                json={"value": "SECRET"},
            )
            assert stored.status_code == 200

    listing = test_client.get("/api/v1/secrets/")
    assert listing.status_code == 200
    entries = listing.json()
    assert isinstance(entries, list)
    # Four per-client secrets plus the shared one.
    assert len(entries) == 5

    for entry in entries:
        secret_name = entry["name"]
        holders = entry["clients"]
        if secret_name == "commonsecret":
            assert len(holders) == 4
            continue
        # Per-client secrets carry the name of their only holder.
        assert len(holders) == 1
        assert holders[0]["name"] == secret_name
def test_get_secret_clients(test_client: TestClient) -> None:
    """Test fetching the clients that hold one particular secret."""
    for index in range(4):
        create_client(test_client, f"client-{index}", make_test_key())
        # Only the even-numbered clients receive the shared secret.
        if index % 2 == 0:
            stored = test_client.put(
                f"/api/v1/clients/client-{index}/secrets/commonsecret",
                json={"value": "SECRET"},
            )
            assert stored.status_code == 200

    fetched = test_client.get("/api/v1/secrets/commonsecret")
    assert fetched.status_code == 200
    body = fetched.json()
    assert body["name"] == "commonsecret"

    holder_names = [holder["name"] for holder in body["clients"]]
    assert "client-0" in holder_names
    assert "client-1" not in holder_names
    assert len(body["clients"]) == 2
def test_searching(test_client: TestClient) -> None:
    """Test filtering the client listing by exact name, by id and by name pattern."""
    # Four similarly named clients and one that stands apart.
    for index in range(4):
        create_client(test_client, f"client-{index}")
    create_client(test_client, "othername")

    clients_url = "/api/v1/clients/"

    # Exact name match.
    by_name = test_client.get(clients_url, params={"name": "othername"})
    assert by_name.status_code == 200
    name_result = by_name.json()
    assert name_result["total_results"] == 1
    assert name_result["clients"][0]["name"] == "othername"
    client_id = name_result["clients"][0]["id"]

    # The id taken from the first hit must lead back to the same client.
    by_id = test_client.get(clients_url, params={"id": client_id})
    assert by_id.status_code == 200
    id_result = by_id.json()
    assert id_result["total_results"] == 1
    assert id_result["clients"][0]["name"] == "othername"

    # Pattern match on the shared "client-" prefix.
    by_pattern = test_client.get(clients_url, params={"name__like": "client-%"})
    assert by_pattern.status_code == 200
    pattern_result = by_pattern.json()
    assert pattern_result["total_results"] == 4
    assert str(pattern_result["clients"][0]["name"]).startswith("client-")
def test_operations_with_id(test_client: TestClient) -> None:
    """Test fetching a client through the ``id:<client id>`` path form."""
    create_client(test_client, "test")

    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
    client_id = listing.json()["clients"][0]["id"]

    by_id = test_client.get(f"/api/v1/clients/id:{client_id}")
    assert by_id.status_code == 200
    assert by_id.json()["name"] == "test"
def test_write_audit_log(test_client: TestClient) -> None:
    """Test that an entry posted to the audit log is returned when reading it."""
    payload = {
        "subsystem": "backend",
        "operation": "read",
        "message": "Test Message",
    }
    posted = test_client.post("/api/v1/audit", json=payload)
    assert posted.status_code == 200

    listing = test_client.get("/api/v1/audit")
    assert listing.status_code == 200
    first_entry = listing.json()["results"][0]
    # Every field that was sent must come back unchanged.
    for field, expected in payload.items():
        assert first_entry[field] == expected
def test_filter_audit_log(test_client: TestClient) -> None:
    """Test filtering of audit logs by client name and by operation.

    Three entries are written (two for client "foo", one for client "bar").
    The log is then queried by ``client_name`` alone and by ``client_name``
    combined with ``operation``.
    """
    # Prepare some audit logs.
    messages = [
        {
            "subsystem": "backend",
            "operation": "login",
            "client_id": str(uuid.uuid4()),
            "client_name": "foo",
            "origin": "192.0.2.1",
            "message": "message1",
        },
        {
            "subsystem": "backend",
            "operation": "create",
            "client_id": str(uuid.uuid4()),
            "client_name": "foo",
            "origin": "192.0.2.1",
            "secret_id": str(uuid.uuid4()),
            "message": "message2",
        },
        {
            "subsystem": "test",
            "operation": "deny",
            "client_id": str(uuid.uuid4()),
            "client_name": "bar",
            "origin": "192.0.2.2",
            "message": "message3",
        },
    ]
    for message in messages:
        resp = test_client.post("/api/v1/audit", json=message)
        # Fail right here if an entry is rejected, instead of surfacing
        # later as a confusing count mismatch.
        assert resp.status_code == 200

    # Find all client_name=foo entries.
    resp = test_client.get("/api/v1/audit/", params={"client_name": "foo"})
    assert resp.status_code == 200
    data = resp.json()
    assert data["total"] == 2
    assert len(data["results"]) == 2

    # Get the one message from 'bar'.
    resp = test_client.get("/api/v1/audit/", params={"client_name": "bar"})
    assert resp.status_code == 200
    data = resp.json()
    assert data["total"] == 1
    assert len(data["results"]) == 1
    assert data["results"][0]["operation"] == "deny"

    # Combine two fields to single out the login event.
    resp = test_client.get(
        "/api/v1/audit/",
        params={"client_name": "foo", "operation": "login"},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["total"] == 1
    assert data["results"][0]["operation"] == "login"
    assert data["results"][0]["message"] == "message1"
def test_secret_flexid(test_client: TestClient) -> None:
    """Test that clients and secrets can be addressed by name or by ``id:<id>``."""
    created = create_client(test_client, "test")
    assert created.status_code == 200
    assert "id" in created.json()
    client_id = created.json()["id"]

    # One secret is stored via the client's name, the other via its id.
    creation_urls = (
        ("clientnamesecret", "/api/v1/clients/test/secrets/clientnamesecret"),
        ("clientidsecret", f"/api/v1/clients/id:{client_id}/secrets/clientidsecret"),
    )
    secret_ids: dict[str, str] = {}
    for secret_name, url in creation_urls:
        stored = test_client.put(url, json={"value": "secret"})
        assert stored.status_code == 200
        stored_body = stored.json()
        assert "id" in stored_body
        secret_ids[secret_name] = stored_body["id"]

    # Every combination of client reference and secret reference must
    # resolve to the same secret.
    for client_ref in ("test", f"id:{client_id}"):
        for secret_name, secret_id in secret_ids.items():
            for secret_ref in (secret_name, f"id:{secret_id}"):
                fetched = test_client.get(
                    f"/api/v1/clients/{client_ref}/secrets/{secret_ref}"
                )
                assert fetched.status_code == 200
                fetched_body = fetched.json()
                assert "id" in fetched_body
                assert fetched_body["id"] == secret_id