217 lines
7.2 KiB
Python
217 lines
7.2 KiB
Python
"""Secrets related endpoints factory."""
|
|
|
|
# pyright: reportUnusedFunction=false
|
|
import logging
|
|
from typing import Annotated
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
|
|
|
|
from sshecret_admin.core.dependencies import AdminDependencies
|
|
from sshecret_admin.services import AdminBackend
|
|
from sshecret_admin.services.models import (
|
|
ClientSecretGroup,
|
|
ClientSecretGroupList,
|
|
GroupPath,
|
|
SecretCreate,
|
|
SecretGroupAssign,
|
|
SecretGroupCreate,
|
|
SecretGroupUdate,
|
|
SecretListView,
|
|
SecretUpdate,
|
|
SecretView,
|
|
)
|
|
from sshecret_admin.services.secret_manager import (
|
|
InvalidGroupNameError,
|
|
InvalidSecretNameError,
|
|
)
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def create_router(dependencies: AdminDependencies) -> APIRouter:
|
|
"""Create secrets router."""
|
|
app = APIRouter(dependencies=[Security(dependencies.get_current_active_user)])
|
|
|
|
@app.get("/secrets/")
|
|
async def get_secret_names(
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> list[SecretListView]:
|
|
"""Get Secret Names."""
|
|
return await admin.get_secrets()
|
|
|
|
@app.post("/secrets/")
|
|
async def add_secret(
|
|
secret: SecretCreate,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> None:
|
|
"""Create a secret."""
|
|
await admin.add_secret(
|
|
name=secret.name,
|
|
value=secret.get_secret(),
|
|
clients=secret.clients,
|
|
group=secret.group,
|
|
)
|
|
|
|
@app.get("/secrets/{name}")
|
|
async def get_secret(
|
|
name: str,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> SecretView:
|
|
"""Get a secret."""
|
|
secret_view = await admin.get_secret(name)
|
|
|
|
if not secret_view:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Item not found."
|
|
)
|
|
return secret_view
|
|
|
|
@app.put("/secrets/{name}")
|
|
async def update_secret(
|
|
name: str,
|
|
value: SecretUpdate,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> None:
|
|
new_value = value.get_secret()
|
|
await admin.update_secret(name, new_value)
|
|
|
|
@app.delete("/secrets/{name}")
|
|
async def delete_secret(
|
|
name: str,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> None:
|
|
"""Delete secret."""
|
|
await admin.delete_secret(name)
|
|
|
|
@app.get("/secrets/groups/")
|
|
async def get_secret_groups(
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
filter_regex: Annotated[str | None, Query()] = None,
|
|
) -> ClientSecretGroupList:
|
|
"""Get secret groups."""
|
|
result = await admin.get_secret_groups(filter_regex)
|
|
return result
|
|
|
|
@app.get("/secrets/groups/{group_path:path}/")
|
|
async def get_secret_group(
|
|
group_path: str,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> ClientSecretGroup:
|
|
"""Get a specific secret group."""
|
|
results = await admin.get_secret_group_by_path(group_path)
|
|
if not results:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
|
)
|
|
return results
|
|
|
|
@app.put("/secrets/groups/{group_path:path}/")
|
|
async def update_secret_group(
|
|
group_path: str,
|
|
group: SecretGroupUdate,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> ClientSecretGroup:
|
|
"""Update a secret group."""
|
|
existing_group = await admin.lookup_secret_group(group_path)
|
|
if not existing_group:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
|
)
|
|
|
|
params: dict[str, str] = {}
|
|
if name := group.name:
|
|
params["name"] = name
|
|
|
|
if description := group.description:
|
|
params["description"] = description
|
|
|
|
if parent := group.parent_group:
|
|
params["parent"] = parent
|
|
|
|
new_group = await admin.update_secret_group(
|
|
group_path,
|
|
**params,
|
|
)
|
|
return new_group
|
|
|
|
@app.post("/secrets/groups/")
|
|
async def add_secret_group(
|
|
group: SecretGroupCreate,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> ClientSecretGroup:
|
|
"""Create a secret grouping."""
|
|
await admin.add_secret_group(
|
|
group_name=group.name,
|
|
description=group.description,
|
|
parent_group=group.parent_group,
|
|
)
|
|
|
|
result = await admin.lookup_secret_group(group.name)
|
|
if not result:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Group creation failed"
|
|
)
|
|
return result
|
|
|
|
@app.delete("/secrets/groups/{group_path:path}/")
|
|
async def delete_secret_group(
|
|
group_path: str,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> None:
|
|
"""Remove a group.
|
|
|
|
Entries within the group will be moved to the root.
|
|
This also includes nested entries further down from the group.
|
|
"""
|
|
group = await admin.get_secret_group_by_path(group_path)
|
|
if not group:
|
|
return
|
|
await admin.delete_secret_group(group_path)
|
|
|
|
@app.post("/secrets/set-group")
|
|
async def assign_secret_group(
|
|
assignment: SecretGroupAssign,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> None:
|
|
"""Assign a secret to a group or root."""
|
|
try:
|
|
await admin.set_secret_group(assignment.secret_name, assignment.group_path)
|
|
except InvalidSecretNameError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not fount"
|
|
)
|
|
except InvalidGroupNameError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Invalid group name"
|
|
)
|
|
|
|
@app.post("/secrets/move-group/{group_name:path}")
|
|
async def move_group(
|
|
group_name: str,
|
|
destination: GroupPath,
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
) -> None:
|
|
"""Move a group."""
|
|
group = await admin.lookup_secret_group(group_name)
|
|
if not group:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"No such group {group_name}",
|
|
)
|
|
parent_path: str | None = destination.path
|
|
if destination.path == "/" or not destination.path:
|
|
# / means root
|
|
parent_path = None
|
|
|
|
LOG.debug("Moving group %s to %r", group_name, parent_path)
|
|
|
|
if parent_path:
|
|
parent_group = await admin.get_secret_group_by_path(destination.path)
|
|
if not parent_group:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"No such group {parent_path}",
|
|
)
|
|
await admin.move_secret_group(group_name, parent_path)
|
|
|
|
return app
|