Filter out deleted and previous versions in count. Remove todo comment. Allow explicit ID specification. Update tests.
610 lines
19 KiB
Python
610 lines
19 KiB
Python
"""Tests of the backend api using pytest."""
|
|
|
|
import uuid
|
|
import logging
|
|
from pathlib import Path
|
|
from httpx import Response
|
|
import pytest
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
from sshecret.crypto import generate_private_key, generate_public_key_string
|
|
from sshecret_backend.app import create_backend_app
|
|
from sshecret_backend.testing import create_test_token
|
|
from sshecret_backend.api.audit.schemas import AuditView
|
|
from sshecret_backend.settings import BackendSettings
|
|
|
|
|
|
# Attach a stream handler to the root logger so that log records emitted while
# the tests run are written out with a timestamp and level.
LOG = logging.getLogger()
handler = logging.StreamHandler()
# The format string used to carry a stray pair of single quotes, which wrapped
# every emitted log line in literal quote characters.
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
LOG.addHandler(handler)
# Uncomment to get verbose output while debugging a failing test:
# LOG.setLevel(logging.DEBUG)
|
|
|
|
|
|
def make_test_key() -> str:
    """Return the public key string derived from a freshly generated private key."""
    return generate_public_key_string(generate_private_key().public_key())
|
|
|
|
|
|
def create_client(
    test_client: TestClient,
    name: str,
    public_key: str | None = None,
    description: str | None = None,
) -> Response:
    """POST a new client to the clients endpoint and return the raw response.

    A key from make_test_key() is used when no public key is supplied, and the
    description is only added to the payload when one is given.
    """
    payload = {"name": name, "public_key": public_key or make_test_key()}
    if description:
        payload["description"] = description
    return test_client.post("/api/v1/clients", json=payload)
|
|
|
|
|
|
@pytest.fixture(name="test_client")
def create_client_fixture(tmp_path: Path):
    """Yield a TestClient for a backend app that uses a database file in tmp_path.

    The client sends a test token in the X-API-Token header by default.
    """
    db_file = tmp_path / "backend.db"
    print(f"DB File: {db_file.absolute()}")

    backend_settings = BackendSettings(database=str(db_file.absolute()))
    backend_app = create_backend_app(backend_settings)
    auth_headers = {"X-API-Token": create_test_token(backend_settings)}

    yield TestClient(backend_app, headers=auth_headers)
|
|
|
|
|
|
def test_missing_token(test_client: TestClient) -> None:
    """Test that a request without an API token is answered with 422."""
    # Keep a reference to the default headers so they can be put back.
    old_headers = test_client.headers
    test_client.headers = {}
    try:
        response = test_client.get("/api/v1/clients/", headers={})
        assert response.status_code == 422
    finally:
        # Restore the headers even when the assertion above fails, so the
        # client is never left without its token.
        test_client.headers = old_headers
|
|
|
|
|
|
def test_incorrect_token(test_client: TestClient) -> None:
    """Test that a request carrying an invalid token is answered with 401."""
    bad_headers = {"X-API-Token": "WRONG"}
    result = test_client.get("/api/v1/clients/", headers=bad_headers)
    assert result.status_code == 401
|
|
|
|
|
|
def test_with_token(test_client: TestClient) -> None:
    """Test that the fixture's token is accepted and the client list starts empty."""
    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
    body = listing.json()
    assert body["total_results"] == 0
|
|
|
|
|
|
def test_create_client(test_client: TestClient) -> None:
    """Test that a newly created client is returned by the client listing."""
    expected_name = "test"
    created = create_client(test_client, expected_name, make_test_key())
    assert created.status_code == 200

    listing = test_client.get("/api/v1/clients")
    assert listing.status_code == 200

    listed_clients = listing.json()["clients"]
    assert isinstance(listed_clients, list)

    # Only the first listed client is inspected.
    first = listed_clients[0]
    assert isinstance(first, dict)
    assert first.get("name") == expected_name
    assert first.get("created_at") is not None
|
|
|
|
|
|
def test_delete_client(test_client: TestClient) -> None:
    """Test that a deleted client can no longer be fetched by name."""
    created = create_client(test_client, "test")
    assert created.status_code == 200

    deleted = test_client.delete("/api/v1/clients/test")
    assert deleted.status_code == 200

    # The client itself is gone ...
    lookup = test_client.get("/api/v1/clients/test")
    assert lookup.status_code == 404

    # ... while the listing endpoint still responds normally.
    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
|
|
|
|
|
|
def test_add_secret(test_client: TestClient) -> None:
    """Test that a secret added to a client can be read back unchanged."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    payload = {
        "name": "mysecret",
        "secret": "shhhh",
        "description": "A test secret",
    }
    added = test_client.post("/api/v1/clients/test/secrets/", json=payload)
    assert added.status_code == 200

    # Read the secret back and compare it with what was submitted.
    fetched = test_client.get("/api/v1/clients/test/secrets/mysecret")
    assert fetched.status_code == 200
    body = fetched.json()
    assert body["name"] == payload["name"]
    assert body["secret"] == payload["secret"]
|
|
|
|
|
|
def test_delete_secret(test_client: TestClient) -> None:
    """Test that a deleted secret can no longer be fetched."""
    # Reuse test_add_secret to set up client "test" holding "mysecret".
    test_add_secret(test_client)
    secret_url = "/api/v1/clients/test/secrets/mysecret"

    deleted = test_client.delete(secret_url)
    assert deleted.status_code == 200

    lookup = test_client.get(secret_url)
    assert lookup.status_code == 404
|
|
|
|
|
|
def test_put_add_secret(test_client: TestClient) -> None:
    """Test that PUT on a not yet existing secret creates it."""
    # Reuse test_create_client to set up the client named "test".
    test_create_client(test_client)

    value = "shhhh"
    expected = {"name": "mysecret", "secret": value, "description": None}
    put_response = test_client.put(
        "/api/v1/clients/test/secrets/mysecret",
        json={"value": value},
    )
    assert put_response.status_code == 200

    body = put_response.json()
    # Both timestamp fields must be present; pop() raises KeyError otherwise.
    body.pop("created_at")
    body.pop("updated_at")
    for field, wanted in expected.items():
        assert body.get(field) == wanted
|
|
|
|
def test_put_update_secret(test_client: TestClient) -> None:
    """Test that PUT on an existing secret replaces its value."""
    # Reuse test_add_secret to set up client "test" holding "mysecret".
    test_add_secret(test_client)

    replacement = "itsasecret"
    updated = test_client.put(
        "/api/v1/clients/test/secrets/mysecret",
        json={"value": replacement},
    )
    assert updated.status_code == 200

    body = updated.json()
    assert (body["name"], body["secret"]) == ("mysecret", replacement)
    # The response must carry an updated_at field.
    assert "updated_at" in body
|
|
|
|
|
|
def test_audit_logging(test_client: TestClient) -> None:
    """Test that the audit log is populated and its entries parse as AuditView."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    for secret_name, secret_value in (
        ("secret1", "foo"),
        ("secret2", "bar"),
        ("secret3", "baz"),
    ):
        added = test_client.post(
            "/api/v1/clients/test/secrets/",
            json={"name": secret_name, "secret": secret_value},
        )
        assert added.status_code == 200

    # Read the complete client once.
    fetched = test_client.get("/api/v1/clients/test")
    assert fetched.status_code == 200

    # Now retrieve the audit log.
    log_response = test_client.get("/api/v1/audit/")
    assert log_response.status_code == 200
    log_body = log_response.json()
    assert isinstance(log_body, dict)

    total = log_body.get("total")
    assert total is not None
    assert total > 0
    assert "results" in log_body

    # Every returned entry has to validate against the AuditView schema.
    for raw_entry in log_body["results"]:
        parsed = AuditView.model_validate(raw_entry)
        assert parsed is not None
