607 lines
19 KiB
Python
607 lines
19 KiB
Python
"""Tests of the backend api using pytest."""
|
|
|
|
import uuid
|
|
import logging
|
|
from pathlib import Path
|
|
from httpx import Response
|
|
import pytest
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
from sshecret.crypto import generate_private_key, generate_public_key_string
|
|
from sshecret_backend.app import create_backend_app
|
|
from sshecret_backend.testing import create_test_token
|
|
from sshecret_backend.api.audit.schemas import AuditView
|
|
from sshecret_backend.settings import BackendSettings
|
|
|
|
|
|
# Attach a formatted stream handler to the root logger.
LOG = logging.getLogger()

formatter = logging.Formatter("'%(asctime)s - %(levelname)s - %(message)s'")
handler = logging.StreamHandler()
handler.setFormatter(formatter)
LOG.addHandler(handler)

# Uncomment for verbose output while debugging a test:
# LOG.setLevel(logging.DEBUG)
|
|
|
|
|
|
def make_test_key() -> str:
    """Return the public key string of a freshly generated private key."""
    return generate_public_key_string(generate_private_key().public_key())
|
|
|
|
|
|
def create_client(
    test_client: TestClient,
    name: str,
    public_key: str | None = None,
    description: str | None = None,
) -> Response:
    """POST a new client to the API and return the raw response.

    A fresh key is generated when no (or an empty) ``public_key`` is given.
    The description is only sent along when it is non-empty.
    """
    payload = {
        "name": name,
        "public_key": public_key or make_test_key(),
    }
    if description:
        payload["description"] = description
    return test_client.post("/api/v1/clients", json=payload)
|
|
|
|
|
|
@pytest.fixture(name="test_client")
def create_client_fixture(tmp_path: Path):
    """Yield a TestClient for a backend app using a database under tmp_path.

    The client sends a test token in the ``X-API-Token`` header by default.
    """
    database_path = (tmp_path / "backend.db").absolute()
    print(f"DB File: {database_path}")

    backend_settings = BackendSettings(database=str(database_path))
    backend_app = create_backend_app(backend_settings)
    api_token = create_test_token(backend_settings)

    yield TestClient(backend_app, headers={"X-API-Token": api_token})
|
|
|
|
|
|
def test_missing_token(test_client: TestClient) -> None:
    """Test that a request without the API token header gets a 422."""
    # Drop the default headers (including the token) set up by the fixture.
    old_headers = test_client.headers
    test_client.headers = {}
    try:
        response = test_client.get("/api/v1/clients/", headers={})
        assert response.status_code == 422
    finally:
        # Restore the headers even when the assertion above fails.
        test_client.headers = old_headers
|
|
|
|
|
|
def test_incorrect_token(test_client: TestClient) -> None:
    """Test that a request with an incorrect token is answered with 401."""
    wrong_token_headers = {"X-API-Token": "WRONG"}
    response = test_client.get("/api/v1/clients/", headers=wrong_token_headers)
    assert response.status_code == 401
|
|
|
|
|
|
def test_with_token(test_client: TestClient) -> None:
    """Test that the fixture's token gives access to the empty client list."""
    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
    assert listing.json()["total_results"] == 0
|
|
|
|
|
|
def test_create_client(test_client: TestClient) -> None:
    """Test that a created client shows up in the client list."""
    name = "test"
    created = create_client(test_client, name, make_test_key())
    assert created.status_code == 200

    listing = test_client.get("/api/v1/clients")
    assert listing.status_code == 200

    all_clients = listing.json()["clients"]
    assert isinstance(all_clients, list)

    first = all_clients[0]
    assert isinstance(first, dict)
    assert first.get("name") == name
    assert first.get("created_at") is not None
|
|
|
|
|
|
def test_delete_client(test_client: TestClient) -> None:
    """Test that a deleted client can no longer be fetched."""
    created = create_client(test_client, "test")
    assert created.status_code == 200

    deleted = test_client.delete("/api/v1/clients/test")
    assert deleted.status_code == 200

    # The client must be gone afterwards.
    lookup = test_client.get("/api/v1/clients/test")
    assert lookup.status_code == 404
