Files
sshecret/packages/sshecret-admin/src/sshecret_admin/api/router.py

128 lines
4.1 KiB
Python

"""Main API Router."""
# pyright: reportUnusedFunction=false
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.security.utils import get_authorization_scheme_param
from sqlalchemy import select
from sqlalchemy.orm import Session
from sshecret_admin.services.admin_backend import AdminBackend
from sshecret_admin.core.dependencies import BaseDependencies, AdminDependencies
from sshecret_admin.auth import PasswordDB, User, decode_token
from sshecret_admin.auth.constants import LOCAL_ISSUER
from .endpoints import auth, clients, secrets
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Version segment of the API URL prefix; the router is created with the
# prefix "/api/<API_VERSION>".
API_VERSION = "v1"
def create_router(dependencies: BaseDependencies) -> APIRouter:
    """Create the main API router.

    Builds the authentication and backend dependency callables as closures
    over ``dependencies`` and includes the auth, clients and secrets
    sub-routers under the ``/api/{API_VERSION}`` prefix.

    Args:
        dependencies: Base dependencies. This function reads its
            ``settings`` attribute and uses its ``get_db_session`` callable
            as a FastAPI dependency.

    Returns:
        The assembled router with all sub-routers included.
    """
    api_prefix = f"/api/{API_VERSION}"

    # Derive the OAuth2 URLs from the same prefix the router is mounted on,
    # so they cannot drift apart from it when API_VERSION changes.
    oauth2_scheme = OAuth2PasswordBearer(
        tokenUrl=f"{api_prefix}/token",
        refreshUrl=f"{api_prefix}/refresh",
    )

    async def get_current_user(
        token: Annotated[str, Depends(oauth2_scheme)],
        session: Annotated[Session, Depends(dependencies.get_db_session)],
    ) -> User:
        """Resolve the bearer token to a stored user.

        Raises:
            HTTPException: 401 when the token cannot be decoded or when no
                matching user is found.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        token_data = decode_token(dependencies.settings, token)
        if not token_data:
            raise credentials_exception

        if token_data.provider == LOCAL_ISSUER:
            # Locally issued tokens are matched on the username.
            query = select(User).where(User.username == token_data.sub)
        else:
            # Any other provider is matched on the issuer/subject pair.
            query = (
                select(User)
                .where(User.oidc_issuer == token_data.provider)
                .where(User.oidc_sub == token_data.sub)
            )

        user = session.scalars(query).first()
        if not user:
            raise credentials_exception
        return user

    def get_client_origin(request: Request) -> str:
        """Return the client host, or "UNKNOWN" when the request has no client."""
        if request.client:
            return request.client.host
        return "UNKNOWN"

    def get_optional_username(request: Request) -> str | None:
        """Get username, if available.

        This is purely used for auditing purposes, so every failure path
        returns None instead of raising.
        """
        authorization = request.headers.get("Authorization")
        scheme, param = get_authorization_scheme_param(authorization)
        if not authorization or scheme.lower() != "bearer":
            return None

        claims = decode_token(dependencies.settings, param)
        if not claims:
            return None
        if claims.provider == LOCAL_ISSUER:
            return claims.sub
        return f"oidc:{claims.email}"

    async def get_current_active_user(
        current_user: Annotated[User, Depends(get_current_user)],
    ) -> User:
        """Return the current user, rejecting disabled accounts.

        Raises:
            HTTPException: 400 when the user is flagged as disabled.
        """
        if current_user.disabled:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Inactive or disabled user",
            )
        return current_user

    async def get_admin_backend(
        request: Request,
        session: Annotated[Session, Depends(dependencies.get_db_session)],
    ):
        """Yield an admin backend tagged with the caller's username and origin.

        Raises:
            HTTPException: 500 when no PasswordDB row with id 1 exists.
        """
        username = get_optional_username(request)
        origin = get_client_origin(request)

        # The row is only checked for existence here; it is not handed to
        # the backend.
        password_db = session.scalars(
            select(PasswordDB).where(PasswordDB.id == 1)
        ).first()
        if not password_db:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Error: The password manager has not yet been set up.",
            )

        admin = AdminBackend(
            dependencies.settings,
            username=username,
            origin=origin,
        )
        yield admin

    app = APIRouter(prefix=api_prefix)
    endpoint_deps = AdminDependencies.create(
        dependencies, get_admin_backend, get_current_active_user
    )

    LOG.debug("Registering sub-routers")
    app.include_router(auth.create_router(endpoint_deps))
    app.include_router(clients.create_router(endpoint_deps))
    app.include_router(secrets.create_router(endpoint_deps))
    return app