|
|
|
|
def test_secret_invalidation(test_client: TestClient) -> None:
    """Test that replacing a client's public key leaves it without secrets."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    stored = {"secret1": "foo", "secret2": "bar", "secret3": "baz"}
    for secret_name in stored:
        added = test_client.post(
            "/api/v1/clients/test/secrets/",
            json={"name": secret_name, "secret": stored[secret_name]},
        )
        assert added.status_code == 200

    # Swap in a new public key. This is expected to invalidate every secret
    # so that none is associated with the client any more.
    rotated = test_client.post(
        "/api/v1/clients/test/public-key",
        json={"public_key": make_test_key()},
    )
    assert rotated.status_code == 200

    # The re-fetched client must report no secrets.
    refreshed = test_client.get("/api/v1/clients/test")
    assert refreshed.status_code == 200
    remaining = refreshed.json().get("secrets")
    assert not remaining
|
|
|
|
|
|
def test_client_default_policies(
    test_client: TestClient,
) -> None:
    """Test that a new client reports the allow-all source policy by default."""
    # create_client() generates a key itself when none is passed, so no key
    # has to be prepared here.
    resp = create_client(test_client, "test")
    assert resp.status_code == 200

    # No policy was set, so both the IPv4 and IPv6 catch-all networks are
    # expected.
    resp = test_client.get("/api/v1/clients/test/policies/")
    assert resp.status_code == 200

    policies = resp.json()
    assert policies["sources"] == ["0.0.0.0/0", "::/0"]
|
|
|
|
|
|
def test_client_policy_update_one(test_client: TestClient) -> None:
    """Test replacing the client policy with a single source address."""
    policy_url = "/api/v1/clients/test/policies/"
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    sources = ["192.0.2.1"]
    updated = test_client.put(policy_url, json={"sources": sources})
    assert updated.status_code == 200

    # Reading the policy back must give exactly what was stored.
    fetched = test_client.get(policy_url)
    assert fetched.status_code == 200
    assert fetched.json()["sources"] == sources
|
|
|
|
|
|
def test_client_policy_update_advanced(test_client: TestClient) -> None:
    """Test a multi-entry policy update and the rejection of an invalid one."""
    policy_url = "/api/v1/clients/test/policies/"
    valid_sources = ("192.0.2.1", "198.18.0.0/24")

    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    updated = test_client.put(policy_url, json={"sources": list(valid_sources)})
    assert updated.status_code == 200

    fetched = test_client.get(policy_url)
    assert fetched.status_code == 200
    stored = fetched.json()["sources"]
    for source in valid_sources:
        assert source in stored

    # An entry that is not a valid source must be refused ...
    rejected = test_client.put(policy_url, json={"sources": ["obviosly_wrong"]})
    assert rejected.status_code == 422

    # ... and must leave the previously stored policy in place.
    fetched = test_client.get(policy_url)
    assert fetched.status_code == 200
    stored = fetched.json()["sources"]
    for source in valid_sources:
        assert source in stored

    # Clearing the policy is exercised by test_client_policy_update_unset.
|
|
|
|
|
|
def test_client_policy_update_unset(test_client: TestClient) -> None:
    """Test that an empty source list resets the policy to the catch-all entries."""
    policy_url = "/api/v1/clients/test/policies/"
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    updated = test_client.put(
        policy_url,
        json={"sources": ["192.0.2.1", "198.18.0.0/24"]},
    )
    assert updated.status_code == 200
    stored = updated.json()["sources"]
    assert "192.0.2.1" in stored
    assert "198.18.0.0/24" in stored

    # Submitting no sources at all should bring back the catch-all entries.
    cleared = test_client.put(policy_url, json={"sources": []})
    assert cleared.status_code == 200
    assert cleared.json()["sources"] == ["0.0.0.0/0", "::/0"]
|
|
|
|
|
|
def test_client_update(test_client: TestClient) -> None:
    """Test changing a client's description through PUT."""
    client_url = "/api/v1/clients/test"
    created = create_client(test_client, "test", make_test_key(), "PRE")
    assert created.status_code == 200

    before = test_client.get(client_url)
    assert before.status_code == 200
    original = before.json()
    assert original["description"] == "PRE"

    # Send the client back with only the description changed.
    updated = test_client.put(
        client_url,
        json={
            "name": "test",
            "description": "POST",
            "public_key": original["public_key"],
        },
    )
    assert updated.status_code == 200
    assert updated.json()["description"] == "POST"

    # The change must also be visible on a fresh read.
    after = test_client.get(client_url)
    assert after.status_code == 200
    assert after.json()["description"] == "POST"
