From 37f381c884ce9caee56d05e0b44f535f26adc35e Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Wed, 16 Jul 2025 08:39:22 +0200 Subject: [PATCH] Implement password change API endpoint --- .../src/sshecret_admin/api/endpoints/auth.py | 40 ++++++++++++++++++- .../src/sshecret_admin/services/models.py | 15 +++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py index 8f4b8b8..3c3f781 100644 --- a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py @@ -3,19 +3,25 @@ # pyright: reportUnusedFunction=false import logging from typing import Annotated, Literal -from fastapi import APIRouter, Depends, Form, HTTPException, status +from fastapi import APIRouter, Depends, Form, HTTPException, Request, Security, status from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from sshecret_admin.auth import ( Token, + User, authenticate_user_async, create_access_token, create_refresh_token, decode_token, ) +from sshecret_admin.auth.authentication import hash_password from sshecret_admin.core.dependencies import AdminDependencies +from sshecret_admin.services import AdminBackend +from sshecret_admin.services.models import UserPasswordChange + +from sshecret.backend.models import Operation LOG = logging.getLogger(__name__) @@ -84,4 +90,36 @@ def create_router(dependencies: AdminDependencies) -> APIRouter: access_token=access_token, refresh_token=refresh_token, token_type="bearer" ) + @app.post("/password") + async def change_password( + request: Request, + current_user: Annotated[User, Security(dependencies.get_current_active_user)], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + session: Annotated[AsyncSession, Depends(dependencies.get_async_session)], + password_form: UserPasswordChange, + ) -> None: + """Change user password""" + user = await authenticate_user_async( + session, current_user.username, password_form.current_password + ) + if not user: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid current password", + ) + new_password_hash = hash_password(password_form.new_password) + user.hashed_password = new_password_hash + session.add(user) + await session.commit() + origin = "UNKNOWN" + if request.client: + origin = request.client.host + + await admin.write_audit_message( + Operation.UPDATE, + message="User changed their password", + origin=origin, + username=user.username, + ) + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/services/models.py b/packages/sshecret-admin/src/sshecret_admin/services/models.py index 9c04c6d..5bbc0ed 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/models.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/models.py @@ -257,3 +257,18 @@ class AuditQueryFilter(AuditFilter): offset: int = 0 limit: int = 100 + + +class UserPasswordChange(BaseModel): + """Model for changing the password of a user.""" + + current_password: str + new_password: str + new_password_confirm: str + + @model_validator(mode="after") + def validate_passwords(self) -> Self: + """Validate that the passwords match.""" + if self.new_password != self.new_password_confirm: + raise ValueError("Passwords don't match") + return self