|
|
|
|
|
|
def test_add_secret(test_client: TestClient) -> None:
    """Test that a secret posted to a client can be read back."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    payload = {
        "name": "mysecret",
        "secret": "shhhh",
        "description": "A test secret",
    }
    posted = test_client.post("/api/v1/clients/test/secrets/", json=payload)
    assert posted.status_code == 200

    # Read it back; name and value must match what was sent.
    fetched = test_client.get("/api/v1/clients/test/secrets/mysecret")
    assert fetched.status_code == 200
    body = fetched.json()
    assert body["name"] == payload["name"]
    assert body["secret"] == payload["secret"]
|
|
|
|
|
|
def test_delete_secret(test_client: TestClient) -> None:
    """Test that a deleted secret can no longer be fetched."""
    # Reuse test_add_secret to set up client "test" with secret "mysecret".
    test_add_secret(test_client)

    secret_url = "/api/v1/clients/test/secrets/mysecret"
    deleted = test_client.delete(secret_url)
    assert deleted.status_code == 200

    lookup = test_client.get(secret_url)
    assert lookup.status_code == 404
|
|
|
|
|
|
def test_put_add_secret(test_client: TestClient) -> None:
    """Test that PUT creates a secret that does not exist yet."""
    # Reuse test_create_client to set up the client "test".
    test_create_client(test_client)

    value = "shhhh"
    expected = {"name": "mysecret", "secret": value, "description": None}

    response = test_client.put(
        "/api/v1/clients/test/secrets/mysecret",
        json={"value": value},
    )
    assert response.status_code == 200

    body = response.json()
    # Both timestamps must be present; pop raises KeyError otherwise.
    body.pop("created_at")
    body.pop("updated_at")
    for field, wanted in expected.items():
        assert body.get(field) == wanted
|
|
|
|
def test_put_update_secret(test_client: TestClient) -> None:
    """Test that PUT replaces the value of an existing secret."""
    # Reuse test_add_secret to set up client "test" with secret "mysecret".
    test_add_secret(test_client)

    replacement = "itsasecret"
    response = test_client.put(
        "/api/v1/clients/test/secrets/mysecret",
        json={"value": replacement},
    )
    assert response.status_code == 200

    body = response.json()
    assert (body["name"], body["secret"]) == ("mysecret", replacement)
    # The response must carry an updated_at field.
    assert "updated_at" in body
|
|
|
|
|
|
def test_audit_logging(test_client: TestClient) -> None:
    """Test that the audit log is filled and its entries parse as AuditView."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    for secret_name, secret_value in (
        ("secret1", "foo"),
        ("secret2", "bar"),
        ("secret3", "baz"),
    ):
        added = test_client.post(
            "/api/v1/clients/test/secrets/",
            json={"name": secret_name, "secret": secret_value},
        )
        assert added.status_code == 200

    # Fetch the whole client as one more operation.
    fetched = test_client.get("/api/v1/clients/test")
    assert fetched.status_code == 200

    audit_response = test_client.get("/api/v1/audit/")
    assert audit_response.status_code == 200

    audit_body = audit_response.json()
    assert isinstance(audit_body, dict)
    total = audit_body.get("total")
    assert total is not None
    assert total > 0
    assert "results" in audit_body

    # Every entry must validate against the AuditView schema.
    for raw_entry in audit_body["results"]:
        assert AuditView.model_validate(raw_entry) is not None