|
|
|
|
|
|
def test_get_secret_list(test_client: TestClient) -> None:
    """Test the view that maps each secret to the clients holding it."""
    for index in range(4):
        owner = f"client-{index}"
        create_client(test_client, owner, make_test_key())
        # First a secret named after the client (unique to it), then one
        # that all four clients share.
        for secret_name in (owner, "commonsecret"):
            stored = test_client.put(
                f"/api/v1/clients/{owner}/secrets/{secret_name}",
                json={"value": "SECRET"},
            )
            assert stored.status_code == 200

    listing = test_client.get("/api/v1/secrets/")
    assert listing.status_code == 200
    secret_map = listing.json()
    assert isinstance(secret_map, list)
    # Four per-client secrets plus the shared one.
    assert len(secret_map) == 5

    for item in secret_map:
        if item["name"] != "commonsecret":
            # Per-client secrets carry the name of their only holder.
            assert len(item["clients"]) == 1
            assert item["clients"][0]["name"] == item["name"]
        else:
            assert len(item["clients"]) == 4
|
|
|
|
|
|
def test_get_secret_clients(test_client: TestClient) -> None:
    """Test fetching the clients that hold one particular secret."""
    for index in range(4):
        create_client(test_client, f"client-{index}", make_test_key())
        # Only the even-numbered clients receive the shared secret.
        if index % 2 == 0:
            stored = test_client.put(
                f"/api/v1/clients/client-{index}/secrets/commonsecret",
                json={"value": "SECRET"},
            )
            assert stored.status_code == 200

    lookup = test_client.get("/api/v1/secrets/commonsecret")
    assert lookup.status_code == 200

    secret_view = lookup.json()
    assert secret_view["name"] == "commonsecret"

    holders = []
    for holder in secret_view["clients"]:
        holders.append(holder["name"])
    assert "client-0" in holders
    assert "client-1" not in holders
    assert len(secret_view["clients"]) == 2
|
|
|
|
|
|
def test_searching(test_client: TestClient) -> None:
    """Test searching clients by exact name, by ID and by name pattern."""
    clients_url = "/api/v1/clients/"

    # Four similarly named clients followed by one that stands apart.
    for client_name in [f"client-{index}" for index in range(4)] + ["othername"]:
        create_client(test_client, client_name)

    # Exact name match.
    by_name = test_client.get(clients_url, params={"name": "othername"})
    assert by_name.status_code == 200
    found = by_name.json()
    assert found["total_results"] == 1
    assert found["clients"][0]["name"] == "othername"
    other_id = found["clients"][0]["id"]

    # Lookup using the ID taken from the previous result.
    by_id = test_client.get(clients_url, params={"id": other_id})
    assert by_id.status_code == 200
    found = by_id.json()
    assert found["total_results"] == 1
    assert found["clients"][0]["name"] == "othername"

    # Pattern match on the prefix shared by the other four.
    by_pattern = test_client.get(clients_url, params={"name__like": "client-%"})
    assert by_pattern.status_code == 200
    found = by_pattern.json()
    assert found["total_results"] == 4
    assert str(found["clients"][0]["name"]).startswith("client-")
|
|
|
|
|
|
def test_operations_with_id(test_client: TestClient) -> None:
    """Test fetching a client through an "id:"-prefixed identifier."""
    create_client(test_client, "test")

    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
    identifier = listing.json()["clients"][0]["id"]

    # Address the same client by its ID instead of its name.
    by_id = test_client.get(f"/api/v1/clients/id:{identifier}")
    assert by_id.status_code == 200
    assert by_id.json()["name"] == "test"
|
|
|
|
|
|
def test_write_audit_log(test_client: TestClient) -> None:
    """Test that a posted audit entry can be read back from the audit log."""
    submitted = {
        "subsystem": "backend",
        "operation": "read",
        "message": "Test Message",
    }
    written = test_client.post("/api/v1/audit", json=submitted)
    assert written.status_code == 200

    listing = test_client.get("/api/v1/audit")
    assert listing.status_code == 200

    # The first returned entry is compared field by field with the submission.
    first_entry = listing.json()["results"][0]
    for field in submitted:
        assert first_entry[field] == submitted[field]
|
|
|
|
|
|
def test_filter_audit_log(test_client: TestClient) -> None:
    """Test filtering of audit logs.

    Three entries are written (two for client name "foo", one for "bar") and
    the audit endpoint is then queried by client name and by a combination of
    client name and operation.
    """
    # Prepare some audit logs.
    messages = [
        {
            "subsystem": "backend",
            "operation": "login",
            "client_id": str(uuid.uuid4()),
            "client_name": "foo",
            "origin": "192.0.2.1",
            "message": "message1",
        },
        {
            "subsystem": "backend",
            "operation": "create",
            "client_id": str(uuid.uuid4()),
            "client_name": "foo",
            "origin": "192.0.2.1",
            "secret_id": str(uuid.uuid4()),
            "message": "message2",
        },
        {
            "subsystem": "test",
            "operation": "deny",
            "client_id": str(uuid.uuid4()),
            "client_name": "bar",
            "origin": "192.0.2.2",
            "message": "message3",
        },
    ]
    for message in messages:
        # Fail right here if an entry is not stored, rather than later
        # through a harder-to-read count mismatch.
        post_resp = test_client.post("/api/v1/audit", json=message)
        assert post_resp.status_code == 200

    # Find all client_name=foo entries.
    resp = test_client.get("/api/v1/audit/", params={"client_name": "foo"})
    assert resp.status_code == 200
    data = resp.json()

    assert data["total"] == 2
    assert len(data["results"]) == 2

    # Get the one message from 'bar'.
    resp = test_client.get("/api/v1/audit/", params={"client_name": "bar"})
    assert resp.status_code == 200
    data = resp.json()

    assert data["total"] == 1
    assert len(data["results"]) == 1
    assert data["results"][0]["operation"] == "deny"

    # Combine two fields to single out the login event.
    resp = test_client.get(
        "/api/v1/audit/",
        params={"client_name": "foo", "operation": "login"},
    )
    # Check the status like the two queries above do, before reading the body.
    assert resp.status_code == 200
    data = resp.json()

    assert data["total"] == 1
    assert data["results"][0]["operation"] == "login"
    assert data["results"][0]["message"] == "message1"
|
|
|
|
|
|
def test_secret_flexid(test_client: TestClient) -> None:
    """Test that the secret API accepts names and "id:"-prefixed IDs alike."""
    created = create_client(test_client, "test")
    assert created.status_code == 200
    assert "id" in created.json()
    client_id = created.json()["id"]

    # Store one secret through the client name and one through the client ID,
    # remembering the ID returned for each.
    secret_ids: dict[str, str] = {}
    for client_ref, new_secret in (
        ("test", "clientnamesecret"),
        (f"id:{client_id}", "clientidsecret"),
    ):
        stored = test_client.put(
            f"/api/v1/clients/{client_ref}/secrets/{new_secret}",
            json={"value": "secret"},
        )
        assert stored.status_code == 200
        stored_body = stored.json()
        assert "id" in stored_body
        secret_ids[new_secret] = stored_body["id"]

    # Every combination of client reference and secret reference must resolve
    # to the expected secret ID.
    for client_ref in ("test", f"id:{client_id}"):
        for secret_name, secret_id in secret_ids.items():
            for secret_ref in (secret_name, f"id:{secret_id}"):
                fetched = test_client.get(
                    f"/api/v1/clients/{client_ref}/secrets/{secret_ref}"
                )
                assert fetched.status_code == 200
                fetched_body = fetched.json()
                assert "id" in fetched_body
                assert fetched_body["id"] == secret_id
|