Stats and error handling

This commit is contained in:
2025-07-15 13:24:50 +02:00
parent 6a5149fd4c
commit 412a84150e
13 changed files with 247 additions and 28 deletions

View File

@ -18,7 +18,7 @@ from sshecret_admin.services.models import (
)
from sshecret.backend.identifiers import ClientIdParam, FlexID, KeySpec
from sshecret.backend.models import ClientQueryResult, ClientReference, FilterType
from sshecret.backend.models import ClientQueryResult, ClientReference, FilterType, SystemStats
LOG = logging.getLogger(__name__)
@ -210,4 +210,11 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
"""Add secret to a client."""
await admin.create_client_secret(_id(id), secret_name)
@app.get("/stats")
async def get_system_stats(
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
) -> SystemStats:
"""Get system stats."""
return await admin.get_system_stats()
return app

View File

@ -25,6 +25,8 @@ from sshecret_admin.core.db import setup_database
from sshecret_admin.frontend.exceptions import RedirectException
from sshecret_admin.services.secret_manager import setup_private_key
from sshecret.backend.exceptions import BackendError, BackendValidationError
from .dependencies import BaseDependencies
from .settings import AdminServerSettings
@ -72,13 +74,13 @@ def create_admin_app(
app = FastAPI(lifespan=lifespan)
app.add_middleware(SessionMiddleware, secret_key=settings.secret_key)
origins = [ settings.frontend_origin ]
origins = [settings.frontend_origin]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
allow_headers=["*"],
)
@app.exception_handler(RequestValidationError)
@ -90,6 +92,24 @@ def create_admin_app(
content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
@app.exception_handler(BackendValidationError)
async def validation_backend_validation_exception_handler(
request: Request, exc: BackendValidationError
):
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({"detail": exc.errors()}),
)
@app.exception_handler(BackendError)
async def validation_backend_exception_handler(
request: Request, exc: BackendValidationError
):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=jsonable_encoder({"detail": [str(exc)]}),
)
@app.exception_handler(RedirectException)
async def redirect_handler(request: Request, exc: RedirectException) -> Response:
"""Handle redirect exceptions."""

View File

@ -17,8 +17,14 @@ from sshecret.backend import (
Operation,
SubSystem,
)
from sshecret.backend.exceptions import BackendError, BackendValidationError
from sshecret.backend.identifiers import KeySpec
from sshecret.backend.models import ClientQueryResult, ClientReference, DetailedSecrets
from sshecret.backend.models import (
ClientQueryResult,
ClientReference,
DetailedSecrets,
SystemStats,
)
from sshecret.backend.api import AuditAPI
from sshecret.crypto import encrypt_string, load_public_key
@ -209,7 +215,14 @@ class AdminBackend:
return await self._create_client(name, public_key, description, sources)
except ClientManagementError:
raise
except BackendValidationError as e:
LOG.error("Validation error: %s", e, exc_info=True)
raise e
except BackendError:
raise
except Exception as e:
LOG.error("Exception: %s", e, exc_info=True)
raise BackendUnavailableError() from e
async def _update_client_public_key(
@ -518,9 +531,9 @@ class AdminBackend:
secret: str | None = None
async with self.secrets_manager() as password_manager:
secret = await password_manager.get_secret(name)
secret_group: str | None = None
secret_group: GroupReference | None = None
if secret:
secret_group = await password_manager.get_entry_group(name)
secret_group = await password_manager.get_entry_group_info(name)
secret_view = SecretView(name=name, secret=secret, group=secret_group)
@ -535,6 +548,10 @@ class AdminBackend:
for ref in secret_mapping.clients
]
if not secret_mapping and not secret_group:
# This secret is effectively deleted.
return None
return secret_view
async def delete_secret(self, name: str) -> None:
@ -653,6 +670,10 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def get_system_stats(self) -> SystemStats:
"""Get system stats."""
return await self.backend.get_system_stats()
@property
def audit(self) -> AuditAPI:
"""Resolve audit API."""

View File

@ -32,12 +32,23 @@ class SecretListView(BaseModel):
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
class GroupReference(BaseModel):
"""Reference to a group.
This will be used for references to parent groups to avoid circular
references.
"""
group_name: str
path: str
class SecretView(BaseModel):
"""Model containing a secret, including its clear-text value."""
name: str
secret: str | None
group: str | None = None
group: GroupReference | None = None
clients: list[ClientReference] = Field(
default_factory=list
) # Clients that have access to it.
@ -147,17 +158,6 @@ class SecretClientMapping(BaseModel):
clients: list[ClientReference] = Field(default_factory=list)
class GroupReference(BaseModel):
"""Reference to a group.
This will be used for references to parent groups to avoid circular
references.
"""
group_name: str
path: str
class ClientSecretGroup(BaseModel):
"""Client secrets grouped."""

View File

@ -32,7 +32,7 @@ from sshecret_admin.auth import PasswordDB
from sshecret_admin.auth.models import Group, ManagedSecret
from sshecret_admin.core.db import DatabaseSessionManager
from sshecret_admin.core.settings import AdminServerSettings
from sshecret_admin.services.models import SecretGroup
from sshecret_admin.services.models import GroupReference, SecretGroup
KEY_FILENAME = "sshecret-admin-key"
@ -430,6 +430,17 @@ class AsyncSecretContext:
return entry.group.name
return None
async def get_entry_group_info(self, entry_name: str) -> GroupReference | None:
"""Get group of entry, with path."""
entry = await self._get_entry(entry_name)
if not entry:
raise InvalidSecretNameError("Invalid secret name or secret not found.")
if not entry.group:
return None
group = await self._get_group_by_id(entry.group.id)
group_tree = await self._build_group_tree(group)
return GroupReference(group_name=entry.group.name, path=group_tree.path)
async def _get_groups(
self, pattern: str | None = None, regex: bool = True, root_groups: bool = False
) -> list[Group]: