"""Secrets related endpoints factory.""" # pyright: reportUnusedFunction=false import logging from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query, Security, status from sshecret_admin.core.dependencies import AdminDependencies from sshecret_admin.services import AdminBackend from sshecret_admin.services.models import ( ClientSecretGroup, ClientSecretGroupList, GroupPath, SecretCreate, SecretGroupAssign, SecretGroupCreate, SecretGroupUdate, SecretListView, SecretUpdate, SecretView, ) from sshecret_admin.services.secret_manager import ( InvalidGroupNameError, InvalidSecretNameError, ) LOG = logging.getLogger(__name__) def create_router(dependencies: AdminDependencies) -> APIRouter: """Create secrets router.""" app = APIRouter(dependencies=[Security(dependencies.get_current_active_user)]) @app.get("/secrets/") async def get_secret_names( admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> list[SecretListView]: """Get Secret Names.""" return await admin.get_secrets() @app.post("/secrets/") async def add_secret( secret: SecretCreate, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Create a secret.""" await admin.add_secret( name=secret.name, value=secret.get_secret(), clients=secret.clients, group=secret.group, distinguisher=secret.client_distinguisher, ) @app.get("/secrets/{name}") async def get_secret( name: str, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> SecretView: """Get a secret.""" secret_view = await admin.get_secret(name) if not secret_view: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." ) return secret_view @app.put("/secrets/{name}") async def update_secret( name: str, value: SecretUpdate, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: new_value = value.get_secret() await admin.update_secret(name, new_value) @app.delete("/secrets/{name}") async def delete_secret( name: str, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Delete secret.""" await admin.delete_secret(name) @app.get("/secrets/groups/") async def get_secret_groups( admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], filter_regex: Annotated[str | None, Query()] = None, flat: bool = False, ) -> ClientSecretGroupList: """Get secret groups.""" result = await admin.get_secret_groups(filter_regex, flat=flat) return result @app.get("/secrets/groups/{group_path:path}/") async def get_secret_group( group_path: str, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> ClientSecretGroup: """Get a specific secret group.""" results = await admin.get_secret_group_by_path(group_path) if not results: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No such group." ) return results @app.put("/secrets/groups/{group_path:path}/") async def update_secret_group( group_path: str, group: SecretGroupUdate, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> ClientSecretGroup: """Update a secret group.""" existing_group = await admin.lookup_secret_group(group_path) if not existing_group: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No such group." ) params: dict[str, str] = {} if name := group.name: params["name"] = name if description := group.description: params["description"] = description if parent := group.parent_group: params["parent"] = parent new_group = await admin.update_secret_group( group_path, **params, ) return new_group @app.post("/secrets/groups/") async def add_secret_group( group: SecretGroupCreate, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> ClientSecretGroup: """Create a secret grouping.""" await admin.add_secret_group( group_name=group.name, description=group.description, parent_group=group.parent_group, ) result = await admin.lookup_secret_group(group.name) if not result: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Group creation failed" ) return result @app.delete("/secrets/group/{id}") async def delete_group_id( id: str, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Remove a group by ID.""" try: await admin.delete_secret_group_by_id(id) except InvalidGroupNameError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Group ID not found" ) @app.delete("/secrets/groups/{group_path:path}/") async def delete_secret_group( group_path: str, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Remove a group. Entries within the group will be moved to the root. This also includes nested entries further down from the group. """ group = await admin.get_secret_group_by_path(group_path) if not group: return await admin.delete_secret_group(group_path) @app.post("/secrets/set-group") async def assign_secret_group( assignment: SecretGroupAssign, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Assign a secret to a group or root.""" try: await admin.set_secret_group(assignment.secret_name, assignment.group_path) except InvalidSecretNameError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Secret not fount" ) except InvalidGroupNameError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Invalid group name" ) @app.post("/secrets/move-group/{group_name:path}") async def move_group( group_name: str, destination: GroupPath, admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Move a group.""" group = await admin.lookup_secret_group(group_name) if not group: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"No such group {group_name}", ) parent_path: str | None = destination.path if destination.path == "/" or not destination.path: # / means root parent_path = None LOG.debug("Moving group %s to %r", group_name, parent_path) if parent_path: parent_group = await admin.get_secret_group_by_path(destination.path) if not parent_group: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"No such group {parent_path}", ) await admin.move_secret_group(group_name, parent_path) return app