Files
sshecret/packages/sshecret-backend/src/sshecret_backend/backend_api.py
2025-06-08 17:43:34 +02:00

69 lines
2.1 KiB
Python

"""Backend API."""
from collections.abc import AsyncGenerator
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, Header, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sshecret_backend.db import DatabaseSessionManager
from sshecret_backend.settings import BackendSettings
from .api.audit.router import create_audit_router
from .api.secrets.router import create_client_secrets_router
from .api.clients.router import create_client_router
from .auth import verify_token
from .models import (
APIClient,
)
LOG = logging.getLogger(__name__)
API_VERSION = "v1"
def get_backend_api(
settings: BackendSettings,
) -> APIRouter:
"""Construct backend API."""
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
sessionmanager = DatabaseSessionManager(settings.async_db_url)
async with sessionmanager.session() as session:
yield session
async def validate_token(
x_api_token: Annotated[str, Header()],
session: Annotated[AsyncSession, Depends(get_db_session)],
) -> str:
"""Validate token."""
LOG.debug("Validating token %s", x_api_token)
statement = select(APIClient)
results = await session.scalars(statement)
valid = False
for result in results.all():
if verify_token(x_api_token, result.token):
valid = True
LOG.debug("Token is valid")
break
if not valid:
LOG.debug("Token is not valid.")
raise HTTPException(
status_code=401, detail="unauthorized. invalid api token."
)
return x_api_token
LOG.info("Initializing app.")
backend_api = APIRouter(
prefix=f"/api/{API_VERSION}",
dependencies=[Depends(validate_token)],
)
backend_api.include_router(create_audit_router(get_db_session))
backend_api.include_router(create_client_router(get_db_session))
backend_api.include_router(create_client_secrets_router(get_db_session))
return backend_api