diff --git a/packages/sshecret-admin/src/sshecret_admin/core/app.py b/packages/sshecret-admin/src/sshecret_admin/core/app.py index 35854d3..f63b32a 100644 --- a/packages/sshecret-admin/src/sshecret_admin/core/app.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/app.py @@ -4,25 +4,21 @@ # from collections.abc import AsyncGenerator import logging -import os from contextlib import asynccontextmanager -from pathlib import Path -from fastapi import FastAPI, Request, Response, status +from fastapi import FastAPI, Request, status from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError -from fastapi.responses import JSONResponse, RedirectResponse -from fastapi.staticfiles import StaticFiles +from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from sqlalchemy.ext.asyncio import AsyncSession from sshecret_backend.db import DatabaseSessionManager from starlette.middleware.sessions import SessionMiddleware -from sshecret_admin import api, frontend +from sshecret_admin import api from sshecret_admin.auth.models import Base from sshecret_admin.core.db import setup_database -from sshecret_admin.frontend.exceptions import RedirectException from sshecret_admin.services.secret_manager import setup_private_key from sshecret.backend.exceptions import BackendError, BackendValidationError @@ -32,21 +28,9 @@ from .settings import AdminServerSettings LOG = logging.getLogger(__name__) -# dir_path = os.path.dirname(os.path.realpath(__file__)) - - -def setup_frontend(app: FastAPI, dependencies: BaseDependencies) -> None: - """Setup frontend.""" - script_path = Path(os.path.dirname(os.path.realpath(__file__))) - static_path = script_path.parent / "static" - - app.mount("/static", StaticFiles(directory=static_path), name="static") - app.include_router(frontend.create_frontend_router(dependencies)) - def create_admin_app( settings: AdminServerSettings, - with_frontend: bool = True, create_db: bool = False, ) -> FastAPI: """Create admin app.""" @@ -110,15 +94,6 @@ def create_admin_app( content=jsonable_encoder({"detail": [str(exc)]}), ) - @app.exception_handler(RedirectException) - async def redirect_handler(request: Request, exc: RedirectException) -> Response: - """Handle redirect exceptions.""" - if "hx-request" in request.headers: - response = Response() - response.headers["HX-Redirect"] = str(exc.to) - return response - return RedirectResponse(url=str(exc.to)) - @app.get("/health") async def get_health() -> JSONResponse: """Provide simple health check.""" @@ -129,7 +104,5 @@ def create_admin_app( dependencies = BaseDependencies(settings, get_db_session, get_async_session) app.include_router(api.create_api_router(dependencies)) - if with_frontend: - setup_frontend(app, dependencies) return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py b/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py deleted file mode 100644 index c527245..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Frontend app.""" - -from .router import create_router as create_frontend_router - -__all__ = ["create_frontend_router"] diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py b/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py deleted file mode 100644 index 7ac883a..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Custom oauth2 class.""" - -from fastapi.security import OAuth2 - - -class Oauth2TokenInCookies(OAuth2): - """TODO: Create this.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py b/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py deleted file mode 100644 index 6ab6a27..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Frontend dependencies.""" - -from dataclasses import dataclass -from collections.abc import AsyncGenerator, Callable, Awaitable -from typing import Self - -from sqlalchemy.orm import Session -from sqlalchemy.ext.asyncio import AsyncSession -from jinja2_fragments.fastapi import Jinja2Blocks -from fastapi import Request - -from sshecret_admin.core.dependencies import AdminDep, BaseDependencies - -from sshecret_admin.auth.models import IdentityClaims, LocalUserInfo, User - -UserTokenDep = Callable[[Request, Session], Awaitable[User]] -LoginStatusDep = Callable[[Request], Awaitable[bool]] -AsyncSessionDep = Callable[[], AsyncGenerator[AsyncSession, None]] -UserInfoDep = Callable[[Request, AsyncSession], Awaitable[LocalUserInfo]] -RefreshTokenDep = Callable[[Request], IdentityClaims] -LoginGuardDep = Callable[[Request], Awaitable[None]] - - -@dataclass -class FrontendDependencies(BaseDependencies): - """Frontend dependencies.""" - - get_admin_backend: AdminDep - templates: Jinja2Blocks - get_refresh_claims: RefreshTokenDep - get_login_status: LoginStatusDep - get_user_info: UserInfoDep - require_login: LoginGuardDep - - @classmethod - def create( - cls, - deps: BaseDependencies, - get_admin_backend: AdminDep, - templates: Jinja2Blocks, - get_refresh_claims: RefreshTokenDep, - get_login_status: LoginStatusDep, - get_user_info: UserInfoDep, - require_login: LoginGuardDep, - ) -> Self: - """Create from base dependencies.""" - return cls( - settings=deps.settings, - get_db_session=deps.get_db_session, - get_async_session=deps.get_async_session, - get_admin_backend=get_admin_backend, - templates=templates, - get_refresh_claims=get_refresh_claims, - get_login_status=get_login_status, - get_user_info=get_user_info, - require_login=require_login, - ) diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py b/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py deleted file mode 100644 index 4b07eee..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py +++ /dev/null @@ -1,14 +0,0 @@ -"""Frontend exceptions.""" - -from starlette.datastructures import URL - - -class RedirectException(Exception): - """Exception that initiates a redirect flow.""" - - def __init__(self, to: str | URL) -> None: # pyright: ignore[reportMissingSuperCall] - """Raise exception that redirects.""" - if isinstance(to, str): - to = URL(to) - - self.to: URL = to diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/router.py b/packages/sshecret-admin/src/sshecret_admin/frontend/router.py deleted file mode 100644 index aba599e..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/router.py +++ /dev/null @@ -1,179 +0,0 @@ -"""Frontend router.""" - -# pyright: reportUnusedFunction=false - -import logging -import os -from pathlib import Path -from typing import Annotated - -from fastapi import APIRouter, Depends, Request - -from jinja2_fragments.fastapi import Jinja2Blocks - -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession -from sshecret_admin.auth.authentication import generate_user_info -from sshecret_admin.auth.models import AuthProvider, IdentityClaims, LocalUserInfo -from starlette.datastructures import URL - - -from sshecret_admin.auth import User, decode_token -from sshecret_admin.auth.constants import LOCAL_ISSUER - -from sshecret_admin.core.dependencies import BaseDependencies -from sshecret_admin.services.admin_backend import AdminBackend - -from .dependencies import FrontendDependencies -from .exceptions import RedirectException -from .views import audit, auth, clients, index, secrets, oidc_auth - - -LOG = logging.getLogger(__name__) - - -access_token = "access_token" -refresh_token = "refresh_token" - - -def create_router(dependencies: BaseDependencies) -> APIRouter: - """Create frontend router.""" - - app = APIRouter(include_in_schema=False) - - script_path = Path(os.path.dirname(os.path.realpath(__file__))) - - template_path = script_path / "templates" - - templates = Jinja2Blocks(directory=template_path) - - async def get_admin_backend( - request: Request, - ): - """Get admin backend API.""" - username = get_optional_username(request) - origin = get_client_origin(request) - admin = AdminBackend( - dependencies.settings, - username=username, - origin=origin, - ) - yield admin - - def get_identity_claims(request: Request) -> IdentityClaims: - """Get identity claim from session.""" - token = request.cookies.get("access_token") - next = URL("/refresh").include_query_params(next=request.url.path) - credentials_error = RedirectException(to=next) - if not token: - raise credentials_error - claims = decode_token(dependencies.settings, token) - if not claims: - raise credentials_error - return claims - - def refresh_identity_claims(request: Request) -> IdentityClaims: - """Get identity claim from session for refreshing the token.""" - token = request.cookies.get("refresh_token") - next = URL("/login").include_query_params(next=request.url.path) - credentials_error = RedirectException(to=next) - if not token: - raise credentials_error - claims = decode_token(dependencies.settings, token) - if not claims: - raise credentials_error - return claims - - async def get_login_status(request: Request) -> bool: - """Get login status.""" - token = request.cookies.get("access_token") - if not token: - return False - - claims = decode_token(dependencies.settings, token) - return claims is not None - - async def require_login(request: Request) -> None: - """Enforce login requirement.""" - token = request.cookies.get("access_token") - LOG.info("User has no cookie") - if not token: - url = URL("/login").include_query_params(next=request.url.path) - raise RedirectException(to=url) - is_logged_in = await get_login_status(request) - if not is_logged_in: - next = URL("/refresh").include_query_params(next=request.url.path) - raise RedirectException(to=next) - - async def get_user_info( - request: Request, - session: Annotated[AsyncSession, Depends(dependencies.get_async_session)], - ) -> LocalUserInfo: - """Get User information.""" - claims = get_identity_claims(request) - if claims.provider == LOCAL_ISSUER: - LOG.info("Local user, finding username %s", claims.sub) - query = ( - select(User) - .where(User.username == claims.sub) - .where(User.provider == AuthProvider.LOCAL) - ) - else: - query = ( - select(User) - .where(User.oidc_issuer == claims.provider) - .where(User.oidc_sub == claims.sub) - ) - - result = await session.scalars(query) - if user := result.first(): - if user.disabled: - raise RedirectException(to=URL("/logout")) - return generate_user_info(user) - - next = URL("/refresh").include_query_params(next=request.url.path) - raise RedirectException(to=next) - - def get_optional_username( - request: Request, - ) -> str | None: - """Get username, if available. - - This is purely used for auditing purposes. - """ - try: - claims = get_identity_claims(request) - except Exception: - return None - - if claims.provider == LOCAL_ISSUER: - return claims.sub - - return f"oidc:{claims.email}" - - def get_client_origin(request: Request) -> str: - """Get client origin.""" - fallback_origin = "UNKNOWN" - if request.client: - return request.client.host - return fallback_origin - - view_dependencies = FrontendDependencies.create( - dependencies, - get_admin_backend, - templates, - refresh_identity_claims, - get_login_status, - get_user_info, - require_login, - ) - - app.include_router(audit.create_router(view_dependencies)) - app.include_router(auth.create_router(view_dependencies)) - app.include_router(clients.create_router(view_dependencies)) - app.include_router(index.create_router(view_dependencies)) - app.include_router(secrets.create_router(view_dependencies)) - if dependencies.settings.oidc: - app.include_router(oidc_auth.create_router(view_dependencies)) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 deleted file mode 100644 index 25aa9d5..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 +++ /dev/null @@ -1,48 +0,0 @@ -
| - Timestamp - | -- - Subsystem - - - - - | -- - Operation - - - - | -- Client - | -- Secret - | -- Message - | -- Origin - | -
|---|---|---|---|---|---|---|
|
- {{ entry.timestamp }} - -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {% if entry.data %}
- {% for key, value in entry.data.items() %}
-
-
- {% endfor %}
- {% endif %}
-
- |
-
- - {{ entry.subsystem }} - | - -- {{ entry.operation }} - | - -- - {% if entry.client_name %} - {{ entry.client_name }} - {% endif %} - | - -- {% if entry.secret_name %} - {{ entry.secret_name }} - {% endif %} - | -- {{ entry.message }} - | -- {{ entry.origin }} - | - -
| - ID - | -- Operation - | -- Client Name - | -- Message - | -- Origin - | -
|---|
Select an item to view details
-This is a generic page.
- {% endblock %} -Oops! Passwords do not match!
- -Your password was changed sucessfully. Next time you log in, use your new password.
- - - Go back to the dashboard - -Invalid value. {{explanation}}.
diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 deleted file mode 100644 index dc1c977..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 +++ /dev/null @@ -1 +0,0 @@ - diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 deleted file mode 100644 index 8e94d81..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 +++ /dev/null @@ -1,33 +0,0 @@ -{% extends "/base/master-detail.html.j2" %} - -{% block master %} -| - Name - | -- ID - | -- Description - | -- Number of secrets allocated - | -- Allowed Sources - | -- Actions - | -
|---|
{{ client.description }}
- {% endif %} -| Timestamp | -Subsystem | -Message | -Origin | -
|---|---|---|---|
|
- {{ entry.timestamp }} - -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {% if entry.data %}
- {% for key, value in entry.data.items() %}
-
-
- {% endfor %}
- {% endif %}
- |
-
- - {{ entry.subsystem }} - | - -- {{ entry.message }} - | - -- {{ entry.origin }} - | -
| - Timestamp - | -- Subsystem - | -- Client/Username - | -- Origin - | -
|---|---|---|---|
|
-
- {{ entry.timestamp }}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {% if entry.data %} {% for key, value in entry.data.items()
- %}
-
-
- {% endfor %} {% endif %}
- |
-
- - {{ entry.subsystem }} - | - -- {% if entry.client_name %} {{ entry.client_name }} {% elif - entry.data.username %} {{ entry.data.username }} {% endif %} - | - -- {{ entry.origin }} - | -
No entries
- {% endif %} -| - Timestamp - | -- Subsystem - | -- Message - | -- Origin - | -
|---|---|---|---|
|
-
- {{ entry.timestamp }}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {% if entry.data %} {% for key, value in entry.data.items()
- %}
-
-
- {% endfor %} {% endif %}
- |
-
- - {{ entry.subsystem }} - | - -- {{ entry.message }} - | - -- {{ entry.origin }} - | -
No entries
- {% endif %} -| Client Name | -Description | -Action | -
|---|---|---|
| - {{ client.name }} - | -{{ client.description }} | -- Edit - | -
- {{ message }} -
diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/ok.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/ok.html deleted file mode 100644 index f98c54b..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/ok.html +++ /dev/null @@ -1,3 +0,0 @@ -- {{ message }} -
diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/login.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/login.html deleted file mode 100644 index 9ff154f..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/login.html +++ /dev/null @@ -1,92 +0,0 @@ -{% extends "/base/bare.html.j2" %} {% block content %} {% if login_error %} - -Secret updated.
- {% endif %} -This secret was created outside of sshecret-admin. It cannot be decrypted, and therefore fewer options are available here.
- {% endif %} -| Timestamp | -Subsystem | -Message | -Origin | -
|---|---|---|---|
|
- {{ entry.timestamp }}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {% if entry.data %}
- {% for key, value in entry.data.items() %}
-
-
- {% endfor %}
- {% endif %}
- |
-
- - {{ entry.subsystem }} - | - -- {{ entry.message }} - | - -- {{ entry.origin }} - | -
It worked!
-Welcome, {{ user.username }}
-{% endblock %} diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/widgets/clients.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/widgets/clients.html deleted file mode 100644 index e69de29..0000000 diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py deleted file mode 100644 index f74c036..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Frontend views.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/audit.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/audit.py deleted file mode 100644 index bbe2853..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/audit.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Audit view factory.""" - -# pyright: reportUnusedFunction=false -import logging -from typing import Annotated, cast -from fastapi import APIRouter, Depends, Request, Response - -from sshecret.backend import AuditFilter, Operation - -from sshecret_admin.auth import LocalUserInfo -from sshecret_admin.services import AdminBackend - -from .common import PagingInfo -from ..dependencies import FrontendDependencies - -LOG = logging.getLogger(__name__) - - -def create_router(dependencies: FrontendDependencies) -> APIRouter: - """Create clients router.""" - - app = APIRouter() - templates = dependencies.templates - - async def resolve_audit_entries( - request: Request, - current_user: LocalUserInfo, - admin: AdminBackend, - page: int, - filters: AuditFilter, - ) -> Response: - """Resolve audit entries.""" - LOG.info("Page: %r", page) - per_page = 20 - offset = 0 - if page > 1: - offset = (page - 1) * per_page - - filter_args = cast(dict[str, str], filters.model_dump(exclude_none=True)) - audit_log = await admin.get_audit_log_detailed(offset, per_page, **filter_args) - page_info = PagingInfo( - page=page, limit=per_page, total=audit_log.total, offset=offset - ) - operations = list(Operation) - breadcrumbs = [("Audit", "/audit/")] - if request.headers.get("HX-Request"): - return templates.TemplateResponse( - request, - "audit/inner.html.j2", - { - "entries": audit_log.results, - "page_info": page_info, - "operations": operations, - }, - ) - return templates.TemplateResponse( - request, - "audit/index.html.j2", - { - "page_title": "Audit Log", - "breadcrumbs": breadcrumbs, - "entries": audit_log.results, - "user": current_user, - "page_info": page_info, - "operations": operations, - }, - ) - - @app.get("/audit/") - async def get_audit_entries( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - filters: Annotated[AuditFilter, Depends()], - ) -> Response: - """Get audit entries.""" - return await resolve_audit_entries(request, current_user, admin, 1, filters) - - @app.get("/audit/page/{page}") - async def get_audit_entries_page( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - filters: Annotated[AuditFilter, Depends()], - page: int, - ) -> Response: - """Get audit entries.""" - LOG.info("Get audit entries page: %r", page) - return await resolve_audit_entries(request, current_user, admin, page, filters) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py deleted file mode 100644 index dffbfad..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py +++ /dev/null @@ -1,218 +0,0 @@ -"""Authentication related views factory.""" - -# pyright: reportUnusedFunction=false -import logging -from pydantic import BaseModel -from typing import Annotated -from fastapi import APIRouter, Depends, Query, Request, Response, status -from fastapi.responses import RedirectResponse -from fastapi.security import OAuth2PasswordRequestForm - -from sqlalchemy.ext.asyncio import AsyncSession -from sshecret_admin.services import AdminBackend -from starlette.datastructures import URL - -from sshecret_admin.auth import ( - IdentityClaims, - authenticate_user_async, - create_access_token, - create_refresh_token, -) - -from sshecret.backend.models import Operation - -from ..dependencies import FrontendDependencies -from ..exceptions import RedirectException - -LOG = logging.getLogger(__name__) - - -class LoginError(BaseModel): - """Login error.""" - - title: str - message: str - - -class OidcLogin(BaseModel): - """Small container to hold OIDC info for the login box.""" - - enabled: bool = False - provider_name: str | None = None - - -async def audit_login_failure( - admin: AdminBackend, username: str, request: Request -) -> None: - """Write login failure to audit log.""" - origin: str | None = None - if request.client: - origin = request.client.host - await admin.write_audit_message( - operation=Operation.DENY, - message="Login failed", - origin=origin or "UNKNOWN", - username=username, - ) - - -def create_router(dependencies: FrontendDependencies) -> APIRouter: - """Create auth router.""" - - app = APIRouter() - templates = dependencies.templates - - @app.get("/login") - async def get_login( - request: Request, - login_status: Annotated[bool, Depends(dependencies.get_login_status)], - error_title: str | None = None, - error_message: str | None = None, - ): - """Get index.""" - if login_status: - return RedirectResponse("/dashboard") - login_error: LoginError | None = None - if error_title and error_message: - LOG.info("Got an error here: %s %s", error_title, error_message) - login_error = LoginError(title=error_title, message=error_message) - else: - LOG.info("Got no errors") - - oidc_login = OidcLogin() - if dependencies.settings.oidc: - oidc_login.enabled = True - oidc_login.provider_name = dependencies.settings.oidc.name - - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login page.", - "login_error": login_error, - "oidc": oidc_login, - }, - ) - - @app.post("/login") - async def login_user( - request: Request, - response: Response, - session: Annotated[AsyncSession, Depends(dependencies.get_async_session)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - form_data: Annotated[OAuth2PasswordRequestForm, Depends()], - next: Annotated[str, Query()] = "/dashboard", - error_title: str | None = None, - error_message: str | None = None, - ): - """Log in user.""" - if error_title and error_message: - login_error = LoginError(title=error_title, message=error_message) - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login page.", - "login_error": login_error, - }, - ) - - user = await authenticate_user_async( - session, form_data.username, form_data.password - ) - login_failed = RedirectException( - to=URL("/login").include_query_params( - error_title="Login Error", error_message="Invalid username or password" - ) - ) - if not user: - await audit_login_failure(admin, form_data.username, request) - raise login_failed - token_data: dict[str, str] = {"sub": user.username} - access_token = create_access_token(dependencies.settings, data=token_data) - refresh_token = create_refresh_token(dependencies.settings, data=token_data) - if next == "/refresh": - # Don't redirect from login to refresh. Send to dashboard instead. - next = "/" - - response = RedirectResponse(url=next, status_code=status.HTTP_302_FOUND) - response.set_cookie( - "access_token", - value=access_token, - httponly=True, - secure=False, - samesite="strict", - ) - response.set_cookie( - "refresh_token", - value=refresh_token, - httponly=True, - secure=False, - samesite="strict", - ) - origin = "UNKNOWN" - if request.client: - origin = request.client.host - - await admin.write_audit_message( - operation=Operation.LOGIN, - message="Logged in to admin frontend", - origin=origin, - username=form_data.username, - ) - - return response - - @app.get("/refresh") - async def get_refresh_token( - response: Response, - refresh_claims: Annotated[ - IdentityClaims, Depends(dependencies.get_refresh_claims) - ], - next: Annotated[str, Query()], - ): - """Refresh tokens. - - We might as well refresh the long-lived one here. - """ - token_data: dict[str, str] = {"sub": refresh_claims.sub} - access_token = create_access_token( - dependencies.settings, data=token_data, provider=refresh_claims.provider - ) - refresh_token = create_refresh_token( - dependencies.settings, data=token_data, provider=refresh_claims.provider - ) - response = RedirectResponse(url=next, status_code=status.HTTP_302_FOUND) - response.set_cookie( - "access_token", - value=access_token, - httponly=True, - secure=False, - samesite="strict", - ) - response.set_cookie( - "refresh_token", - value=refresh_token, - httponly=True, - secure=False, - samesite="strict", - ) - return response - - @app.get("/logout") - async def logout( - response: Response, - ): - """Log out user.""" - response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) - response.delete_cookie( - "refresh_token", httponly=True, secure=False, samesite="strict" - ) - response.delete_cookie( - "access_token", httponly=True, secure=False, samesite="strict" - ) - return response - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py deleted file mode 100644 index 9fb8ef8..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py +++ /dev/null @@ -1,424 +0,0 @@ -"""clients view factory.""" - -# pyright: reportUnusedFunction=false -import ipaddress -import logging -import uuid -from typing import Annotated -from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response -from fastapi.responses import RedirectResponse -from pydantic import BaseModel, IPvAnyAddress, IPvAnyNetwork -from sshecret_admin.frontend.views.common import PagingInfo - -from sshecret.backend import ClientFilter -from sshecret.backend.models import Client, ClientQueryResult, FilterType -from sshecret.crypto import validate_public_key -from sshecret_admin.auth import LocalUserInfo -from sshecret_admin.services import AdminBackend - -from ..dependencies import FrontendDependencies - -LOG = logging.getLogger(__name__) - -CLIENTS_PER_PAGE = 20 -EVENTS_PER_PAGE = 20 - - -class ClientUpdate(BaseModel): - id: uuid.UUID - name: str - description: str - public_key: str - sources: str | None = None - - -class ClientCreate(BaseModel): - name: str - public_key: str - description: str | None - sources: str | None - - -class LocatedClient(BaseModel): - """A located client.""" - - client: Client - results: ClientQueryResult - pages: PagingInfo - - -async def locate_client(admin: AdminBackend, client_id: str) -> LocatedClient | None: - """Locate a client in a paginated dataset.""" - offset = 0 - page = 1 - total_clients = await admin.get_client_count() - while offset < total_clients: - filter = ClientFilter(limit=CLIENTS_PER_PAGE, offset=offset) - results = await admin.query_clients(filter) - matches = [client for client in results.clients if str(client.id) == client_id] - if matches: - client = matches[0] - pages = PagingInfo( - page=page, - limit=CLIENTS_PER_PAGE, - total=results.total_results, - offset=offset, - ) - return LocatedClient(client=client, results=results, pages=pages) - offset += CLIENTS_PER_PAGE - page += 1 - - return None - - -def create_router(dependencies: FrontendDependencies) -> APIRouter: - """Create clients router.""" - - app = APIRouter(dependencies=[Depends(dependencies.require_login)]) - - templates = dependencies.templates - - @app.get("/clients/") - async def get_test_page( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ) -> Response: - """Get test page.""" - - - breadcrumbs = [("clients", "/clients/")] - - return templates.TemplateResponse( - request, - "admin/index.html.j2", - { - "breadcrumbs": breadcrumbs, - "page_title": "Clients", - "user": current_user, - } - ) - - # @app.get("/clients/") - # async def get_client_tree( - # request: Request, - # current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - # admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - # ) -> Response: - # """Get client tree view.""" - # page = 1 - # per_page = CLIENTS_PER_PAGE - # offset = 0 - - # client_filter = ClientFilter(offset=offset, limit=per_page) - # results = await admin.query_clients(client_filter) - # paginate = PagingInfo( - # page=page, limit=per_page, total=results.total_results, offset=offset - # ) - - # breadcrumbs = [("clients", "/clients/")] - - # LOG.info("Results %r", results) - # return templates.TemplateResponse( - # request, - # "clients/index_new.html.j2", - # { - # "breadcrumbs": breadcrumbs, - # "page_title": "Clients", - # "offset": offset, - # "pages": paginate, - # "clients": results.clients, - # "user": current_user, - # "results": results, - # }, - # ) - - @app.get("/clients/page/{page}") - async def get_client_page( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - page: int, - ) -> Response: - """Get client tree view.""" - per_page = CLIENTS_PER_PAGE - offset = 0 - if page > 1: - offset = (page - 1) * per_page - - client_filter = ClientFilter(offset=offset, limit=per_page) - results = await admin.query_clients(client_filter) - paginate = PagingInfo( - page=page, - limit=per_page, - offset=offset, - total=results.total_results, - ) - LOG.info("Results %r", results) - template = "clients/index.html.j2" - if request.headers.get("HX-Request"): - # This is a HTMX request. - template = "clients/partials/tree.html.j2" - - return templates.TemplateResponse( - request, - template, - { - "page_title": "Clients", - "offset": offset, - "last_num": offset + per_page, - "pages": paginate, - "clients": results.clients, - "user": current_user, - "results": results, - }, - ) - - @app.get("/clients/client/{id}") - async def get_client( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - id: str, - ) -> Response: - """Fetch a client.""" - - results = await locate_client(admin, id) - - if not results: - raise HTTPException(status_code=404, detail="Client not found.") - events = await admin.get_audit_log_detailed( - limit=EVENTS_PER_PAGE, client_name=results.client.name - ) - template = "clients/client.html.j2" - - breadcrumbs = [ - ("clients", "/clients/"), - (results.client.name, request.url.path), - ] - - headers: dict[str, str] = {} - if request.headers.get("HX-Request"): - headers["HX-Push-Url"] = request.url.path - template = "clients/partials/client_details.html.j2" - - events_paging = PagingInfo( - page=1, limit=EVENTS_PER_PAGE, total=events.total, offset=0 - ) - return templates.TemplateResponse( - request, - template, - { - "page_title": f"Client {results.client.name}", - "breadcrumbs": breadcrumbs, - "pages": results.pages, - "clients": results.results.clients, - "client": results.client, - "user": current_user, - "results": results.results, - "events": events, - "events_paging": events_paging, - }, - headers=headers, - ) - - @app.get("/clients/client/{id}/events/{page}") - async def get_client_events( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - id: str, - page: int, - ) -> Response: - """Get more events for a client.""" - if "HX-Request" not in request.headers: - return RedirectResponse(url=f"/clients/client/{id}") - - client = await admin.get_client(("id", id)) - if not client: - raise HTTPException(status_code=404, detail="Client not found.") - offset = 0 - if page > 1: - offset = (page - 1) * EVENTS_PER_PAGE - events = await admin.get_audit_log_detailed( - limit=EVENTS_PER_PAGE, client_name=client.name, offset=offset - ) - - events_paging = PagingInfo( - page=page, limit=EVENTS_PER_PAGE, total=events.total, offset=offset - ) - return templates.TemplateResponse( - request, - "clients/partials/client_events.html.j2", - { - "events": events, - "client": client, - "events_paging": events_paging, - }, - ) - - @app.put("/clients/{id}") - async def update_client( - request: Request, - id: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - client: Annotated[ClientUpdate, Form()], - ): - """Update a client.""" - original_client = await admin.get_client(("id", id)) - if not original_client: - return templates.TemplateResponse( - request, "fragments/error.html", {"message": "Client not found"} - ) - - sources: list[IPvAnyAddress | IPvAnyNetwork] = [] - if client.sources: - source_str = client.sources.split(",") - for source in source_str: - if "/" in source: - sources.append(ipaddress.ip_network(source.strip())) - else: - sources.append(ipaddress.ip_address(source.strip())) - client_fields = client.model_dump(exclude_unset=True) - - del client_fields["sources"] - if sources: - client_fields["policies"] = sources - - LOG.info("Fields: %r", client_fields) - updated_client = original_client.model_copy(update=client_fields) - - final_client = await admin.update_client(updated_client) - - events = await admin.get_audit_log_detailed( - limit=EVENTS_PER_PAGE, client_name=client.name - ) - - events_paging = PagingInfo( - page=1, limit=EVENTS_PER_PAGE, total=events.total, offset=0 - ) - - return templates.TemplateResponse( - request, - "clients/partials/client_details.html.j2", - { - "client": final_client, - "events": events, - "events_paging": events_paging, - }, - ) - - @app.post("/clients/") - async def create_client( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - client: Annotated[ClientCreate, Form()], - ) -> Response: - """Create client.""" - sources: list[str] | None = None - if client.sources: - sources = [source.strip() for source in client.sources.split(",")] - await admin.create_client( - name=client.name, - public_key=client.public_key, - description=client.description, - sources=sources, - ) - - headers = {"Hx-Refresh": "true"} - return Response( - headers=headers, - ) - - @app.delete("/clients/{id}") - async def delete_client( - id: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ) -> Response: - """Delete a client.""" - await admin.delete_client(("id", id)) - headers = {"Hx-Refresh": "true"} - return Response(headers=headers) - - @app.post("/clients/validate/source") - async def validate_client_source( - request: Request, - sources: Annotated[str, Form()], - ) -> Response: - """Validate source.""" - source_str = sources.split(",") - for source in source_str: - if "/" in source: - try: - _network = ipaddress.ip_network(source.strip()) - except Exception: - return templates.TemplateResponse( - request, - "/clients/field_invalid.html.j2", - {"explanation": f"Invalid network {source.strip()}"}, - ) - else: - try: - _address = ipaddress.ip_address(source.strip()) - except Exception: - return templates.TemplateResponse( - request, - "/clients/field_invalid.html.j2", - {"explanation": f"Invalid address {source.strip()}"}, - ) - return templates.TemplateResponse( - request, - "/clients/field_valid.html.j2", - ) - - @app.post("/clients/validate/public_key") - async def validate_client_public_key( - request: Request, - public_key: Annotated[str, Form()], - ) -> Response: - """Validate source.""" - if validate_public_key(public_key.rstrip()): - return templates.TemplateResponse( - request, - "/clients/field_valid.html.j2", - ) - return templates.TemplateResponse( - request, - "/clients/field_invalid.html.j2", - {"explanation": "Invalid value. Not a valid SSH RSA Public Key."}, - ) - - @app.post("/clients/query") - async def query_clients( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - query: Annotated[str, Form()], - ) -> Response: - """Query for a client.""" - query_filter = ClientFilter(limit=CLIENTS_PER_PAGE) - if query: - name = f"%{query}%" - query_filter = ClientFilter( - name=name, filter_name=FilterType.LIKE, limit=CLIENTS_PER_PAGE - ) - results = await admin.query_clients(query_filter) - pages: PagingInfo | None = None - if not query: - pages = PagingInfo( - page=1, limit=CLIENTS_PER_PAGE, offset=0, total=results.total_results - ) - - more_results: int | None = None - if query and results.total_results > CLIENTS_PER_PAGE: - more_results = results.total_results - CLIENTS_PER_PAGE - return templates.TemplateResponse( - request, - "clients/partials/tree_items.html.j2", - { - "clients": results.clients, - "pages": pages, - "results": results, - "more_results": more_results, - }, - ) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/common.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/common.py deleted file mode 100644 index d4ed3d4..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/common.py +++ /dev/null @@ -1,41 +0,0 @@ -"""Common utilities.""" - -import math -from pydantic import BaseModel - - -class PagingInfo(BaseModel): - page: int - limit: int - total: int - offset: int = 0 - - @property - def first(self) -> int: - """The first result number.""" - return self.offset + 1 - - @property - def last(self) -> int: - """Return the last result number.""" - return self.offset + self.limit - - @property - def total_pages(self) -> int: - """Return total pages.""" - return math.ceil(self.total / self.limit) - - @property - def pages(self) -> list[int]: - """Return all page numbers.""" - return [page for page in range(1, self.total_pages + 1)] - - @property - def is_last(self) -> bool: - """Is this the last page?""" - return self.page == self.total_pages - - @property - def is_first(self) -> bool: - """Is this the first page?""" - return self.page == 1 diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py deleted file mode 100644 index b4f3490..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py +++ /dev/null @@ -1,190 +0,0 @@ -"""Front page view factory.""" - -# pyright: reportUnusedFunction=false -import logging -from typing import Annotated -from fastapi import APIRouter, Depends, Form, Request, Response -from fastapi.responses import RedirectResponse -from pydantic import BaseModel -from sqlalchemy.ext.asyncio import AsyncSession -from sshecret_admin.auth import LocalUserInfo, authenticate_user_async -from sshecret_admin.auth.authentication import hash_password -from sshecret_admin.frontend.exceptions import RedirectException -from sshecret_admin.services import AdminBackend -from starlette.datastructures import URL - -from sshecret.backend.models import Operation - -from ..dependencies import FrontendDependencies - -LOG = logging.getLogger(__name__) - -START_PAGE = "/dashboard" -LOGIN_PAGE = "/login" - - -class StatsView(BaseModel): - """Stats for the frontend.""" - - clients: int = 0 - secrets: int = 0 - audit_events: int = 0 - - -class PasswordChangeForm(BaseModel): - """Password change form data.""" - - current_password: str - password: str - confirm_password: str - - -async def get_stats(admin: AdminBackend) -> StatsView: - """Get stats for the frontpage.""" - clients = await admin.get_clients() - secrets = await admin.get_secrets() - audit = await admin.get_audit_log_count() - return StatsView(clients=len(clients), secrets=len(secrets), audit_events=audit) - - -def create_router(dependencies: FrontendDependencies) -> APIRouter: - """Create auth router.""" - - app = APIRouter() - templates = dependencies.templates - - @app.get("/") - def get_index(logged_in: Annotated[bool, Depends(dependencies.get_login_status)]): - """Get the index.""" - next = LOGIN_PAGE - if logged_in: - next = START_PAGE - - return RedirectResponse(url=next) - - @app.get("/dashboard") - async def get_dashboard( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Dashboard for mocking up the dashboard.""" - stats = await get_stats(admin) - last_login_events = await admin.get_audit_log_detailed( - limit=5, operation="login" - ) - last_audit_events = await admin.get_audit_log_detailed(limit=10) - - LOG.info("CurrentUser: %r", current_user) - - return templates.TemplateResponse( - request, - "dashboard.html", - { - "page_title": "Dashboard", - "user": current_user, - "stats": stats, - "last_login_events": last_login_events, - "last_audit_events": last_audit_events, - }, - ) - - @app.get("/password") - async def get_change_password( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - ): - """Render Change password site.""" - if not current_user.local: - LOG.debug("User tried to change password, but is not a local user.") - return RedirectException(to=URL("/")) - - return templates.TemplateResponse( - request, - "change_password/index.html.j2", - { - "page_title": "Change Password", - "user": current_user, - "errors": [], - }, - ) - - @app.post("/password") - async def change_password( - request: Request, - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - session: Annotated[AsyncSession, Depends(dependencies.get_async_session)], - passwd_form: Annotated[PasswordChangeForm, Form()], - ): - """Change password.""" - errors: list[str] = [] - user = await authenticate_user_async( - session, current_user.display_name, passwd_form.current_password - ) - new_password_matches = passwd_form.password == passwd_form.confirm_password - if not user: - errors.append("Invalid current password entered") - if not new_password_matches: - errors.append("Passwords do not match") - - if errors: - return templates.TemplateResponse( - request, - "change_password/index.html.j2", - { - "page_title": "Change Password", - "user": current_user, - "errors": errors, - }, - ) - - assert user is not None - new_password_hash = hash_password(passwd_form.password) - user.hashed_password = new_password_hash - session.add(user) - await session.commit() - origin = "UNKNOWN" - if request.client: - origin = request.client.host - await admin.write_audit_message( - Operation.UPDATE, - "User changed their password", - origin, - username=user.username, - ) - - return templates.TemplateResponse( - request, - "change_password/success.html.j2", - { - "page_title": "Change Password success", - "user": current_user, - }, - ) - - @app.post("/password/validate-confirm") - async def validate_password_match( - request: Request, - password: Annotated[str, Form()], - confirm_password: Annotated[str, Form()], - ): - """Validate password matches.""" - valid = "/change_password/valid_password.html.j2" - invalid = "/change_password/invalid_password.html.j2" - template = valid - if password != confirm_password: - template = invalid - - LOG.info("Password matches: %r", (password == confirm_password)) - - return templates.TemplateResponse( - request, - template, - { - "password": password, - "confirm_password": confirm_password, - }, - ) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/oidc_auth.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/oidc_auth.py deleted file mode 100644 index 268810d..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/oidc_auth.py +++ /dev/null @@ -1,142 +0,0 @@ -"""Optional OIDC auth module.""" - -# pyright: reportUnusedFunction=false -import logging -from datetime import datetime -from typing import Annotated -from fastapi import APIRouter, Depends, Request -from fastapi.responses import HTMLResponse, RedirectResponse -from pydantic import ValidationError -from sqlalchemy.ext.asyncio import AsyncSession -from sshecret_admin.auth import create_access_token, create_refresh_token -from sshecret_admin.auth.authentication import generate_user_info, handle_oidc_claim -from sshecret_admin.auth.exceptions import AuthenticationFailedError -from sshecret_admin.auth.oidc import AdminOidc -from sshecret_admin.frontend.exceptions import RedirectException -from sshecret_admin.services import AdminBackend -from starlette.datastructures import URL - -from sshecret.backend.models import Operation - -from ..dependencies import FrontendDependencies - -LOG = logging.getLogger(__name__) - - -async def audit_login_failure( - admin: AdminBackend, - error_message: str, - request: Request, -) -> None: - """Write login failure to audit log.""" - origin: str | None = None - if request.client: - origin = request.client.host - await admin.write_audit_message( - operation=Operation.DENY, - message="Login failed", - origin=origin or "UNKNOWN", - provider_error_message=error_message, - ) - - -def create_router(dependencies: FrontendDependencies) -> APIRouter: - """Create auth router.""" - - app = APIRouter() - - def get_oidc_client() -> AdminOidc: - """Get OIDC client dependency.""" - if not dependencies.settings.oidc: - raise RuntimeError("OIDC authentication not configured.") - oidc = AdminOidc(dependencies.settings.oidc) - return oidc - - @app.get("/oidc/login") - async def oidc_login( - request: Request, oidc: Annotated[AdminOidc, Depends(get_oidc_client)] - ) -> RedirectResponse: - """Redirect to oidc login.""" - redirect_url = request.url_for("oidc_auth") - return await oidc.start_auth(request, redirect_url) - - @app.get("/oidc/auth") - async def oidc_auth( - request: Request, - session: Annotated[AsyncSession, Depends(dependencies.get_async_session)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - oidc: Annotated[AdminOidc, Depends(get_oidc_client)], - ): - """Handle OIDC auth callback.""" - try: - claims = await oidc.handle_auth_callback(request) - except AuthenticationFailedError as error: - raise RedirectException( - to=URL("/login").include_query_params( - error_title="Login error from external provider", - error_message=str(error), - ) - ) - except ValidationError as error: - LOG.error("Validation error: %s", error, exc_info=True) - raise RedirectException( - to=URL("/login").include_query_params( - error_title="Error parsing claim", - error_message="One or more required parameters were not included in the claim.", - ) - ) - - # We now have a IdentityClaims object. - # We need to check if this matches an existing user, or we need to create a new one. - - user = await handle_oidc_claim(session, claims) - user.last_login = datetime.now() - session.add(user) - await session.commit() - # Set cookies - token_data: dict[str, str] = {"sub": claims.sub} - access_token = create_access_token( - dependencies.settings, data=token_data, provider=claims.provider - ) - refresh_token = create_refresh_token( - dependencies.settings, data=token_data, provider=claims.provider - ) - user_info = generate_user_info(user) - response = HTMLResponse(""" - - -Login successful. Redirecting...
- - - -""") - response.set_cookie( - "access_token", - value=access_token, - httponly=True, - secure=False, - samesite="strict", - ) - response.set_cookie( - "refresh_token", - value=refresh_token, - httponly=True, - secure=False, - samesite="strict", - ) - origin = "UNKNOWN" - if request.client: - origin = request.client.host - await admin.write_audit_message( - operation=Operation.LOGIN, - message="Logged in to admin frontend", - origin=origin, - username=user_info.display_name, - oidc=claims.provider, - ) - - return response - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py deleted file mode 100644 index 8394829..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py +++ /dev/null @@ -1,571 +0,0 @@ -"""Secrets views.""" - -# pyright: reportUnusedFunction=false -import os -import logging -import secrets as pysecrets -from typing import Annotated, Any -from fastapi import APIRouter, Depends, Form, HTTPException, Request, status -from pydantic import BaseModel, BeforeValidator, Field - -from sshecret_admin.auth import LocalUserInfo -from sshecret_admin.services import AdminBackend -from sshecret_admin.services.models import SecretGroupCreate - -from sshecret.backend.models import Operation - -from ..dependencies import FrontendDependencies - -LOG = logging.getLogger(__name__) - - -def split_clients(clients: Any) -> Any: # pyright: ignore[reportAny] - """Split clients.""" - if isinstance(clients, list): - return clients # pyright: ignore[reportUnknownVariableType] - if not isinstance(clients, str): - raise ValueError("Invalid type for clients.") - if not clients: - return [] - return [client.rstrip() for client in clients.split(",")] - - -def handle_select_bool(value: Any) -> Any: # pyright: ignore[reportAny] - """Handle boolean from select.""" - if isinstance(value, bool): - return value - if value == "on": - return True - if value == "off": - return False - - -class CreateSecret(BaseModel): - """Create secret model.""" - - name: str - value: str | None = None - auto_generate: Annotated[bool, BeforeValidator(handle_select_bool)] = False - clients: Annotated[list[str], BeforeValidator(split_clients)] = Field( - default_factory=list - ) - - -def create_router(dependencies: FrontendDependencies) -> APIRouter: - """Create secrets router.""" - - app = APIRouter(dependencies=[Depends(dependencies.require_login)]) - templates = dependencies.templates - - @app.get("/secrets/") - async def get_secrets_tree( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - ): - breadcrumbs = [("secrets", "/secrets/")] - groups = await admin.get_secret_groups() - return templates.TemplateResponse( - request, - "secrets/index.html.j2", - { - "page_title": "Secrets", - "groups": groups, - "breadcrumbs": breadcrumbs, - "user": current_user, - "selected_group": None, - "group_path_nodes": ["/"], - }, - ) - - @app.get("/secrets/group/") - async def show_root_group( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - ): - """Show the root path.""" - clients = await admin.get_clients() - - breadcrumbs = [ - ("secrets", "/secrets/"), - ("groups", "/secrets/groups/"), - ("Ungrouped", "/secrets/groups/"), - ] - context: dict[str, Any] = { - "clients": clients, - "breadcrumbs": breadcrumbs, - "root_group_page": True, - "mobile_show_details": True, - } - headers: dict[str, str] = {} - if request.headers.get("HX-Request"): - # This is a HTMX request. - template_name = "secrets/partials/edit_root.html.j2" - headers["HX-Push-Url"] = request.url.path - else: - groups = await admin.get_secret_groups() - template_name = "secrets/index.html.j2" - context["page_title"] = "Secrets" - context["user"] = current_user - context["groups"] = groups - context["group_path_nodes"] = ["/"] - context["selected_group"] = "/" - - return templates.TemplateResponse( - request, template_name, context, headers=headers - ) - - @app.get("/secrets/group/{group_path:path}") - async def show_group( - request: Request, - group_path: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - ): - """Show a group.""" - group = await admin.get_secret_group_by_path(group_path) - if not group: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Group not found" - ) - clients = await admin.get_clients() - - breadcrumbs = [("secrets", "/secrets/"), ("groups", "/secrets/groups/")] - path_nodes = group.path.split("/") - for x in range(len(path_nodes)): - next_node = x + 1 - group_path = "/".join(path_nodes[:next_node]) - crumb_path = os.path.join("/secrets", group_path) - breadcrumbs.append((path_nodes[x], crumb_path)) - - headers: dict[str, str] = {} - context: dict[str, Any] = { - "group_page": True, - "group": group, - "clients": clients, - "breadcrumbs": breadcrumbs, - "mobile_show_details": True, - } - if request.headers.get("HX-Request"): - # This is a HTMX request. - template_name = "secrets/partials/group_detail.html.j2" - headers["HX-Push-Url"] = request.url.path - else: - template_name = "secrets/index.html.j2" - - groups = await admin.get_secret_groups() - context["page_title"] = "Secrets" - context["user"] = current_user - context["groups"] = groups - context["group_path_nodes"] = group.path.split("/") - context["selected_group"] = group.path - - return templates.TemplateResponse( - request, template_name, context, headers=headers - ) - - @app.get("/secrets/secret/{name}") - async def get_secret_tree_detail( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - ): - """Get secret detail.""" - secret = await admin.get_secret(name) - groups = await admin.get_secret_groups() - flat_groups = await admin.get_secret_groups(flat=True) - events = await admin.get_audit_log_detailed(limit=10, secret_name=name) - - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found" - ) - - context: dict[str, Any] = { - "secret": secret, - "groups": groups, - "flat_groups": flat_groups, - "events": events, - "secret_page": True, - "mobile_show_details": True, - } - - headers: dict[str, str] = {} - - if request.headers.get("HX-Request"): - # This is a HTMX request. - template_name = "secrets/partials/tree_detail.html.j2" - headers["HX-Push-Url"] = request.url.path - else: - group_path = ["/"] - if secret.group: - group = await admin.get_secret_group(secret.group) - if group: - group_path = group.path.split("/") - - template_name = "secrets/index.html.j2" - context["page_title"] = "Secrets" - context["user"] = current_user - context["groups"] = groups - context["group_path_nodes"] = group_path - context["selected_group"] = None - - return templates.TemplateResponse( - request, template_name, context, headers=headers - ) - - @app.delete("/secrets/group/{name}") - async def delete_secret_group( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Delete a secret group.""" - group = await admin.get_secret_group(name) - if not group: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Group not found" - ) - - await admin.delete_secret_group(name) - - new_path = "/secrets/group/" - if group.parent_group: - new_path = os.path.join(new_path, group.parent_group.path) - headers = {"Hx-Redirect": new_path} - - return templates.TemplateResponse( - request, - "secrets/partials/redirect.html.j2", - {"destination": new_path}, - headers=headers, - ) - - @app.post("/secrets/group/") - async def create_group( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - group: Annotated[SecretGroupCreate, Form()], - ): - """Create group.""" - - LOG.info("Creating secret group: %r", group) - await admin.add_secret_group( - group_name=group.name, - description=group.description, - parent_group=group.parent_group, - ) - - headers = {"Hx-Refresh": "true"} - - return templates.TemplateResponse( - request, - "secrets/partials/default_detail.html.j2", - headers=headers, - ) - - @app.put("/secrets/set-group/{name}") - async def set_secret_group( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - group_name: Annotated[str, Form()], - ): - """Move a secret to a group.""" - secret = await admin.get_secret(name) - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found" - ) - - if group_name == "__ROOT": - await admin.set_secret_group(name, None) - - else: - group = await admin.get_secret_group(group_name) - if not group: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Group not found" - ) - await admin.set_secret_group(name, group_name) - - groups = await admin.get_secret_groups() - events = await admin.get_audit_log_detailed(limit=10, secret_name=secret.name) - - secret = await admin.get_secret(name) - - headers = {"Hx-Refresh": "true"} - - return templates.TemplateResponse( - request, - "secrets/partials/tree_detail.html.j2", - { - "secret": secret, - "groups": groups, - "events": events, - }, - headers=headers, - ) - - @app.put("/secrets/partial/group/{name}/description") - async def update_group_description( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - description: Annotated[str, Form()], - ): - """Update group description.""" - group = await admin.get_secret_group(name) - - if not group: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Group not found" - ) - await admin.set_group_description(group_name=name, description=description) - clients = await admin.get_clients() - headers = {"Hx-Refresh": "true"} - return templates.TemplateResponse( - request, - "secrets/partials/group_detail.html.j2", - { - "group": group, - "clients": clients, - }, - headers=headers, - ) - - @app.put("/secrets/partial/secret/{name}/value") - async def update_secret_value_inline( - request: Request, - name: str, - secret_value: Annotated[str, Form()], - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Update secret value.""" - secret = await admin.get_secret(name) - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found" - ) - - origin = "UNKNOWN" - if request.client: - origin = request.client.host - - await admin.write_audit_message( - operation=Operation.UPDATE, - message="Secret was updated via admin interface", - secret_name=name, - origin=origin, - username=current_user.display_name, - ) - - await admin.update_secret(name, secret_value) - - secret = await admin.get_secret(name) - - return templates.TemplateResponse( - request, - "secrets/partials/secret_value.html.j2", - { - "secret": secret, - "updated": True, - }, - ) - - @app.get("/secrets/partial/{name}/viewsecret") - async def view_secret_in_tree( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)], - ): - """View secret inline partial.""" - secret = await admin.get_secret(name) - - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found" - ) - origin = "UNKNOWN" - if request.client: - origin = request.client.host - await admin.write_audit_message( - operation=Operation.READ, - message="Secret viewed", - secret_name=name, - origin=origin, - username=current_user.display_name, - ) - - return templates.TemplateResponse( - request, - "secrets/partials/secret_value.html.j2", - { - "secret": secret, - "updated": False, - }, - ) - - @app.post("/secrets/create/group/{name}") - async def add_secret_in_group( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - secret: Annotated[CreateSecret, Form()], - ): - """Create secret in group.""" - if secret.value: - value = secret.value - else: - value = pysecrets.token_urlsafe(32) - - await admin.add_secret(secret.name, value, secret.clients, group=name) - - new_path = f"/secrets/secret/{secret.name}" - - headers = {"Hx-Redirect": new_path} - - return templates.TemplateResponse( - request, - "secrets/partials/redirect.html.j2", - {"destination": new_path}, - headers=headers, - ) - - @app.post("/secrets/create/root") - async def add_secret_in_root( - request: Request, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - secret: Annotated[CreateSecret, Form()], - ): - """Create secret in the root.""" - LOG.info("secret: %s", secret.model_dump_json(indent=2)) - if secret.value: - value = secret.value - else: - value = pysecrets.token_urlsafe(32) - - await admin.add_secret(secret.name, value, secret.clients, group=None) - - new_path = f"/secrets/secret/{secret.name}" - - headers = {"Hx-Redirect": new_path} - - return templates.TemplateResponse( - request, - "secrets/partials/redirect.html.j2", - { - "destination": new_path, - }, - headers=headers, - ) - - @app.delete("/secrets/{name}/clients/{client_name}") - async def remove_client_secret_access( - request: Request, - name: str, - client_name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Remove a client's access to a secret.""" - client = await admin.get_client(client_name) - if not client: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Client not found." - ) - - await admin.delete_client_secret(str(client.id), name) - clients = await admin.get_clients() - - secret = await admin.get_secret(name) - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found." - ) - - return templates.TemplateResponse( - request, - "secrets/partials/client_list_inner.html.j2", - {"clients": clients, "secret": secret}, - ) - - @app.get("/secrets/{name}/clients/") - async def show_secret_client_add( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Show partial to add new client to a secret.""" - clients = await admin.get_clients() - secret = await admin.get_secret(name) - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found." - ) - - return templates.TemplateResponse( - request, - "secrets/partials/client_assign.html.j2", - { - "clients": clients, - "secret": secret, - }, - ) - - @app.post("/secrets/{name}/clients/") - async def add_secret_to_client( - request: Request, - name: str, - client: Annotated[str, Form()], - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Add a secret to a client.""" - await admin.create_client_secret(("id", client), name) - secret = await admin.get_secret(name) - if not secret: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found." - ) - clients = await admin.get_clients() - - return templates.TemplateResponse( - request, - "secrets/partials/client_secret_details.html.j2", - { - "secret": secret, - "clients": clients, - }, - ) - - @app.delete("/secrets/{name}") - async def delete_secret( - request: Request, - name: str, - admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], - ): - """Delete a secret.""" - secret = await admin.get_secret(name) - if not secret: - raise HTTPException(status_code=404, detail="Secret not found") - new_path = "/secrets/group/" - if secret.group: - secret_group = await admin.get_secret_group(secret.group) - if secret_group: - new_path = os.path.join("/secrets/group", secret_group.path) - - await admin.delete_secret(name) - headers = {"Hx-Redirect": new_path} - # headers["HX-Push-Url"] = request.url.path - - return templates.TemplateResponse( - request, - "secrets/partials/redirect.html.j2", - {"destination": new_path}, - headers=headers, - ) - - return app