|
|
|
|
def test_secret_invalidation(test_client: TestClient) -> None:
    """Test that replacing the public key leaves the client without secrets."""
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    for secret_name, secret_value in (
        ("secret1", "foo"),
        ("secret2", "bar"),
        ("secret3", "baz"),
    ):
        added = test_client.post(
            "/api/v1/clients/test/secrets/",
            json={"name": secret_name, "secret": secret_value},
        )
        assert added.status_code == 200

    # Replace the public key. The existing secrets are expected to be
    # invalidated and detached from the client.
    replacement_key = make_test_key()
    updated = test_client.post(
        "/api/v1/clients/test/public-key",
        json={"public_key": replacement_key},
    )
    assert updated.status_code == 200

    # The client itself must still exist, but with no secrets left.
    fetched = test_client.get("/api/v1/clients/test")
    assert fetched.status_code == 200
    remaining = fetched.json().get("secrets")
    assert not remaining
|
|
|
|
|
|
def test_client_default_policies(
    test_client: TestClient,
) -> None:
    """Test that a new client has the default sources 0.0.0.0/0 and ::/0."""
    # No key is passed: create_client generates one itself.
    resp = create_client(test_client, "test")
    assert resp.status_code == 200

    # No policy has been set, so the defaults are expected.
    resp = test_client.get("/api/v1/clients/test/policies/")
    assert resp.status_code == 200

    policies = resp.json()
    assert policies["sources"] == ["0.0.0.0/0", "::/0"]
|
|
|
|
|
|
def test_client_policy_update_one(test_client: TestClient) -> None:
    """Test setting a client policy that consists of a single source."""
    policies_url = "/api/v1/clients/test/policies/"
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    single_source = ["192.0.2.1"]
    updated = test_client.put(policies_url, json={"sources": single_source})
    assert updated.status_code == 200

    # The stored policy must be exactly what was sent.
    fetched = test_client.get(policies_url)
    assert fetched.status_code == 200
    assert fetched.json()["sources"] == single_source
|
|
|
|
|
|
def test_client_policy_update_advanced(test_client: TestClient) -> None:
    """Test multi-entry policy updates and rejection of an invalid source."""
    policies_url = "/api/v1/clients/test/policies/"
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    valid_sources = ["192.0.2.1", "198.18.0.0/24"]
    updated = test_client.put(policies_url, json={"sources": valid_sources})
    assert updated.status_code == 200

    fetched = test_client.get(policies_url)
    assert fetched.status_code == 200
    current = fetched.json()
    assert "192.0.2.1" in current["sources"]
    assert "198.18.0.0/24" in current["sources"]

    # An invalid source must be rejected ...
    rejected = test_client.put(
        policies_url, json={"sources": ["obviosly_wrong"]}
    )
    assert rejected.status_code == 422

    # ... and must leave the previous policy in place.
    fetched = test_client.get(policies_url)
    assert fetched.status_code == 200
    current = fetched.json()
    assert "192.0.2.1" in current["sources"]
    assert "198.18.0.0/24" in current["sources"]

    # Clearing the policy is exercised in test_client_policy_update_unset.
|
|
|
|
|
|
def test_client_policy_update_unset(test_client: TestClient) -> None:
    """Test that an empty source list resets the policy to the defaults."""
    policies_url = "/api/v1/clients/test/policies/"
    created = create_client(test_client, "test", make_test_key())
    assert created.status_code == 200

    # Start from a non-default policy; the PUT response echoes it back.
    initial_sources = ["192.0.2.1", "198.18.0.0/24"]
    updated = test_client.put(policies_url, json={"sources": initial_sources})
    assert updated.status_code == 200
    after_update = updated.json()
    assert "192.0.2.1" in after_update["sources"]
    assert "198.18.0.0/24" in after_update["sources"]

    # Sending no sources at all must bring back the default sources.
    cleared = test_client.put(policies_url, json={"sources": []})
    assert cleared.status_code == 200
    assert cleared.json()["sources"] == ["0.0.0.0/0", "::/0"]
|
|
|
|
|
|
def test_client_update(test_client: TestClient) -> None:
    """Test that a client's description can be changed via PUT."""
    client_url = "/api/v1/clients/test"
    created = create_client(test_client, "test", make_test_key(), "PRE")
    assert created.status_code == 200

    before = test_client.get(client_url)
    assert before.status_code == 200
    original = before.json()
    assert original["description"] == "PRE"

    # Send the client back with only the description changed.
    updated = test_client.put(
        client_url,
        json={
            "name": "test",
            "description": "POST",
            "public_key": original["public_key"],
        },
    )
    assert updated.status_code == 200
    assert updated.json()["description"] == "POST"

    # The change must also be visible on a fresh fetch.
    after = test_client.get(client_url)
    assert after.status_code == 200
    assert after.json()["description"] == "POST"
