Adapt admin api to use new key format

Filter out deleted an previous version in count

Remove todo comment

Allow explicit ID specification

Update tests
This commit is contained in:
2025-06-09 08:55:37 +02:00
parent 3779e93b8c
commit d86d9a9256
6 changed files with 80 additions and 96 deletions

View File

@ -12,13 +12,12 @@ from sshecret.backend import (
AuditListResult, AuditListResult,
Client, Client,
ClientFilter, ClientFilter,
Secret,
SshecretBackend, SshecretBackend,
Operation, Operation,
SubSystem, SubSystem,
) )
from sshecret.backend.models import DetailedSecrets from sshecret.backend.models import DetailedSecrets
from sshecret.backend.api import AuditAPI from sshecret.backend.api import AuditAPI, KeySpec
from sshecret.crypto import encrypt_string, load_public_key from sshecret.crypto import encrypt_string, load_public_key
from .keepass import PasswordContext, load_password_manager from .keepass import PasswordContext, load_password_manager
@ -113,13 +112,13 @@ class AdminBackend:
except Exception as e: except Exception as e:
raise BackendUnavailableError() from e raise BackendUnavailableError() from e
async def _get_client(self, name: str) -> Client | None: async def _get_client(self, idname: KeySpec) -> Client | None:
"""Get a client from the backend.""" """Get a client from the backend."""
return await self.backend.get_client(name) return await self.backend.get_client(idname)
async def _verify_client_exists(self, name: str) -> None: async def _verify_client_exists(self, idname: KeySpec) -> None:
"""Check that a client exists.""" """Check that a client exists."""
client = await self.backend.get_client(name) client = await self.backend.get_client(idname)
if not client: if not client:
raise ClientNotFoundError() raise ClientNotFoundError()
return None return None
@ -133,10 +132,13 @@ class AdminBackend:
except Exception as e: except Exception as e:
raise BackendUnavailableError() from e raise BackendUnavailableError() from e
async def get_client(self, name: str) -> Client | None: async def get_client(self, name: str, is_id: bool = False) -> Client | None:
"""Get a client from the backend.""" """Get a client from the backend."""
key = name
if is_id:
key = ("id", name)
try: try:
return await self._get_client(name) return await self._get_client(key)
except ClientManagementError: except ClientManagementError:
raise raise
except Exception as e: except Exception as e:
@ -176,16 +178,20 @@ class AdminBackend:
raise BackendUnavailableError() from e raise BackendUnavailableError() from e
async def _update_client_public_key( async def _update_client_public_key(
self, name: str, new_key: str, password_manager: PasswordContext self, name: str, new_key: str, password_manager: PasswordContext, is_id: bool = False,
) -> list[str]: ) -> list[str]:
"""Update client public key.""" """Update client public key."""
LOG.info( LOG.info(
"Updating client %s public key. This will invalidate all existing secrets." "Updating client %s public key. This will invalidate all existing secrets."
) )
client = await self.get_client(name) client = await self.get_client(name, is_id=is_id)
if not client: if not client:
raise ClientNotFoundError() raise ClientNotFoundError()
await self.backend.update_client_key(name, new_key) idname: KeySpec = name
if is_id:
idname = ("id", name)
await self.backend.update_client_key(idname, new_key)
updated_secrets: list[str] = [] updated_secrets: list[str] = []
for secret in client.secrets: for secret in client.secrets:
LOG.debug("Re-encrypting secret %s for client %s", secret, name) LOG.debug("Re-encrypting secret %s for client %s", secret, name)
@ -198,17 +204,17 @@ class AdminBackend:
rsa_public_key = load_public_key(client.public_key.encode()) rsa_public_key = load_public_key(client.public_key.encode())
encrypted = encrypt_string(secret_value, rsa_public_key) encrypted = encrypt_string(secret_value, rsa_public_key)
LOG.debug("Sending new encrypted value to backend.") LOG.debug("Sending new encrypted value to backend.")
await self.backend.create_client_secret(name, secret, encrypted) await self.backend.create_client_secret(idname, secret, encrypted)
updated_secrets.append(secret) updated_secrets.append(secret)
return updated_secrets return updated_secrets
async def update_client_public_key(self, name: str, new_key: str) -> list[str]: async def update_client_public_key(self, name: str, new_key: str, is_id: bool = False) -> list[str]:
"""Update client public key.""" """Update client public key."""
try: try:
with self.password_manager() as password_manager: with self.password_manager() as password_manager:
return await self._update_client_public_key( return await self._update_client_public_key(
name, new_key, password_manager name, new_key, password_manager, is_id=is_id
) )
except ClientManagementError: except ClientManagementError:
raise raise
@ -238,10 +244,13 @@ class AdminBackend:
except Exception as e: except Exception as e:
raise BackendUnavailableError() from e raise BackendUnavailableError() from e
async def update_client_sources(self, name: str, sources: list[str]) -> None: async def update_client_sources(self, name: str, sources: list[str], is_id: bool = False) -> None:
"""Update client sources.""" """Update client sources."""
key: KeySpec = name
if is_id:
key = ("id", name)
try: try:
await self.backend.update_client_sources(name, sources) await self.backend.update_client_sources(key, sources)
except Exception as e: except Exception as e:
raise BackendUnavailableError() from e raise BackendUnavailableError() from e
@ -265,26 +274,10 @@ class AdminBackend:
except Exception as e: except Exception as e:
raise BackendUnavailableError() from e raise BackendUnavailableError() from e
async def _get_secrets(self) -> list[Secret]: async def get_secrets(self) -> list[DetailedSecrets]:
"""Get secrets.
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
"""
with self.password_manager() as password_manager:
all_secrets = password_manager.get_available_secrets()
secrets = await self.backend.get_secrets()
backend_secret_names = [secret.name for secret in secrets]
for secret in all_secrets:
if secret not in backend_secret_names:
secrets.append(Secret(name=secret, clients=[]))
return secrets
async def get_secrets(self) -> list[Secret]:
"""Get secrets from backend.""" """Get secrets from backend."""
try: try:
return await self._get_secrets() return await self._get_detailed_secrets()
except ClientManagementError: except ClientManagementError:
raise raise
except Exception as e: except Exception as e:
@ -298,7 +291,7 @@ class AdminBackend:
with self.password_manager() as password_manager: with self.password_manager() as password_manager:
all_secrets = password_manager.get_available_secrets() all_secrets = password_manager.get_available_secrets()
secrets = await self.backend.get_detailed_secrets() secrets = await self.backend.get_secrets()
backend_secret_names = [secret.name for secret in secrets] backend_secret_names = [secret.name for secret in secrets]
for secret in all_secrets: for secret in all_secrets:
if secret not in backend_secret_names: if secret not in backend_secret_names:
@ -368,7 +361,7 @@ class AdminBackend:
Groups are returned in a tree, unless flat is True. Groups are returned in a tree, unless flat is True.
""" """
all_secrets = await self.backend.get_detailed_secrets() all_secrets = await self.backend.get_secrets()
secrets_mapping = {secret.name: secret for secret in all_secrets} secrets_mapping = {secret.name: secret for secret in all_secrets}
with self.password_manager() as password_manager: with self.password_manager() as password_manager:
if flat: if flat:
@ -427,7 +420,7 @@ class AdminBackend:
secret_view = SecretView(name=name, secret=secret, group=secret_group) secret_view = SecretView(name=name, secret=secret, group=secret_group)
secret_mapping = await self.backend.get_secret(name) secret_mapping = await self.backend.get_secret(name)
if secret_mapping: if secret_mapping:
secret_view.clients = secret_mapping.clients secret_view.clients = [ref.name for ref in secret_mapping.clients]
return secret_view return secret_view
@ -450,7 +443,7 @@ class AdminBackend:
return return
for client in secret_mapping.clients: for client in secret_mapping.clients:
LOG.info("Deleting secret %s from client %s", name, client) LOG.info("Deleting secret %s from client %s", name, client)
await self.backend.delete_client_secret(client, name) await self.backend.delete_client_secret(("id", client.id), name)
async def _add_secret( async def _add_secret(
self, self,
@ -467,7 +460,7 @@ class AdminBackend:
if update: if update:
secret_map = await self.backend.get_secret(name) secret_map = await self.backend.get_secret(name)
if secret_map: if secret_map:
clients = secret_map.clients clients = [ref.name for ref in secret_map.clients]
if not clients: if not clients:
return return

View File

@ -128,7 +128,7 @@ class ClientOperations:
"""Delete client.""" """Delete client."""
db_client = await self._get_client(client) db_client = await self._get_client(client)
if not db_client: if not db_client:
return raise HTTPException(status_code=404, detail="Client not found.")
if db_client.is_deleted: if db_client.is_deleted:
return return
db_client.is_deleted = True db_client.is_deleted = True
@ -271,7 +271,7 @@ async def get_clients(
filter_query: ClientListParams, filter_query: ClientListParams,
) -> ClientQueryResult: ) -> ClientQueryResult:
"""Get Clients.""" """Get Clients."""
count_statement = select(func.count("*")).select_from(Client) count_statement = select(func.count("*")).select_from(Client).where(Client.is_deleted.is_not(True)).where(Client.is_active.is_not(False))
count_statement = cast( count_statement = cast(
Select[tuple[int]], Select[tuple[int]],
filter_client_statement(count_statement, filter_query, True), filter_client_statement(count_statement, filter_query, True),

View File

@ -72,7 +72,6 @@ def create_client_secrets_router(get_db_session: AsyncDBSessionDep) -> APIRouter
client_op = ClientSecretOperations(session, request, client) client_op = ClientSecretOperations(session, request, client)
return await client_op.get_client_secret(secret) return await client_op.get_client_secret(secret)
# TODO: delete_client_secret
@router.delete("/clients/{client_identifier}/secrets/{secret_identifier}") @router.delete("/clients/{client_identifier}/secrets/{secret_identifier}")
async def delete_client_secret( async def delete_client_secret(
request: Request, request: Request,

View File

@ -5,7 +5,7 @@ admin and sshd library do not need to implement the same
""" """
import logging import logging
from typing import Any, Self, override from typing import Any, Literal, Self, override
import httpx import httpx
from pydantic import TypeAdapter from pydantic import TypeAdapter
@ -21,7 +21,6 @@ from .models import (
ClientFilter, ClientFilter,
DetailedSecrets, DetailedSecrets,
Operation, Operation,
Secret,
SubSystem, SubSystem,
) )
from .exceptions import BackendValidationError, BackendConnectionError from .exceptions import BackendValidationError, BackendConnectionError
@ -29,6 +28,17 @@ from .utils import validate_public_key
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
KeyType = Literal["id", "name"]
KeySpec = str | tuple[KeyType, str]
def _key(id_or_name: KeySpec) -> str:
"""Get the correct key string."""
if isinstance(id_or_name, str):
return id_or_name
prefix, suffix = id_or_name
return f"{prefix}:{suffix}"
class ClientQueryIterator: class ClientQueryIterator:
"""Asynchronous query iterator.""" """Asynchronous query iterator."""
@ -313,56 +323,51 @@ class SshecretBackend(BaseBackend):
return clients return clients
async def get_client(self, name: str) -> Client | None: async def get_client(self, id_or_name: KeySpec) -> Client | None:
"""Lookup a client on username.""" """Lookup a client on username."""
path = f"/api/v1/clients/{name}" key = _key(id_or_name)
path = f"/api/v1/clients/{key}"
response = await self._get(path) response = await self._get(path)
if response.status_code == 404: if response.status_code == 404:
return None return None
client = Client.model_validate(response.json()) client = Client.model_validate(response.json())
return client return client
async def get_client_by_id(self, id: str) -> Client | None:
"""Lookup a client on username."""
path = f"/api/v1/clients/id/{id}"
response = await self._get(path)
if response.status_code == 404:
return None
client = Client.model_validate(response.json())
return client
async def delete_client(self, client_name: str) -> None: async def delete_client(self, id_or_name: str) -> None:
"""Delete a client.""" """Delete a client."""
path = f"/api/v1/clients/{client_name}" key = _key(id_or_name)
response = await self._delete(path) path = f"/api/v1/clients/{key}"
async def delete_client_by_id(self, id: str) -> None:
"""Delete a client."""
path = f"/api/v1/clients/id/{id}"
response = await self._delete(path) response = await self._delete(path)
LOG.debug("response: %s", response.text)
async def create_client_secret( async def create_client_secret(
self, client_name: str, secret_name: str, encrypted_secret: str self, client_idname: KeySpec, secret_name: str, encrypted_secret: str
) -> None: ) -> None:
"""Create a secret. """Create a secret.
This will overwrite any existing secret with that name. This will overwrite any existing secret with that name.
""" """
path = f"api/v1/clients/{client_name}/secrets/{secret_name}" client_key = _key(client_idname)
path = f"api/v1/clients/{client_key}/secrets/{secret_name}"
response = await self._put(path, json={"value": encrypted_secret}) response = await self._put(path, json={"value": encrypted_secret})
async def get_client_secret(self, name: str, secret_name: str) -> str | None: async def get_client_secret(self, client_idname: KeySpec, secret_idname: KeySpec) -> str | None:
"""Fetch a secret.""" """Fetch a secret."""
path = f"/api/v1/clients/{name}/secrets/{secret_name}" client_key = _key(client_idname)
secret_key = _key(secret_idname)
path = f"/api/v1/clients/{client_key}/secrets/{secret_key}"
response = await self._get(path) response = await self._get(path)
if response.status_code == 404: if response.status_code == 404:
return None return None
secret = ClientSecret.model_validate(response.json()) secret = ClientSecret.model_validate(response.json())
return secret.secret return secret.secret
async def delete_client_secret(self, client_name: str, secret_name: str) -> None: async def delete_client_secret(self, client_idname: KeySpec, secret_idname: KeySpec) -> None:
"""Delete a secret from a client.""" """Delete a secret from a client."""
path = f"api/v1/clients/{client_name}/secrets/{secret_name}" client_key = _key(client_idname)
secret_key = _key(secret_idname)
path = f"api/v1/clients/{client_key}/secrets/{secret_key}"
await self._delete(path) await self._delete(path)
async def update_client(self, client: Client) -> Client: async def update_client(self, client: Client) -> Client:
@ -382,13 +387,14 @@ class SshecretBackend(BaseBackend):
) )
return client return client
async def update_client_key(self, client_name: str, public_key: str) -> None: async def update_client_key(self, client_idname: KeySpec, public_key: str) -> None:
"""Update the client key.""" """Update the client key."""
path = f"/api/v1/clients/{client_name}/public-key" client_key = _key(client_idname)
path = f"/api/v1/clients/{client_key}/public-key"
await self._post(path, json={"public_key": public_key}) await self._post(path, json={"public_key": public_key})
async def update_client_sources( async def update_client_sources(
self, client_name: str, addresses: list[str] | None self, client_idname: KeySpec, addresses: list[str] | None
) -> None: ) -> None:
"""Update client source addresses. """Update client source addresses.
@ -397,46 +403,29 @@ class SshecretBackend(BaseBackend):
if not addresses: if not addresses:
addresses = [] addresses = []
path = f"/api/v1/clients/{client_name}/policies/" client_key = _key(client_idname)
path = f"/api/v1/clients/{client_key}/policies/"
await self._put(path, json={"sources": addresses}) await self._put(path, json={"sources": addresses})
async def get_detailed_secrets(self) -> list[DetailedSecrets]: async def get_secrets(self) -> list[DetailedSecrets]:
"""Get detailed list of secrets.""" """Get detailed list of secrets."""
path = "/api/v1/secrets/detailed/" path = "/api/v1/secrets/"
response = await self._get(path) response = await self._get(path)
secret_list = TypeAdapter(list[DetailedSecrets]) secret_list = TypeAdapter(list[DetailedSecrets])
return secret_list.validate_python(response.json()) return secret_list.validate_python(response.json())
async def get_secrets(self) -> list[Secret]: async def get_secret(self, idname: KeySpec) -> DetailedSecrets | None:
"""Get Secrets.
This provides a list of secret names and which clients have them.
"""
path = "/api/v1/secrets/"
response = await self._get(path)
secret_list = TypeAdapter(list[Secret])
return secret_list.validate_python(response.json())
async def get_secret(self, name: str) -> Secret | None:
"""Get clients mapped to a single secret.""" """Get clients mapped to a single secret."""
path = f"/api/v1/secrets/{name}" secret_key = _key(idname)
response = await self._get(path) path = f"/api/v1/secrets/{secret_key}"
if response.status_code == 404:
return None
return Secret.model_validate(response.json())
async def get_detailed_secret(self, name: str) -> DetailedSecrets | None:
"""Get clients mapped to a single secret."""
path = f"/api/v1/secrets/{name}/detailed"
response = await self._get(path) response = await self._get(path)
if response.status_code == 404: if response.status_code == 404:
return None return None
return DetailedSecrets.model_validate(response.json()) return DetailedSecrets.model_validate(response.json())
def audit(self, subsystem: SubSystem) -> AuditAPI: def audit(self, subsystem: SubSystem) -> AuditAPI:
"""Create the audit API.""" """Create the audit API."""
audit = AuditAPI(self._backend_url, self._api_token, subsystem) audit = AuditAPI(self._backend_url, self._api_token, subsystem)

View File

@ -51,7 +51,7 @@ async def test_create_secret(backend_api: SshecretBackend) -> None:
assert secret_to_client is not None assert secret_to_client is not None
assert secret_to_client.name == "mysecret" assert secret_to_client.name == "mysecret"
assert "test" in secret_to_client.clients assert secret_to_client.clients[0].name == "test"
secret = await backend_api.get_client_secret("test", "mysecret") secret = await backend_api.get_client_secret("test", "mysecret")

View File

@ -119,6 +119,9 @@ def test_delete_client(test_client: TestClient) -> None:
resp = test_client.get("/api/v1/clients/test") resp = test_client.get("/api/v1/clients/test")
assert resp.status_code == 404 assert resp.status_code == 404
resp = test_client.get("/api/v1/clients/")
assert resp.status_code == 200
def test_add_secret(test_client: TestClient) -> None: def test_add_secret(test_client: TestClient) -> None:
"""Test adding a secret to a client.""" """Test adding a secret to a client."""