From 773a1e2976af2932a89a0633686e6c6a4129a59e Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Sat, 31 May 2025 14:13:49 +0200 Subject: [PATCH] Integrate group in admin rest API --- .../sshecret_admin/api/endpoints/secrets.py | 142 +++++++++++++++++- .../src/sshecret_admin/api/router.py | 2 +- .../templates/dashboard/_scripts.html | 1 + .../templates/dashboard/_stylesheet.html | 11 ++ .../sshecret_admin/services/admin_backend.py | 119 ++++++++++++++- .../src/sshecret_admin/services/keepass.py | 26 +++- .../src/sshecret_admin/services/models.py | 31 ++++ .../tests/test_password_context.py | 35 +++++ 8 files changed, 352 insertions(+), 15 deletions(-) diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py index cfa3f73..42aaf39 100644 --- a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py @@ -3,13 +3,15 @@ # pyright: reportUnusedFunction=false import logging from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, Query, Security, status from sshecret.backend.models import Secret from sshecret_admin.core.dependencies import AdminDependencies from sshecret_admin.services import AdminBackend from sshecret_admin.services.models import ( + ClientSecretGroup, SecretCreate, + SecretGroupCreate, SecretUpdate, SecretView, ) @@ -19,7 +21,7 @@ LOG = logging.getLogger(__name__) def create_router(dependencies: AdminDependencies) -> APIRouter: """Create secrets router.""" - app = APIRouter(dependencies=[Depends(dependencies.get_current_active_user)]) + app = APIRouter(dependencies=[Security(dependencies.get_current_active_user)]) @app.get("/secrets/") async def get_secret_names( @@ -34,7 +36,12 @@ def create_router(dependencies: AdminDependencies) -> APIRouter: admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> None: """Create a secret.""" - await admin.add_secret(secret.name, secret.get_secret(), secret.clients) + await admin.add_secret( + name=secret.name, + value=secret.get_secret(), + clients=secret.clients, + group=secret.group, + ) @app.get("/secrets/{name}") async def get_secret( @@ -67,4 +74,133 @@ def create_router(dependencies: AdminDependencies) -> APIRouter: """Delete secret.""" await admin.delete_secret(name) + @app.get("/secrets/groups/") + async def get_secret_groups( + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + filter_regex: Annotated[str | None, Query()] = None, + ) -> list[ClientSecretGroup]: + """Get secret groups.""" + return await admin.get_secret_groups(filter_regex) + + @app.get("/secrets/groups/{group_name}/") + async def get_secret_group( + group_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> ClientSecretGroup: + """Get a specific secret group.""" + results = await admin.get_secret_groups(group_name, False) + if not results: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="No such group." + ) + return results[0] + + @app.post("/secrets/groups/") + async def add_secret_group( + group: SecretGroupCreate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> ClientSecretGroup: + """Create a secret grouping.""" + await admin.add_secret_group( + group_name=group.name, + description=group.description, + parent_group=group.parent_group, + ) + + result = await admin.get_secret_group(group.name) + if not result: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Group creation failed" + ) + return result + + @app.delete("/secrets/groups/{group_name}/") + async def delete_secret_group( + group_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Remove a group. + + Entries within the group will be moved to the root. + This also includes nested entries further down from the group. + """ + group = await admin.get_secret_group(group.name) + if not group: + return + await admin.delete_secret_group(group_name, keep_entries=True) + + @app.post("/secrets/groups/{group_name}/{secret_name}") + async def move_secret_to_group( + group_name: str, + secret_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Move a secret to a group.""" + groups = await admin.get_secret_groups(group_name, False) + if not groups: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="No such group." + ) + + await admin.set_secret_group(secret_name, group_name) + + @app.post("/secrets/group/{group_name}/parent/{parent_name}") + async def move_group( + group_name: str, + parent_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Move a group.""" + group = await admin.get_secret_group(group_name) + if not group: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"No such group {group_name}", + ) + parent_group = await admin.get_secret_group(parent_name) + if not parent_group: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"No such group {parent_name}", + ) + await admin.move_secret_group(group_name, parent_name) + + @app.delete("/secrets/group/{group_name}/parent/") + async def move_group_to_root( + group_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Move a group to the root.""" + group = await admin.get_secret_group(group_name) + if not group: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"No such group {group_name}", + ) + + await admin.move_secret_group(group_name, None) + + @app.delete("/secrets/groups/{group_name}/{secret_name}") + async def remove_secret_from_group( + group_name: str, + secret_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Remove a secret from a group. + + Secret will be moved to the root group. + """ + groups = await admin.get_secret_groups(group_name, False) + if not groups: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="No such group." + ) + group = groups[0] + matching_entries = [ + entry for entry in group.entries if entry.name == secret_name + ] + if not matching_entries: + return + await admin.set_secret_group(secret_name, None) + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/router.py b/packages/sshecret-admin/src/sshecret_admin/api/router.py index 19788db..13a9dd4 100644 --- a/packages/sshecret-admin/src/sshecret_admin/api/router.py +++ b/packages/sshecret-admin/src/sshecret_admin/api/router.py @@ -26,7 +26,7 @@ API_VERSION = "v1" def create_router(dependencies: BaseDependencies) -> APIRouter: """Create clients router.""" - oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/token") async def get_current_user( token: Annotated[str, Depends(oauth2_scheme)], diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_scripts.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_scripts.html index 892381c..fa42842 100644 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_scripts.html +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_scripts.html @@ -3,6 +3,7 @@ +