|
|
|
|
|
|
def test_get_secret_list(test_client: TestClient) -> None:
    """Test the view that maps each secret to the clients holding it."""
    for index in range(4):
        client_name = f"client-{index}"
        create_client(test_client, client_name, make_test_key())
        # One secret named after the client, then one shared by all clients.
        for secret_name in (client_name, "commonsecret"):
            stored = test_client.put(
                f"/api/v1/clients/{client_name}/secrets/{secret_name}",
                json={"value": "SECRET"},
            )
            assert stored.status_code == 200

    listing = test_client.get("/api/v1/secrets/")
    assert listing.status_code == 200
    entries = listing.json()
    assert isinstance(entries, list)
    # Four per-client secrets plus the shared one.
    assert len(entries) == 5

    for entry in entries:
        if entry["name"] != "commonsecret":
            # A per-client secret belongs to the client of the same name.
            assert len(entry["clients"]) == 1
            assert entry["clients"][0]["name"] == entry["name"]
        else:
            assert len(entry["clients"]) == 4
|
|
|
|
|
|
def test_get_secret_clients(test_client: TestClient) -> None:
    """Test listing the clients that hold one particular secret."""
    for index in range(4):
        create_client(test_client, f"client-{index}", make_test_key())
        # Only the even-numbered clients get the shared secret.
        if index % 2 == 0:
            stored = test_client.put(
                f"/api/v1/clients/client-{index}/secrets/commonsecret",
                json={"value": "SECRET"},
            )
            assert stored.status_code == 200

    lookup = test_client.get("/api/v1/secrets/commonsecret")
    assert lookup.status_code == 200

    secret_view = lookup.json()
    assert secret_view["name"] == "commonsecret"
    holders = [holder["name"] for holder in secret_view["clients"]]
    assert "client-0" in holders
    assert "client-1" not in holders
    assert len(secret_view["clients"]) == 2
|
|
|
|
|
|
def test_searching(test_client: TestClient) -> None:
    """Test searching clients by exact name, by ID and by name pattern."""
    search_url = "/api/v1/clients/"

    # Four similarly named clients and one that stands out.
    for index in range(4):
        create_client(test_client, f"client-{index}")
    create_client(test_client, "othername")

    # Exact name match.
    by_name = test_client.get(search_url, params={"name": "othername"})
    assert by_name.status_code == 200
    found = by_name.json()
    assert found["total_results"] == 1
    assert found["clients"][0]["name"] == "othername"
    other_id = found["clients"][0]["id"]

    # The ID from the previous hit must lead to the same client.
    by_id = test_client.get(search_url, params={"id": other_id})
    assert by_id.status_code == 200
    found = by_id.json()
    assert found["total_results"] == 1
    assert found["clients"][0]["name"] == "othername"

    # Pattern match on the four similarly named clients.
    by_pattern = test_client.get(search_url, params={"name__like": "client-%"})
    assert by_pattern.status_code == 200
    found = by_pattern.json()
    assert found["total_results"] == 4
    assert str(found["clients"][0]["name"]).startswith("client-")
|
|
|
|
|
|
def test_operations_with_id(test_client: TestClient) -> None:
    """Test fetching a client through its ``id:``-prefixed identifier."""
    create_client(test_client, "test")

    # Look up the ID of the only client via the listing.
    listing = test_client.get("/api/v1/clients/")
    assert listing.status_code == 200
    client_id = listing.json()["clients"][0]["id"]

    by_id = test_client.get(f"/api/v1/clients/id:{client_id}")
    assert by_id.status_code == 200
    assert by_id.json()["name"] == "test"
