Implement password change flow

This commit is contained in:
2025-05-30 16:44:55 +02:00
parent 2585eb1fb3
commit f853ca81d0
8 changed files with 353 additions and 33 deletions

View File

@ -3,11 +3,17 @@
# pyright: reportUnusedFunction=false
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, Request
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sshecret_admin.auth import LocalUserInfo
from sqlalchemy.ext.asyncio import AsyncSession
from sshecret_admin.auth import LocalUserInfo, authenticate_user_async
from sshecret_admin.auth.authentication import hash_password
from sshecret_admin.frontend.exceptions import RedirectException
from sshecret_admin.services import AdminBackend
from starlette.datastructures import URL
from sshecret.backend.models import Operation
from ..dependencies import FrontendDependencies
@ -25,6 +31,14 @@ class StatsView(BaseModel):
audit_events: int = 0
class PasswordChangeForm(BaseModel):
"""Password change form data."""
current_password: str
password: str
confirm_password: str
async def get_stats(admin: AdminBackend) -> StatsView:
"""Get stats for the frontpage."""
clients = await admin.get_clients()
@ -75,4 +89,102 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
},
)
@app.get("/password")
async def get_change_password(
request: Request,
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
"""Render Change password site."""
if not current_user.local:
LOG.debug("User tried to change password, but is not a local user.")
return RedirectException(to=URL("/"))
return templates.TemplateResponse(
request,
"change_password/index.html.j2",
{
"page_title": "Change Password",
"user": current_user,
"errors": [],
},
)
@app.post("/password")
async def change_password(
request: Request,
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
session: Annotated[AsyncSession, Depends(dependencies.get_async_session)],
passwd_form: Annotated[PasswordChangeForm, Form()],
):
"""Change password."""
errors: list[str] = []
user = await authenticate_user_async(
session, current_user.display_name, passwd_form.current_password
)
new_password_matches = passwd_form.password == passwd_form.confirm_password
if not user:
errors.append("Invalid current password entered")
if not new_password_matches:
errors.append("Passwords do not match")
if errors:
return templates.TemplateResponse(
request,
"change_password/index.html.j2",
{
"page_title": "Change Password",
"user": current_user,
"errors": errors,
},
)
assert user is not None
new_password_hash = hash_password(passwd_form.password)
user.hashed_password = new_password_hash
session.add(user)
await session.commit()
origin = "UNKNOWN"
if request.client:
origin = request.client.host
await admin.write_audit_message(
Operation.UPDATE,
"User changed their password",
origin,
username=user.username,
)
return templates.TemplateResponse(
request,
"change_password/success.html.j2",
{
"page_title": "Change Password success",
"user": current_user,
},
)
@app.post("/password/validate-confirm")
async def validate_password_match(
request: Request,
password: Annotated[str, Form()],
confirm_password: Annotated[str, Form()],
):
"""Validate password matches."""
valid = "/change_password/valid_password.html.j2"
invalid = "/change_password/invalid_password.html.j2"
template = valid
if password != confirm_password:
template = invalid
LOG.info("Password matches: %r", (password == confirm_password))
return templates.TemplateResponse(
request,
template,
{
"password": password,
"confirm_password": confirm_password,
},
)
return app