Files
sshecret/packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py
2025-06-01 16:16:15 +02:00

504 lines
16 KiB
Python

#!/usr/bin/env python3
# pyright: reportUnusedFunction=false
import logging
import secrets as pysecrets
from typing import Annotated, Any
from fastapi import APIRouter, Depends, Form, HTTPException, Request, status
from pydantic import BaseModel, BeforeValidator, Field
from sshecret_admin.auth import LocalUserInfo
from sshecret_admin.services import AdminBackend
from sshecret_admin.services.models import SecretGroupCreate
from sshecret.backend.models import Operation
from ..dependencies import FrontendDependencies
LOG = logging.getLogger(__name__)
def split_clients(clients: Any) -> Any: # pyright: ignore[reportAny]
"""Split clients."""
if isinstance(clients, list):
return clients # pyright: ignore[reportUnknownVariableType]
if not isinstance(clients, str):
raise ValueError("Invalid type for clients.")
if not clients:
return []
return [client.rstrip() for client in clients.split(",")]
def handle_select_bool(value: Any) -> Any: # pyright: ignore[reportAny]
"""Handle boolean from select."""
if isinstance(value, bool):
return value
if value == "on":
return True
if value == "off":
return False
class CreateSecret(BaseModel):
"""Create secret model."""
name: str
value: str | None = None
auto_generate: Annotated[bool, BeforeValidator(handle_select_bool)] = False
clients: Annotated[list[str], BeforeValidator(split_clients)] = Field(
default_factory=list
)
def create_router(dependencies: FrontendDependencies) -> APIRouter:
"""Create secrets router."""
app = APIRouter(dependencies=[Depends(dependencies.require_login)])
templates = dependencies.templates
@app.get("/secrets/")
async def get_secrets_tree(
request: Request,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
groups = await admin.get_secret_groups()
LOG.info("Groups: %r", groups)
return templates.TemplateResponse(
request,
"secrets/index.html.j2",
{
"groups": groups,
"user": current_user,
},
)
@app.get("/secrets/partial/root_group")
async def get_root_group(
request: Request,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Get root group."""
clients = await admin.get_clients()
return templates.TemplateResponse(
request,
"secrets/partials/edit_root.html.j2",
{
"clients": clients,
},
)
@app.get("/secrets/partial/secret/{name}")
async def get_secret_tree_detail(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Get partial secret detail."""
secret = await admin.get_secret(name)
groups = await admin.get_secret_groups(flat=True)
events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
return templates.TemplateResponse(
request,
"secrets/partials/tree_detail.html.j2",
{
"secret": secret,
"groups": groups,
"events": events,
},
)
@app.get("/secrets/partial/group/{name}")
async def get_group_details(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Get group details partial."""
group = await admin.get_secret_group(name)
if not group:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
clients = await admin.get_clients()
return templates.TemplateResponse(
request,
"secrets/partials/group_detail.html.j2",
{
"name": group.group_name,
"description": group.description,
"clients": clients,
},
)
@app.delete("/secrets/group/{name}")
async def delete_secret_group(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Delete a secret group."""
group = await admin.get_secret_group(name)
if not group:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
await admin.delete_secret_group(name)
headers = {"Hx-Refresh": "true"}
return templates.TemplateResponse(
request,
"secrets/partials/default_detail.html.j2",
headers=headers,
)
@app.post("/secrets/group/")
async def create_group(
request: Request,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
group: Annotated[SecretGroupCreate, Form()],
):
"""Create group."""
LOG.info("Creating secret group: %r", group)
await admin.add_secret_group(
group_name=group.name,
description=group.description,
parent_group=group.parent_group,
)
headers = {"Hx-Refresh": "true"}
return templates.TemplateResponse(
request,
"secrets/partials/default_detail.html.j2",
headers=headers,
)
@app.put("/secrets/set-group/{name}")
async def set_secret_group(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
group_name: Annotated[str, Form()],
):
"""Move a secret to a group."""
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
if group_name == "__ROOT":
await admin.set_secret_group(name, None)
else:
group = await admin.get_secret_group(group_name)
if not group:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
await admin.set_secret_group(name, group_name)
groups = await admin.get_secret_groups()
events = await admin.get_audit_log_detailed(limit=10, secret_name=secret.name)
secret = await admin.get_secret(name)
headers = {"Hx-Refresh": "true"}
return templates.TemplateResponse(
request,
"secrets/partials/tree_detail.html.j2",
{
"secret": secret,
"groups": groups,
"events": events,
},
headers=headers,
)
@app.put("/secrets/partial/group/{name}/description")
async def update_group_description(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
description: Annotated[str, Form()],
):
"""Update group description."""
group = await admin.get_secret_group(name)
if not group:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
await admin.set_group_description(group_name=name, description=description)
clients = await admin.get_clients()
headers = {"Hx-Refresh": "true"}
return templates.TemplateResponse(
request,
"secrets/partials/group_detail.html.j2",
{
"name": group.group_name,
"description": group.description,
"clients": clients,
},
headers=headers,
)
@app.put("/secrets/partial/secret/{name}/value")
async def update_secret_value_inline(
request: Request,
name: str,
secret_value: Annotated[str, Form()],
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Update secret value."""
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
origin = "UNKNOWN"
if request.client:
origin = request.client.host
await admin.write_audit_message(
operation=Operation.UPDATE,
message="Secret was updated via admin interface",
secret_name=name,
origin=origin,
username=current_user.display_name,
)
await admin.update_secret(name, secret_value)
secret = await admin.get_secret(name)
return templates.TemplateResponse(
request,
"secrets/partials/secret_value.html.j2",
{
"secret": secret,
"updated": True,
},
)
@app.get("/secrets/partial/{name}/viewsecret")
async def view_secret_in_tree(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
"""View secret inline partial."""
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
origin = "UNKNOWN"
if request.client:
origin = request.client.host
await admin.write_audit_message(
operation=Operation.READ,
message="Secret viewed",
secret_name=name,
origin=origin,
username=current_user.display_name,
)
return templates.TemplateResponse(
request,
"secrets/partials/secret_value.html.j2",
{
"secret": secret,
"updated": False,
},
)
@app.post("/secrets/create/group/{name}")
async def add_secret_in_group(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
secret: Annotated[CreateSecret, Form()],
):
"""Create secret in group."""
LOG.info("secret: %s", secret.model_dump_json(indent=2))
if secret.value:
value = secret.value
else:
value = pysecrets.token_urlsafe(32)
await admin.add_secret(secret.name, value, secret.clients, group=name)
headers = {"Hx-Refresh": "true"}
new_secret = await admin.get_secret(secret.name)
groups = await admin.get_secret_groups()
events = await admin.get_audit_log_detailed(limit=10, secret_name=secret.name)
if not new_secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
return templates.TemplateResponse(
request,
"secrets/partials/tree_detail.html.j2",
{
"secret": new_secret,
"groups": groups,
"events": events,
},
headers=headers,
)
@app.post("/secrets/create/root")
async def add_secret_in_root(
request: Request,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
secret: Annotated[CreateSecret, Form()],
):
"""Create secret in the root."""
LOG.info("secret: %s", secret.model_dump_json(indent=2))
if secret.value:
value = secret.value
else:
value = pysecrets.token_urlsafe(32)
await admin.add_secret(secret.name, value, secret.clients, group=None)
headers = {"Hx-Refresh": "true"}
new_secret = await admin.get_secret(secret.name)
groups = await admin.get_secret_groups()
events = await admin.get_audit_log_detailed(limit=10, secret_name=secret.name)
if not new_secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
return templates.TemplateResponse(
request,
"secrets/partials/tree_detail.html.j2",
{
"secret": new_secret,
"groups": groups,
"events": events,
},
headers=headers,
)
@app.delete("/secrets/{name}/clients/{client_name}")
async def remove_client_secret_access(
request: Request,
name: str,
client_name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Remove a client's access to a secret."""
client = await admin.get_client(client_name)
if not client:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Client not found."
)
await admin.delete_client_secret(str(client.id), name)
clients = await admin.get_clients()
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found."
)
return templates.TemplateResponse(
request,
"secrets/partials/client_list_inner.html.j2",
{"clients": clients, "secret": secret},
)
@app.get("/secrets/{name}/clients/")
async def show_secret_client_add(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Show partial to add new client to a secret."""
clients = await admin.get_clients()
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found."
)
return templates.TemplateResponse(
request,
"secrets/partials/client_assign.html.j2",
{
"clients": clients,
"secret": secret,
},
)
@app.post("/secrets/{name}/clients/")
async def add_secret_to_client(
request: Request,
name: str,
client: Annotated[str, Form()],
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Add a secret to a client."""
await admin.create_client_secret(client, name)
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found."
)
clients = await admin.get_clients()
return templates.TemplateResponse(
request,
"secrets/partials/client_secret_details.html.j2",
{
"secret": secret,
"clients": clients,
},
)
# @app.delete("/secrets/{name}")
# async def delete_secret(
# request: Request,
# name: str,
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
# ):
# """Delete a secret."""
# await admin.delete_secret(name)
# clients = await admin.get_clients()
# secrets = await admin.get_detailed_secrets()
# headers = {"Hx-Refresh": "true"}
# return templates.TemplateResponse(
# request,
# "secrets/inner.html.j2",
# {
# "clients": clients,
# "secrets": secrets,
# },
# headers=headers,
# )
return app