|
|
|
|
|
|
def test_write_audit_log(test_client: TestClient) -> None:
    """Test that a posted audit entry comes back first in the audit log."""
    audit_entry = {
        "subsystem": "backend",
        "operation": "read",
        "message": "Test Message",
    }
    posted = test_client.post("/api/v1/audit", json=audit_entry)
    assert posted.status_code == 200

    listing = test_client.get("/api/v1/audit")
    assert listing.status_code == 200

    # Every field that was sent must be echoed by the first result.
    first = listing.json()["results"][0]
    for field, wanted in audit_entry.items():
        assert first[field] == wanted
|
|
|
|
|
|
def test_filter_audit_log(test_client: TestClient) -> None:
    """Test filtering of audit logs by client name and by operation."""
    # Prepare three audit entries: two for client "foo", one for client "bar".
    messages = [
        {
            "subsystem": "backend",
            "operation": "login",
            "client_id": str(uuid.uuid4()),
            "client_name": "foo",
            "origin": "192.0.2.1",
            "message": "message1",
        },
        {
            "subsystem": "backend",
            "operation": "create",
            "client_id": str(uuid.uuid4()),
            "client_name": "foo",
            "origin": "192.0.2.1",
            "secret_id": str(uuid.uuid4()),
            "message": "message2",
        },
        {
            "subsystem": "test",
            "operation": "deny",
            "client_id": str(uuid.uuid4()),
            "client_name": "bar",
            "origin": "192.0.2.2",
            "message": "message3",
        },
    ]
    for message in messages:
        post_resp = test_client.post("/api/v1/audit", json=message)
        # Fail right here if an entry was not written; the counts below
        # depend on all three being stored.
        assert post_resp.status_code == 200

    # Find all client_name=foo entries.
    resp = test_client.get("/api/v1/audit/", params={"client_name": "foo"})
    assert resp.status_code == 200
    data = resp.json()

    assert data["total"] == 2
    assert len(data["results"]) == 2

    # Get the one message from 'bar'.
    resp = test_client.get("/api/v1/audit/", params={"client_name": "bar"})
    assert resp.status_code == 200
    data = resp.json()

    assert data["total"] == 1
    assert len(data["results"]) == 1
    assert data["results"][0]["operation"] == "deny"

    # Combine two filter fields to single out the login event.
    resp = test_client.get(
        "/api/v1/audit/", params={"client_name": "foo", "operation": "login"}
    )
    assert resp.status_code == 200
    data = resp.json()

    assert data["total"] == 1
    assert data["results"][0]["operation"] == "login"
    assert data["results"][0]["message"] == "message1"
|
|
|
|
|
|
def test_secret_flexid(test_client: TestClient) -> None:
    """Test addressing clients and secrets by name or by ``id:`` reference."""
    created = create_client(test_client, "test")
    assert created.status_code == 200
    created_body = created.json()
    assert "id" in created_body
    client_id = created_body["id"]

    # Create one secret addressing the client by name, then one by ID, and
    # remember the ID each secret was given.
    secret_ids: dict[str, str] = {}
    for client_ref, new_secret in (
        ("test", "clientnamesecret"),
        (f"id:{client_id}", "clientidsecret"),
    ):
        stored = test_client.put(
            f"/api/v1/clients/{client_ref}/secrets/{new_secret}",
            json={"value": "secret"},
        )
        assert stored.status_code == 200
        stored_body = stored.json()
        assert "id" in stored_body
        secret_ids[new_secret] = stored_body["id"]

    # Every combination of client reference and secret reference must
    # resolve to the same secret.
    for client_ref in ("test", f"id:{client_id}"):
        for known_name, known_id in secret_ids.items():
            for secret_ref in (known_name, f"id:{known_id}"):
                fetched = test_client.get(
                    f"/api/v1/clients/{client_ref}/secrets/{secret_ref}"
                )
                assert fetched.status_code == 200
                fetched_body = fetched.json()
                assert "id" in fetched_body
                assert fetched_body["id"] == known_id
|