Files
sshecret/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py

184 lines
5.9 KiB
Python

"""Authentication related views factory."""
# pyright: reportUnusedFunction=false
import logging
from pydantic import BaseModel
from typing import Annotated
from fastapi import APIRouter, Depends, Query, Request, Response, status
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sshecret_admin.services import AdminBackend
from starlette.datastructures import URL
from sshecret_admin.auth import (
User,
authenticate_user_async,
create_access_token,
create_refresh_token,
)
from sshecret.backend.models import Operation
from ..dependencies import FrontendDependencies
from ..exceptions import RedirectException
LOG = logging.getLogger(__name__)
class LoginError(BaseModel):
"""Login error."""
title: str
message: str
async def audit_login_failure(admin: AdminBackend, username: str, request: Request) -> None:
"""Write login failure to audit log."""
origin: str | None = None
if request.client:
origin = request.client.host
await admin.write_audit_message(
operation=Operation.DENY,
message="Login failed",
origin=origin or "UNKNOWN",
username=username,
)
def create_router(dependencies: FrontendDependencies) -> APIRouter:
"""Create auth router."""
app = APIRouter()
templates = dependencies.templates
@app.get("/login")
async def get_login(
request: Request,
login_status: Annotated[bool, Depends(dependencies.get_login_status)],
error_title: str | None = None,
error_message: str | None = None,
):
"""Get index."""
if login_status:
return RedirectResponse("/dashboard")
login_error: LoginError | None = None
if error_title and error_message:
login_error = LoginError(title=error_title, message=error_message)
return templates.TemplateResponse(
request,
"login.html",
{
"page_title": "Login",
"page_description": "Login page.",
"login_error": login_error,
},
)
@app.post("/login")
async def login_user(
request: Request,
response: Response,
session: Annotated[AsyncSession, Depends(dependencies.get_async_session)],
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
next: Annotated[str, Query()] = "/dashboard",
error_title: str | None = None,
error_message: str | None = None,
):
"""Log in user."""
if error_title and error_message:
login_error = LoginError(title=error_title, message=error_message)
return templates.TemplateResponse(
request,
"login.html",
{
"page_title": "Login",
"page_description": "Login page.",
"login_error": login_error,
},
)
user = await authenticate_user_async(session, form_data.username, form_data.password)
login_failed = RedirectException(
to=URL("/login").include_query_params(
error_title="Login Error", error_message="Invalid username or password"
)
)
if not user:
await audit_login_failure(admin, form_data.username, request)
raise login_failed
token_data: dict[str, str] = {"sub": user.username}
access_token = create_access_token(dependencies.settings, data=token_data)
refresh_token = create_refresh_token(dependencies.settings, data=token_data)
response = RedirectResponse(url=next, status_code=status.HTTP_302_FOUND)
response.set_cookie(
"access_token",
value=access_token,
httponly=True,
secure=False,
samesite="strict",
)
response.set_cookie(
"refresh_token",
value=refresh_token,
httponly=True,
secure=False,
samesite="strict",
)
origin = "UNKNOWN"
if request.client:
origin = request.client.host
await admin.write_audit_message(
operation=Operation.LOGIN,
message="Logged in to admin frontend",
origin=origin,
username=form_data.username,
)
return response
@app.get("/refresh")
async def get_refresh_token(
response: Response,
user: Annotated[User, Depends(dependencies.get_user_from_refresh_token)],
next: Annotated[str, Query()],
):
"""Refresh tokens.
We might as well refresh the long-lived one here.
"""
token_data: dict[str, str] = {"sub": user.username}
access_token = create_access_token(dependencies.settings, data=token_data)
refresh_token = create_refresh_token(dependencies.settings, data=token_data)
response = RedirectResponse(url=next, status_code=status.HTTP_302_FOUND)
response.set_cookie(
"access_token",
value=access_token,
httponly=True,
secure=False,
samesite="strict",
)
response.set_cookie(
"refresh_token",
value=refresh_token,
httponly=True,
secure=False,
samesite="strict",
)
return response
@app.get("/logout")
async def logout(
response: Response,
):
"""Log out user."""
response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
response.delete_cookie("refresh_token", httponly=True, secure=False, samesite="strict")
response.delete_cookie("access_token", httponly=True, secure=False, samesite="strict")
return response
return app