Implement password change API endpoint

This commit is contained in:
2025-07-16 08:39:22 +02:00
parent 45ae0929e6
commit 37f381c884
2 changed files with 54 additions and 1 deletions

View File

@ -3,19 +3,25 @@
# pyright: reportUnusedFunction=false # pyright: reportUnusedFunction=false
import logging import logging
from typing import Annotated, Literal from typing import Annotated, Literal
from fastapi import APIRouter, Depends, Form, HTTPException, status from fastapi import APIRouter, Depends, Form, HTTPException, Request, Security, status
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sshecret_admin.auth import ( from sshecret_admin.auth import (
Token, Token,
User,
authenticate_user_async, authenticate_user_async,
create_access_token, create_access_token,
create_refresh_token, create_refresh_token,
decode_token, decode_token,
) )
from sshecret_admin.auth.authentication import hash_password
from sshecret_admin.core.dependencies import AdminDependencies from sshecret_admin.core.dependencies import AdminDependencies
from sshecret_admin.services import AdminBackend
from sshecret_admin.services.models import UserPasswordChange
from sshecret.backend.models import Operation
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -84,4 +90,36 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
access_token=access_token, refresh_token=refresh_token, token_type="bearer" access_token=access_token, refresh_token=refresh_token, token_type="bearer"
) )
@app.post("/password")
async def change_password(
request: Request,
current_user: Annotated[User, Security(dependencies.get_current_active_user)],
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
session: Annotated[AsyncSession, Depends(dependencies.get_async_session)],
password_form: UserPasswordChange,
) -> None:
"""Change user password"""
user = await authenticate_user_async(
session, current_user.username, password_form.current_password
)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid current password",
)
new_password_hash = hash_password(password_form.new_password)
user.hashed_password = new_password_hash
session.add(user)
await session.commit()
origin = "UNKNOWN"
if request.client:
origin = request.client.host
await admin.write_audit_message(
Operation.UPDATE,
message="User changed their password",
origin=origin,
username=user.username,
)
return app return app

View File

@ -257,3 +257,18 @@ class AuditQueryFilter(AuditFilter):
offset: int = 0 offset: int = 0
limit: int = 100 limit: int = 100
class UserPasswordChange(BaseModel):
"""Model for changing the password of a user."""
current_password: str
new_password: str
new_password_confirm: str
@model_validator(mode="after")
def validate_passwords(self) -> Self:
"""Validate that the passwords match."""
if self.new_password != self.new_password_confirm:
raise ValueError("Passwords don't match")
return self