diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/audit.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/audit.py new file mode 100644 index 0000000..746a821 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/audit.py @@ -0,0 +1,31 @@ +"""Audit API.""" + +# pyright: reportUnusedFunction=false + +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, Query, Security + +from sshecret_admin.core.dependencies import AdminDependencies +from sshecret_admin.services import AdminBackend +from sshecret_admin.services.models import AuditQueryFilter + +from sshecret.backend.models import AuditInfo, AuditListResult + +LOG = logging.getLogger(__name__) + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create audit log API.""" + + app = APIRouter(dependencies=[Security(dependencies.get_current_active_user)]) + + @app.get("/audit/") + async def get_audit_log( + query_filter: Annotated[AuditQueryFilter, Query()], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> AuditListResult: + """Query audit log.""" + params = query_filter.model_dump(exclude_none=True, exclude_defaults=True) + return await admin.get_audit_log_detailed(**params) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py index 702fe7f..919e76c 100644 --- a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py @@ -49,6 +49,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter: value=secret.get_secret(), clients=secret.clients, group=secret.group, + distinguisher=secret.client_distinguisher, ) @app.get("/secrets/{name}") @@ -86,9 +87,10 @@ def create_router(dependencies: AdminDependencies) -> APIRouter: async def get_secret_groups( admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], filter_regex: Annotated[str | None, Query()] = None, + flat: bool = False, ) -> ClientSecretGroupList: """Get secret groups.""" - result = await admin.get_secret_groups(filter_regex) + result = await admin.get_secret_groups(filter_regex, flat=flat) return result @app.get("/secrets/groups/{group_path:path}/") @@ -152,6 +154,19 @@ def create_router(dependencies: AdminDependencies) -> APIRouter: ) return result + @app.delete("/secrets/group/{id}") + async def delete_group_id( + id: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Remove a group by ID.""" + try: + await admin.delete_secret_group_by_id(id) + except InvalidGroupNameError: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Group ID not found" + ) + @app.delete("/secrets/groups/{group_path:path}/") async def delete_secret_group( group_path: str, diff --git a/packages/sshecret-admin/src/sshecret_admin/api/router.py b/packages/sshecret-admin/src/sshecret_admin/api/router.py index 4a115b1..f724f0e 100644 --- a/packages/sshecret-admin/src/sshecret_admin/api/router.py +++ b/packages/sshecret-admin/src/sshecret_admin/api/router.py @@ -17,7 +17,7 @@ from sshecret_admin.core.dependencies import BaseDependencies, AdminDependencies from sshecret_admin.auth import User, decode_token from sshecret_admin.auth.constants import LOCAL_ISSUER -from .endpoints import auth, clients, secrets +from .endpoints import audit, auth, clients, secrets LOG = logging.getLogger(__name__) @@ -112,6 +112,7 @@ def create_router(dependencies: BaseDependencies) -> APIRouter: LOG.debug("Registering sub-routers") + app.include_router(audit.create_router(endpoint_deps)) app.include_router(auth.create_router(endpoint_deps)) app.include_router(clients.create_router(endpoint_deps)) app.include_router(secrets.create_router(endpoint_deps)) diff --git a/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py index acab31f..0da22f2 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py @@ -6,7 +6,7 @@ Since we have a frontend and a REST API, it makes sense to have a generic librar import logging from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from typing import Unpack +from typing import Literal, Unpack from sshecret.backend import ( AuditLog, @@ -68,6 +68,7 @@ def add_clients_to_secret_group( if parent: parent_ref = parent.reference() client_secret_group = ClientSecretGroup( + id=group.id, group_name=group.name, path=group.path, description=group.description, @@ -397,13 +398,15 @@ class AdminBackend: await password_manager.set_group_description(group_name, description) async def delete_secret_group(self, group_path: str) -> None: - """Delete a group. - - If keep_entries is set to False, all entries in the group will be deleted. - """ + """Delete a group.""" async with self.secrets_manager() as password_manager: await password_manager.delete_group(group_path) + async def delete_secret_group_by_id(self, id: str) -> None: + """Delete a secret group by ID.""" + async with self.secrets_manager() as password_manager: + await password_manager.delete_group_id(id) + async def get_secret_groups( self, group_filter: str | None = None, @@ -562,6 +565,7 @@ class AdminBackend: clients: list[str] | None, update: bool = False, group: str | None = None, + distinguisher: Literal["name", "id"] = "name", ) -> None: """Add a secret.""" async with self.secrets_manager() as password_manager: @@ -575,7 +579,11 @@ class AdminBackend: if not clients: return for client_name in clients: - client = await self.get_client(client_name) + client_id = client_name + if distinguisher == "id": + client_id = ("id", client_name) + + client = await self.get_client(client_id) if not client: if update: raise ClientNotFoundError() @@ -583,8 +591,8 @@ class AdminBackend: continue public_key = load_public_key(client.public_key.encode()) encrypted = encrypt_string(value, public_key) - LOG.info("Wrote encrypted secret for client %s", client_name) - await self.backend.create_client_secret(client_name, name, encrypted) + LOG.info("Wrote encrypted secret for client %r", client_id) + await self.backend.create_client_secret(client_id, name, encrypted) async def add_secret( self, @@ -592,10 +600,17 @@ class AdminBackend: value: str, clients: list[str] | None = None, group: str | None = None, + distinguisher: Literal["name", "id"] = "name", ) -> None: """Add a secret.""" try: - await self._add_secret(name=name, value=value, clients=clients, group=group) + await self._add_secret( + name=name, + value=value, + clients=clients, + group=group, + distinguisher=distinguisher, + ) except ClientManagementError: raise except Exception as e: diff --git a/packages/sshecret-admin/src/sshecret_admin/services/models.py b/packages/sshecret-admin/src/sshecret_admin/services/models.py index 05256e2..9329f3e 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/models.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/models.py @@ -10,10 +10,11 @@ from pydantic import ( Field, IPvAnyAddress, IPvAnyNetwork, + field_validator, model_validator, ) from sshecret.crypto import validate_public_key -from sshecret.backend.models import ClientReference +from sshecret.backend.models import AuditFilter, ClientReference def public_key_validator(value: str) -> str: @@ -104,6 +105,7 @@ class SecretCreate(SecretUpdate): clients: list[str] | None = Field( default=None, description="Assign the secret to a list of clients." ) + client_distinguisher: Literal["id", "name"] = "name" group: str | None = None model_config: ConfigDict = ConfigDict( @@ -128,6 +130,7 @@ class SecretCreate(SecretUpdate): class SecretGroup(BaseModel): """A secret group.""" + id: uuid.UUID name: str path: str description: str | None = None @@ -158,6 +161,7 @@ class GroupReference(BaseModel): class ClientSecretGroup(BaseModel): """Client secrets grouped.""" + id: uuid.UUID group_name: str path: str description: str | None = None @@ -173,7 +177,7 @@ class ClientSecretGroup(BaseModel): class SecretGroupCreate(BaseModel): """Create model for creating secret groups.""" - name: str + name: str = Field(min_length=1) # blank group names are a pain! description: str | None = None parent_group: str | None = None @@ -185,6 +189,17 @@ class SecretGroupUdate(BaseModel): description: str | None = None parent_group: str | None = None + @field_validator("name") + @classmethod + def validate_name(cls, value: str | None) -> str | None: + """Validate name.""" + if not value: + return None + if "/" in value: + raise ValueError("Name cannot be a path") + + return value + class ClientSecretGroupList(BaseModel): """Secret group list.""" @@ -235,3 +250,10 @@ class GroupPath(BaseModel): """Path to a group.""" path: str = Field(pattern="^/.*") + + +class AuditQueryFilter(AuditFilter): + """Audit query filter.""" + + offset: int = 0 + limit: int = 100 diff --git a/packages/sshecret-admin/src/sshecret_admin/services/secret_manager.py b/packages/sshecret-admin/src/sshecret_admin/services/secret_manager.py index b56daa6..6c73803 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/secret_manager.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/secret_manager.py @@ -215,7 +215,7 @@ class AsyncSecretContext: path = os.path.join(path, group.name) secret_group = SecretGroup( - name=group.name, path=path, description=group.description + id=group.id, name=group.name, path=path, description=group.description ) group_secrets = await self._get_group_secrets(group) for secret in group_secrets: @@ -715,6 +715,17 @@ class AsyncSecretContext: # We don't audit-log this operation currently, even though it indirectly # may affect secrets. + async def delete_group_id(self, id: str | uuid.UUID) -> None: + """Delete a group by ID.""" + if isinstance(id, str): + id = uuid.UUID(id) + group = await self._get_group_by_id(id) + if not group: + raise InvalidGroupNameError("Invalid or non-existing group ID.") + await self.session.delete(group) + + await self.session.commit() + async def _export_entries(self) -> list[SecretDataEntryExport]: """Export entries as a pydantic object.""" statement = ( diff --git a/src/sshecret/backend/identifiers.py b/src/sshecret/backend/identifiers.py new file mode 100644 index 0000000..3a4bc6b --- /dev/null +++ b/src/sshecret/backend/identifiers.py @@ -0,0 +1,66 @@ +"""Identifier handling.""" + +import enum +from typing import cast, Literal, Self +import uuid +from typing import Annotated +from fastapi import Path + + +from pydantic import BaseModel + + +KeyType = Literal["id", "name"] +KeySpec = str | tuple[KeyType, str] + +RelaxedId = uuid.UUID | str + + +class IdType(enum.Enum): + """Id type.""" + + ID = "id" + NAME = "name" + + +class FlexID(BaseModel): + """Flexible identifier.""" + + type: IdType + value: RelaxedId + + @classmethod + def id(cls, id: RelaxedId) -> Self: + """Construct from ID.""" + return cls(type=IdType.ID, value=id) + + @classmethod + def name(cls, name: str) -> Self: + """Construct from name.""" + return cls(type=IdType.NAME, value=name) + + @classmethod + def from_string(cls, value: str) -> Self: + """Convert from path string.""" + if value.startswith("id:"): + return cls.id(value[3:]) + elif value.startswith("name:"): + return cls.name(value[5:]) + return cls.name(value) + + @property + def keyspec(self) -> KeySpec: + """Convert to keyspec.""" + keytype: KeyType = self.type.value + keyspec = (keytype, str(self.value)) + return cast(KeySpec, keyspec) + + +ClientIdParam = Annotated[ + str, + Path( + title="Client identifier", + description="Identifier of path, may include the prefix 'id:' or 'name:'", + examples=["name:myclient", "id:8eab15a4-a8eb-4d1b-b47f-2283b9f6f6b0"], + ), +] diff --git a/tests/integration/admin/test_admin_api.py b/tests/integration/admin/test_admin_api.py index f20469d..1ffd400 100644 --- a/tests/integration/admin/test_admin_api.py +++ b/tests/integration/admin/test_admin_api.py @@ -111,6 +111,7 @@ class TestAdminApiSecrets(BaseAdminTests): async def test_add_secret(self, admin_server: AdminServer) -> None: """Test add_secret.""" await self.create_client(admin_server, name="testclient") + async with self.http_client(admin_server) as http_client: data = { "name": "testsecret", @@ -160,6 +161,31 @@ class TestAdminApiSecrets(BaseAdminTests): assert len(data["secret"]) == 17 assert "testclient" in [cl["name"] for cl in data["clients"]] + @allure.title("Test adding a secret with client ID") + @allure.description("Ensure that we can refer to clients with their IDs") + @pytest.mark.asyncio + async def test_add_secret_with_clientid(self, admin_server: AdminServer) -> None: + """Test add secret with client ID as distinguisher.""" + client = await self.create_client(admin_server, name="testclient") + + async with self.http_client(admin_server) as http_client: + data = { + "name": "testsecret", + "clients": [str(client.id)], + "value": "secret", + "client_distinguisher": "id", + } + + resp = await http_client.post("api/v1/secrets/", json=data) + assert resp.status_code == 200 + + async with self.http_client(admin_server) as http_client: + resp = await http_client.get("api/v1/secrets/testsecret") + assert resp.status_code == 200 + data = resp.json() + client_names = [cl["name"] for cl in data["clients"]] + assert "testclient" in client_names + @allure.title("Test updating a secret") @allure.description("Test that we can update the value of a stored secret.") @pytest.mark.asyncio