Files
sshecret/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py

191 lines
5.9 KiB
Python

"""Front page view factory."""
# pyright: reportUnusedFunction=false
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sshecret_admin.auth import LocalUserInfo, authenticate_user_async
from sshecret_admin.auth.authentication import hash_password
from sshecret_admin.frontend.exceptions import RedirectException
from sshecret_admin.services import AdminBackend
from starlette.datastructures import URL
from sshecret.backend.models import Operation
from ..dependencies import FrontendDependencies
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Path the index route redirects to when the visitor is logged in.
START_PAGE = "/dashboard"
# Path the index route redirects to when the visitor is not logged in.
LOGIN_PAGE = "/login"
class StatsView(BaseModel):
    """Counters displayed on the front page.

    Every counter defaults to zero when not supplied.
    """

    # Number of clients.
    clients: int = 0
    # Number of secrets.
    secrets: int = 0
    # Number of audit log entries.
    audit_events: int = 0
class PasswordChangeForm(BaseModel):
    """Fields submitted by the change-password form."""

    # Password the user currently has.
    current_password: str
    # Requested new password.
    password: str
    # Repetition of the new password, compared against `password`.
    confirm_password: str
async def get_stats(admin: AdminBackend) -> StatsView:
    """Collect the counters shown on the front page.

    The backend is asked for its clients, its secrets and its audit log
    count, in that order. Clients and secrets are counted with ``len``;
    the audit count is used as returned.
    """
    client_list = await admin.get_clients()
    secret_list = await admin.get_secrets()
    audit_count = await admin.get_audit_log_count()
    return StatsView(
        clients=len(client_list),
        secrets=len(secret_list),
        audit_events=audit_count,
    )
def create_router(dependencies: FrontendDependencies) -> APIRouter:
    """Create the router for the index, dashboard and password views.

    Args:
        dependencies: Supplies the template renderer (``templates``) and the
            FastAPI dependency callables used by the routes
            (``get_login_status``, ``get_user_info``, ``get_admin_backend``,
            ``get_async_session``).

    Returns:
        An ``APIRouter`` with the following routes registered:
        ``GET /``, ``GET /dashboard``, ``GET /password``, ``POST /password``
        and ``POST /password/validate-confirm``.
    """
    app = APIRouter()
    templates = dependencies.templates

    def _render_password_form(
        request: Request, current_user: LocalUserInfo, errors: list[str]
    ):
        """Render the change-password form together with ``errors``."""
        return templates.TemplateResponse(
            request,
            "change_password/index.html.j2",
            {
                "page_title": "Change Password",
                "user": current_user,
                "errors": errors,
            },
        )

    @app.get("/")
    def get_index(logged_in: Annotated[bool, Depends(dependencies.get_login_status)]):
        """Redirect to the start page when logged in, else to the login page."""
        # Named `target` rather than `next` so the builtin is not shadowed.
        target = START_PAGE if logged_in else LOGIN_PAGE
        return RedirectResponse(url=target)

    @app.get("/dashboard")
    async def get_dashboard(
        request: Request,
        current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ):
        """Render the dashboard with stats and recent audit log entries."""
        stats = await get_stats(admin)
        # Two audit queries: login events only (limit=5) and all events (limit=10).
        last_login_events = await admin.get_audit_log_detailed(
            limit=5, operation="login"
        )
        last_audit_events = await admin.get_audit_log_detailed(limit=10)
        LOG.info("CurrentUser: %r", current_user)
        return templates.TemplateResponse(
            request,
            "dashboard.html",
            {
                "page_title": "sshecret",
                "user": current_user,
                "stats": stats,
                "last_login_events": last_login_events,
                "last_audit_events": last_audit_events,
            },
        )

    @app.get("/password")
    async def get_change_password(
        request: Request,
        current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
    ):
        """Render the change-password page.

        Raises:
            RedirectException: When the current user is not a local user;
                the redirect target is "/".
        """
        if not current_user.local:
            LOG.debug("User tried to change password, but is not a local user.")
            # Must be raised, not returned: a returned exception object is
            # treated as the response payload and no redirect takes place.
            raise RedirectException(to=URL("/"))
        return _render_password_form(request, current_user, [])

    @app.post("/password")
    async def change_password(
        request: Request,
        current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
        session: Annotated[AsyncSession, Depends(dependencies.get_async_session)],
        passwd_form: Annotated[PasswordChangeForm, Form()],
    ):
        """Change the current user's password.

        The current password is verified and the two new-password fields are
        compared. On any failure the form is rendered again with the error
        messages; otherwise the new hash is stored, an audit message is
        written and the success page is rendered.
        """
        # NOTE(review): unlike the GET route, this handler does not check
        # `current_user.local` -- confirm whether that is intended.
        errors: list[str] = []
        user = await authenticate_user_async(
            session, current_user.display_name, passwd_form.current_password
        )
        if not user:
            errors.append("Invalid current password entered")
        if passwd_form.password != passwd_form.confirm_password:
            errors.append("Passwords do not match")
        # A falsy `user` always yields a non-empty `errors`; the check is
        # repeated here so the type narrows without an `assert`, which
        # `python -O` would strip.
        if errors or not user:
            return _render_password_form(request, current_user, errors)

        user.hashed_password = hash_password(passwd_form.password)
        session.add(user)
        await session.commit()

        # `request.client` may be None; fall back to a placeholder origin.
        origin = request.client.host if request.client else "UNKNOWN"
        await admin.write_audit_message(
            Operation.UPDATE,
            "User changed their password",
            origin,
            username=user.username,
        )
        return templates.TemplateResponse(
            request,
            "change_password/success.html.j2",
            {
                "page_title": "Change Password success",
                "user": current_user,
            },
        )

    @app.post("/password/validate-confirm")
    async def validate_password_match(
        request: Request,
        password: Annotated[str, Form()],
        confirm_password: Annotated[str, Form()],
    ):
        """Render the valid or invalid snippet for the two password fields.

        The "valid" template is used when ``password`` equals
        ``confirm_password``, the "invalid" template otherwise. Both values
        are passed on to the template.
        """
        matches = password == confirm_password
        if matches:
            template = "/change_password/valid_password.html.j2"
        else:
            template = "/change_password/invalid_password.html.j2"
        LOG.info("Password matches: %r", matches)
        return templates.TemplateResponse(
            request,
            template,
            {
                "password": password,
                "confirm_password": confirm_password,
            },
        )

    return app