191 lines
5.9 KiB
Python
191 lines
5.9 KiB
Python
"""Front page view factory."""
|
|
|
|
# pyright: reportUnusedFunction=false
|
|
import logging
|
|
from typing import Annotated
|
|
from fastapi import APIRouter, Depends, Form, Request
|
|
from fastapi.responses import RedirectResponse
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sshecret_admin.auth import LocalUserInfo, authenticate_user_async
|
|
from sshecret_admin.auth.authentication import hash_password
|
|
from sshecret_admin.frontend.exceptions import RedirectException
|
|
from sshecret_admin.services import AdminBackend
|
|
from starlette.datastructures import URL
|
|
|
|
from sshecret.backend.models import Operation
|
|
|
|
from ..dependencies import FrontendDependencies
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
START_PAGE = "/dashboard"
|
|
LOGIN_PAGE = "/login"
|
|
|
|
|
|
class StatsView(BaseModel):
|
|
"""Stats for the frontend."""
|
|
|
|
clients: int = 0
|
|
secrets: int = 0
|
|
audit_events: int = 0
|
|
|
|
|
|
class PasswordChangeForm(BaseModel):
|
|
"""Password change form data."""
|
|
|
|
current_password: str
|
|
password: str
|
|
confirm_password: str
|
|
|
|
|
|
async def get_stats(admin: AdminBackend) -> StatsView:
|
|
"""Get stats for the frontpage."""
|
|
clients = await admin.get_clients()
|
|
secrets = await admin.get_secrets()
|
|
audit = await admin.get_audit_log_count()
|
|
return StatsView(clients=len(clients), secrets=len(secrets), audit_events=audit)
|
|
|
|
|
|
def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|
"""Create auth router."""
|
|
|
|
app = APIRouter()
|
|
templates = dependencies.templates
|
|
|
|
@app.get("/")
|
|
def get_index(logged_in: Annotated[bool, Depends(dependencies.get_login_status)]):
|
|
"""Get the index."""
|
|
next = LOGIN_PAGE
|
|
if logged_in:
|
|
next = START_PAGE
|
|
|
|
return RedirectResponse(url=next)
|
|
|
|
@app.get("/dashboard")
|
|
async def get_dashboard(
|
|
request: Request,
|
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
):
|
|
"""Dashboard for mocking up the dashboard."""
|
|
stats = await get_stats(admin)
|
|
last_login_events = await admin.get_audit_log_detailed(
|
|
limit=5, operation="login"
|
|
)
|
|
last_audit_events = await admin.get_audit_log_detailed(limit=10)
|
|
|
|
LOG.info("CurrentUser: %r", current_user)
|
|
|
|
return templates.TemplateResponse(
|
|
request,
|
|
"dashboard.html",
|
|
{
|
|
"page_title": "sshecret",
|
|
"user": current_user,
|
|
"stats": stats,
|
|
"last_login_events": last_login_events,
|
|
"last_audit_events": last_audit_events,
|
|
},
|
|
)
|
|
|
|
@app.get("/password")
|
|
async def get_change_password(
|
|
request: Request,
|
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
|
):
|
|
"""Render Change password site."""
|
|
if not current_user.local:
|
|
LOG.debug("User tried to change password, but is not a local user.")
|
|
return RedirectException(to=URL("/"))
|
|
|
|
return templates.TemplateResponse(
|
|
request,
|
|
"change_password/index.html.j2",
|
|
{
|
|
"page_title": "Change Password",
|
|
"user": current_user,
|
|
"errors": [],
|
|
},
|
|
)
|
|
|
|
@app.post("/password")
|
|
async def change_password(
|
|
request: Request,
|
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
|
session: Annotated[AsyncSession, Depends(dependencies.get_async_session)],
|
|
passwd_form: Annotated[PasswordChangeForm, Form()],
|
|
):
|
|
"""Change password."""
|
|
errors: list[str] = []
|
|
user = await authenticate_user_async(
|
|
session, current_user.display_name, passwd_form.current_password
|
|
)
|
|
new_password_matches = passwd_form.password == passwd_form.confirm_password
|
|
if not user:
|
|
errors.append("Invalid current password entered")
|
|
if not new_password_matches:
|
|
errors.append("Passwords do not match")
|
|
|
|
if errors:
|
|
return templates.TemplateResponse(
|
|
request,
|
|
"change_password/index.html.j2",
|
|
{
|
|
"page_title": "Change Password",
|
|
"user": current_user,
|
|
"errors": errors,
|
|
},
|
|
)
|
|
|
|
assert user is not None
|
|
new_password_hash = hash_password(passwd_form.password)
|
|
user.hashed_password = new_password_hash
|
|
session.add(user)
|
|
await session.commit()
|
|
origin = "UNKNOWN"
|
|
if request.client:
|
|
origin = request.client.host
|
|
await admin.write_audit_message(
|
|
Operation.UPDATE,
|
|
"User changed their password",
|
|
origin,
|
|
username=user.username,
|
|
)
|
|
|
|
return templates.TemplateResponse(
|
|
request,
|
|
"change_password/success.html.j2",
|
|
{
|
|
"page_title": "Change Password success",
|
|
"user": current_user,
|
|
},
|
|
)
|
|
|
|
@app.post("/password/validate-confirm")
|
|
async def validate_password_match(
|
|
request: Request,
|
|
password: Annotated[str, Form()],
|
|
confirm_password: Annotated[str, Form()],
|
|
):
|
|
"""Validate password matches."""
|
|
valid = "/change_password/valid_password.html.j2"
|
|
invalid = "/change_password/invalid_password.html.j2"
|
|
template = valid
|
|
if password != confirm_password:
|
|
template = invalid
|
|
|
|
LOG.info("Password matches: %r", (password == confirm_password))
|
|
|
|
return templates.TemplateResponse(
|
|
request,
|
|
template,
|
|
{
|
|
"password": password,
|
|
"confirm_password": confirm_password,
|
|
},
|
|
)
|
|
|
|
return app
|