From 0a427b6a91f1c5ead00ec4d3085ac6d038cf03bc Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Sat, 10 May 2025 08:28:15 +0200 Subject: [PATCH] Complete admin package restructuring --- packages/sshecret-admin/pyproject.toml | 7 +- .../src/sshecret_admin/admin_api.py | 284 ------------------ .../src/sshecret_admin/api/__init__.py | 5 + .../sshecret_admin/api/endpoints/__init__.py | 1 + .../src/sshecret_admin/api/endpoints/auth.py | 39 +++ .../sshecret_admin/api/endpoints/clients.py | 124 ++++++++ .../sshecret_admin/api/endpoints/secrets.py | 70 +++++ .../src/sshecret_admin/api/router.py | 78 +++++ .../src/sshecret_admin/auth/__init__.py | 24 ++ .../src/sshecret_admin/auth/authentication.py | 95 ++++++ .../src/sshecret_admin/auth/exceptions.py | 30 ++ .../src/sshecret_admin/auth/models.py | 71 +++++ .../src/sshecret_admin/auth_models.py | 125 -------- .../widgets/clients.html => core/__init__.py} | 0 .../src/sshecret_admin/{ => core}/app.py | 50 ++- .../src/sshecret_admin/{ => core}/cli.py | 27 +- .../src/sshecret_admin/{ => core}/db.py | 0 .../src/sshecret_admin/core/dependencies.py | 37 +++ .../src/sshecret_admin/{ => core}/main.py | 1 - .../src/sshecret_admin/{ => core}/settings.py | 12 +- .../src/sshecret_admin/frontend.py | 240 --------------- .../src/sshecret_admin/frontend/__init__.py | 5 + .../src/sshecret_admin/frontend/auth.py | 7 + .../sshecret_admin/frontend/dependencies.py | 48 +++ .../src/sshecret_admin/frontend/exceptions.py | 13 + .../src/sshecret_admin/frontend/router.py | 133 ++++++++ .../templates/audit/entry.html.j2 | 0 .../templates/audit/index.html.j2 | 0 .../templates/audit/inner.html.j2 | 0 .../templates/audit/inner_save.html.j2 | 0 .../templates/audit/pagination.html.j2 | 0 .../templates/clients/client.html.j2 | 0 .../clients/drawer_client_create.html.j2 | 0 .../clients/drawer_client_delete.html.j2 | 0 .../clients/drawer_client_update.html.j2 | 0 .../templates/clients/dynamic.html.j2 | 0 .../templates/clients/field_invalid.html.j2 | 0 .../templates/clients/field_valid.html.j2 | 0 .../templates/clients/index.html.j2 | 0 .../templates/clients/inner.html.j2 | 0 .../frontend/templates/dashboard.html | 89 ++++++ .../templates/dashboard/_base.html | 0 .../templates/dashboard/_favicons.html | 0 .../templates/dashboard/_header.html | 0 .../templates/dashboard/_scripts.html | 0 .../templates/dashboard/_stylesheet.html | 0 .../templates/dashboard/index.html.j2 | 0 .../templates/dashboard/navbar.html | 0 .../templates/dashboard/sidebar.html | 0 .../templates/dashboard_old.html | 0 .../templates/fragments/error.html | 0 .../templates/fragments/ok.html | 0 .../{ => frontend}/templates/login.html | 0 .../templates/secrets/client_options.html.j2 | 0 .../secrets/drawer_secret_create.html.j2 | 0 .../templates/secrets/index.html.j2 | 0 .../templates/secrets/inner.html.j2 | 0 .../secrets/modal_client_secret.html.j2 | 0 .../templates/secrets/secret.html.j2 | 0 .../templates/shared/_base.html | 0 .../templates/shared/_dashboard.html | 0 .../templates/shared/_dashboard_save.html | 0 .../{ => frontend}/templates/success.html | 0 .../frontend/templates/widgets/clients.html | 0 .../sshecret_admin/frontend/views/__init__.py | 1 + .../{ => frontend}/views/audit.py | 58 ++-- .../src/sshecret_admin/frontend/views/auth.py | 143 +++++++++ .../{ => frontend}/views/clients.py | 72 +++-- .../sshecret_admin/frontend/views/index.py | 70 +++++ .../{ => frontend}/views/secrets.py | 61 ++-- .../src/sshecret_admin/services/__init__.py | 8 + .../{ => services}/admin_backend.py | 42 ++- .../sshecret_admin/{ => services}/keepass.py | 2 +- .../{ => services}/master_password.py | 2 +- .../{view_models.py => services/models.py} | 3 +- .../sshecret_admin/templates/dashboard.html | 10 - .../src/sshecret_admin/testing.py | 3 +- .../src/sshecret_admin/types.py | 6 - .../src/sshecret_admin/views/__init__.py | 5 - packages/sshecret-admin/static/index.html | 24 -- 80 files changed, 1282 insertions(+), 843 deletions(-) delete mode 100644 packages/sshecret-admin/src/sshecret_admin/admin_api.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/api/__init__.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/api/router.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/auth/__init__.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/auth/authentication.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/auth/models.py delete mode 100644 packages/sshecret-admin/src/sshecret_admin/auth_models.py rename packages/sshecret-admin/src/sshecret_admin/{templates/widgets/clients.html => core/__init__.py} (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => core}/app.py (68%) rename packages/sshecret-admin/src/sshecret_admin/{ => core}/cli.py (85%) rename packages/sshecret-admin/src/sshecret_admin/{ => core}/db.py (100%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/core/dependencies.py rename packages/sshecret-admin/src/sshecret_admin/{ => core}/main.py (97%) rename packages/sshecret-admin/src/sshecret_admin/{ => core}/settings.py (64%) delete mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/auth.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/router.py rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/audit/entry.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/audit/index.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/audit/inner.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/audit/inner_save.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/audit/pagination.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/client.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/drawer_client_create.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/drawer_client_delete.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/drawer_client_update.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/dynamic.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/field_invalid.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/field_valid.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/index.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/clients/inner.html.j2 (100%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/_base.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/_favicons.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/_header.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/_scripts.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/_stylesheet.html (100%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/index.html.j2 rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/navbar.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard/sidebar.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/dashboard_old.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/fragments/error.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/fragments/ok.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/login.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/secrets/client_options.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/secrets/drawer_secret_create.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/secrets/index.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/secrets/inner.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/secrets/modal_client_secret.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/secrets/secret.html.j2 (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/shared/_base.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/shared/_dashboard.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/shared/_dashboard_save.html (100%) rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/templates/success.html (100%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/templates/widgets/clients.html create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/views/audit.py (68%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/views/clients.py (78%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py rename packages/sshecret-admin/src/sshecret_admin/{ => frontend}/views/secrets.py (72%) create mode 100644 packages/sshecret-admin/src/sshecret_admin/services/__init__.py rename packages/sshecret-admin/src/sshecret_admin/{ => services}/admin_backend.py (92%) rename packages/sshecret-admin/src/sshecret_admin/{ => services}/keepass.py (98%) rename packages/sshecret-admin/src/sshecret_admin/{ => services}/master_password.py (97%) rename packages/sshecret-admin/src/sshecret_admin/{view_models.py => services/models.py} (97%) delete mode 100644 packages/sshecret-admin/src/sshecret_admin/templates/dashboard.html delete mode 100644 packages/sshecret-admin/src/sshecret_admin/views/__init__.py delete mode 100644 packages/sshecret-admin/static/index.html diff --git a/packages/sshecret-admin/pyproject.toml b/packages/sshecret-admin/pyproject.toml index 73c1c3c..17abda3 100644 --- a/packages/sshecret-admin/pyproject.toml +++ b/packages/sshecret-admin/pyproject.toml @@ -19,10 +19,14 @@ dependencies = [ "pyjwt>=2.10.1", "pykeepass>=4.1.1.post1", "sqlmodel>=0.0.24", + "sshecret", ] +[tool.uv.sources] +sshecret = { workspace = true } + [project.scripts] -sshecret-admin = "sshecret_admin.cli:cli" +sshecret-admin = "sshecret_admin.core.cli:cli" [build-system] requires = ["hatchling"] @@ -31,4 +35,5 @@ build-backend = "hatchling.build" [dependency-groups] dev = [ "pytailwindcss>=0.2.0", + "types-pyjwt>=1.7.1", ] diff --git a/packages/sshecret-admin/src/sshecret_admin/admin_api.py b/packages/sshecret-admin/src/sshecret_admin/admin_api.py deleted file mode 100644 index 1cf9d9a..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/admin_api.py +++ /dev/null @@ -1,284 +0,0 @@ -"""Admin API.""" - -# pyright: reportUnusedFunction=false - -import logging -from collections import defaultdict -from datetime import timedelta -from typing import Annotated - -import jwt -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm -from sqlmodel import Session, select -from sshecret.backend import Client, SshecretBackend -from sshecret.backend.models import Secret - -from .admin_backend import AdminBackend -from .auth_models import ( - PasswordDB, - Token, - TokenData, - User, - create_access_token, - verify_password, -) -from .settings import AdminServerSettings -from .types import DBSessionDep -from .view_models import ( - ClientCreate, - SecretCreate, - SecretUpdate, - SecretView, - UpdateKeyModel, - UpdateKeyResponse, - UpdatePoliciesRequest, -) - -LOG = logging.getLogger(__name__) - - -API_VERSION = "v1" -JWT_ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 - - -def authenticate_user(session: Session, username: str, password: str) -> User | None: - """Authenticate user.""" - user = session.exec(select(User).where(User.username == username)).first() - if not user: - return None - if not verify_password(password, user.hashed_password): - return None - return user - - -async def map_secrets_to_clients( - backend: SshecretBackend, -) -> defaultdict[str, list[str]]: - """Map secrets to clients.""" - clients = await backend.get_clients() - client_secret_map: defaultdict[str, list[str]] = defaultdict(list) - for client in clients: - for secret in client.secrets: - client_secret_map[secret].append(client.name) - return client_secret_map - - -def get_admin_api( - get_db_session: DBSessionDep, settings: AdminServerSettings -) -> APIRouter: - """Get Admin API.""" - oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") - - async def get_admin_backend(session: Annotated[Session, Depends(get_db_session)]): - """Get admin backend API.""" - password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() - if not password_db: - raise HTTPException( - 500, detail="Error: The password manager has not yet been set up." - ) - admin = AdminBackend(settings, password_db.encrypted_password) - yield admin - - async def get_current_user( - token: Annotated[str, Depends(oauth2_scheme)], - session: Annotated[Session, Depends(get_db_session)], - ) -> User: - """Get current user from token.""" - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - try: - payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - if not username: - raise credentials_exception - token_data = TokenData(username=username) - except jwt.InvalidTokenError: - raise credentials_exception - - user = session.exec( - select(User).where(User.username == token_data.username) - ).first() - if not user: - raise credentials_exception - return user - - async def get_current_active_user( - current_user: Annotated[User, Depends(get_current_user)], - ) -> User: - """Get current active user.""" - if current_user.disabled: - raise HTTPException(status_code=400, detail="Inactive or disabled user") - return current_user - - app = APIRouter( - prefix=f"/api/{API_VERSION}", dependencies=[Depends(get_current_active_user)] - ) - - @app.post("/token") - async def login_for_access_token( - session: Annotated[Session, Depends(get_db_session)], - form_data: Annotated[OAuth2PasswordRequestForm, Depends()], - ) -> Token: - """Login user and generate token.""" - user = authenticate_user(session, form_data.username, form_data.password) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) - access_token = create_access_token( - settings, - data={"sub": user.username}, - expires_delta=access_token_expires, - ) - return Token(access_token=access_token, token_type="bearer") - - @app.get("/clients/") - async def get_clients( - admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> list[Client]: - """Get clients.""" - clients = await admin.get_clients() - return clients - - @app.post("/clients/") - async def create_client( - new_client: ClientCreate, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> Client: - """Create a new client.""" - sources: list[str] | None = None - if new_client.sources: - sources = [str(source) for source in new_client.sources] - client = await admin.create_client( - new_client.name, new_client.public_key, sources - ) - return client - - @app.delete("/clients/{name}") - async def delete_client( - name: str, admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> None: - """Delete a client.""" - await admin.delete_client(name) - - @app.delete("/clients/{name}/secrets/{secret_name}") - async def delete_secret_from_client( - name: str, - secret_name: str, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - """Delete a secret from a client.""" - client = await admin.get_client(name) - if not client: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" - ) - - if secret_name not in client.secrets: - LOG.debug("Client does not have requested secret. No action to perform.") - return None - - await admin.delete_client_secret(name, secret_name) - - @app.put("/clients/{name}/policies") - async def update_client_policies( - name: str, - updated: UpdatePoliciesRequest, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> Client: - """Update the client access policies.""" - client = await admin.get_client(name) - if not client: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" - ) - - LOG.debug("Old policies: %r. New: %r", client.policies, updated.sources) - - addresses: list[str] = [str(source) for source in updated.sources] - await admin.update_client_sources(name, addresses) - client = await admin.get_client(name) - - assert client is not None, "Critical: The client disappeared after update!" - - return client - - @app.get("/secrets/") - async def get_secret_names( - admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> list[Secret]: - """Get Secret Names.""" - return await admin.get_secrets() - - @app.post("/secrets/") - async def add_secret( - secret: SecretCreate, admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> None: - """Create a secret.""" - await admin.add_secret(secret.name, secret.get_secret(), secret.clients) - - @app.get("/secrets/{name}") - async def get_secret( - name: str, admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> SecretView: - """Get a secret.""" - secret_view = await admin.get_secret(name) - - if not secret_view: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." - ) - return secret_view - - @app.put("/secrets/{name}") - async def update_secret( - name: str, - value: SecretUpdate, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - new_value = value.get_secret() - await admin.update_secret(name, new_value) - - @app.delete("/secrets/{name}") - async def delete_secret( - name: str, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - """Delete secret.""" - await admin.delete_secret(name) - - @app.put("/clients/{name}/public-key") - async def update_client_public_key( - name: str, - updated: UpdateKeyModel, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> UpdateKeyResponse: - """Update client public key. - - Updating the public key will invalidate the current secrets, so these well - be resolved first, and re-encrypted using the new key. - """ - # Let's first ensure that the key is actually updated. - updated_secrets = await admin.update_client_public_key(name, updated.public_key) - return UpdateKeyResponse( - public_key=updated.public_key, updated_secrets=updated_secrets - ) - - @app.put("/clients/{name}/secrets/{secret_name}") - async def add_secret_to_client( - name: str, - secret_name: str, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - """Add secret to a client.""" - await admin.create_client_secret(name, secret_name) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/__init__.py b/packages/sshecret-admin/src/sshecret_admin/api/__init__.py new file mode 100644 index 0000000..d4840e7 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/__init__.py @@ -0,0 +1,5 @@ +"""Admin REST API.""" + +from .router import create_router as create_api_router + +__all__ = ["create_api_router"] diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py new file mode 100644 index 0000000..13d4058 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py @@ -0,0 +1 @@ +"""API Endpoints.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py new file mode 100644 index 0000000..0e73eae --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py @@ -0,0 +1,39 @@ +"""Authentication related endpoints factory.""" + +# pyright: reportUnusedFunction=false +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordRequestForm +from sqlmodel import Session + +from sshecret_admin.auth import Token, authenticate_user, create_access_token +from sshecret_admin.core.dependencies import AdminDependencies + +LOG = logging.getLogger(__name__) + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create auth router.""" + app = APIRouter() + + @app.post("/token") + async def login_for_access_token( + session: Annotated[Session, Depends(dependencies.get_db_session)], + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + ) -> Token: + """Login user and generate token.""" + user = authenticate_user(session, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token = create_access_token( + dependencies.settings, + data={"sub": user.username}, + ) + return Token(access_token=access_token, token_type="bearer") + + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py new file mode 100644 index 0000000..c8384f4 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py @@ -0,0 +1,124 @@ +"""Client-related endpoints factory.""" + +# pyright: reportUnusedFunction=false + +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status + +from sshecret.backend import Client +from sshecret_admin.core.dependencies import AdminDependencies +from sshecret_admin.services import AdminBackend +from sshecret_admin.services.models import ( + ClientCreate, + UpdateKeyModel, + UpdateKeyResponse, + UpdatePoliciesRequest, +) + +LOG = logging.getLogger(__name__) + + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create clients router.""" + app = APIRouter() + + @app.get("/clients/") + async def get_clients( + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)] + ) -> list[Client]: + """Get clients.""" + clients = await admin.get_clients() + return clients + + @app.post("/clients/") + async def create_client( + new_client: ClientCreate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Client: + """Create a new client.""" + sources: list[str] | None = None + if new_client.sources: + sources = [str(source) for source in new_client.sources] + client = await admin.create_client( + new_client.name, new_client.public_key, sources=sources + ) + return client + + @app.delete("/clients/{name}") + async def delete_client( + name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Delete a client.""" + await admin.delete_client(name) + + @app.delete("/clients/{name}/secrets/{secret_name}") + async def delete_secret_from_client( + name: str, + secret_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Delete a secret from a client.""" + client = await admin.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + + if secret_name not in client.secrets: + LOG.debug("Client does not have requested secret. No action to perform.") + return None + + await admin.delete_client_secret(name, secret_name) + + @app.put("/clients/{name}/policies") + async def update_client_policies( + name: str, + updated: UpdatePoliciesRequest, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Client: + """Update the client access policies.""" + client = await admin.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + + LOG.debug("Old policies: %r. New: %r", client.policies, updated.sources) + + addresses: list[str] = [str(source) for source in updated.sources] + await admin.update_client_sources(name, addresses) + client = await admin.get_client(name) + + assert client is not None, "Critical: The client disappeared after update!" + + return client + + @app.put("/clients/{name}/public-key") + async def update_client_public_key( + name: str, + updated: UpdateKeyModel, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> UpdateKeyResponse: + """Update client public key. + + Updating the public key will invalidate the current secrets, so these well + be resolved first, and re-encrypted using the new key. + """ + # Let's first ensure that the key is actually updated. + updated_secrets = await admin.update_client_public_key(name, updated.public_key) + return UpdateKeyResponse( + public_key=updated.public_key, updated_secrets=updated_secrets + ) + + @app.put("/clients/{name}/secrets/{secret_name}") + async def add_secret_to_client( + name: str, + secret_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Add secret to a client.""" + await admin.create_client_secret(name, secret_name) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py new file mode 100644 index 0000000..01ee01d --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py @@ -0,0 +1,70 @@ +"""Secrets related endpoints factory.""" + +# pyright: reportUnusedFunction=false +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status + +from sshecret.backend.models import Secret +from sshecret_admin.core.dependencies import AdminDependencies +from sshecret_admin.services import AdminBackend +from sshecret_admin.services.models import ( + SecretCreate, + SecretUpdate, + SecretView, +) + +LOG = logging.getLogger(__name__) + + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create secrets router.""" + app = APIRouter() + + @app.get("/secrets/") + async def get_secret_names( + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)] + ) -> list[Secret]: + """Get Secret Names.""" + return await admin.get_secrets() + + @app.post("/secrets/") + async def add_secret( + secret: SecretCreate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Create a secret.""" + await admin.add_secret(secret.name, secret.get_secret(), secret.clients) + + @app.get("/secrets/{name}") + async def get_secret( + name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> SecretView: + """Get a secret.""" + secret_view = await admin.get_secret(name) + + if not secret_view: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." + ) + return secret_view + + @app.put("/secrets/{name}") + async def update_secret( + name: str, + value: SecretUpdate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + new_value = value.get_secret() + await admin.update_secret(name, new_value) + + @app.delete("/secrets/{name}") + async def delete_secret( + name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Delete secret.""" + await admin.delete_secret(name) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/router.py b/packages/sshecret-admin/src/sshecret_admin/api/router.py new file mode 100644 index 0000000..ae939fb --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/router.py @@ -0,0 +1,78 @@ +"""Main API Router.""" + +# pyright: reportUnusedFunction=false + +import logging +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer + +from sqlmodel import Session, select + +from sshecret_admin.services.admin_backend import AdminBackend +from sshecret_admin.core.dependencies import BaseDependencies, AdminDependencies +from sshecret_admin.auth import PasswordDB, User, decode_token + +from .endpoints import auth, clients, secrets + +LOG = logging.getLogger(__name__) + +API_VERSION = "v1" + + +def create_router(dependencies: BaseDependencies) -> APIRouter: + """Create clients router.""" + + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + async def get_current_user( + token: Annotated[str, Depends(oauth2_scheme)], + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> User: + """Get current user from token.""" + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + token_data = decode_token(dependencies.settings, token) + if not token_data: + raise credentials_exception + + user = session.exec( + select(User).where(User.username == token_data.username) + ).first() + if not user: + raise credentials_exception + return user + + async def get_current_active_user( + current_user: Annotated[User, Depends(get_current_user)], + ) -> User: + """Get current active user.""" + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive or disabled user") + return current_user + + async def get_admin_backend(session: Annotated[Session, Depends(dependencies.get_db_session)]): + """Get admin backend API.""" + password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() + if not password_db: + raise HTTPException( + 500, detail="Error: The password manager has not yet been set up." + ) + admin = AdminBackend(dependencies.settings, password_db.encrypted_password) + yield admin + + app = APIRouter( + prefix=f"/api/{API_VERSION}", dependencies=[Depends(get_current_active_user)] + ) + + endpoint_deps = AdminDependencies.create(dependencies, get_admin_backend) + + app.include_router(auth.create_router(endpoint_deps)) + app.include_router(clients.create_router(endpoint_deps)) + app.include_router(secrets.create_router(endpoint_deps)) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/__init__.py b/packages/sshecret-admin/src/sshecret_admin/auth/__init__.py new file mode 100644 index 0000000..e83c7f5 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/__init__.py @@ -0,0 +1,24 @@ +"""Authentication related module.""" + +from .authentication import ( + authenticate_user, + create_access_token, + create_refresh_token, + check_password, + decode_token, + verify_password, +) +from .models import User, Token, PasswordDB + + +__all__ = [ + "PasswordDB", + "Token", + "User", + "authenticate_user", + "check_password", + "create_access_token", + "create_refresh_token", + "decode_token", + "verify_password", +] diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py b/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py new file mode 100644 index 0000000..f85eba3 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py @@ -0,0 +1,95 @@ +"""Authentication utilities.""" + +import logging +from datetime import datetime, timezone, timedelta +from typing import cast, Any + +import bcrypt +import jwt +from sqlmodel import Session, select + +from sshecret_admin.core.settings import AdminServerSettings +from .models import User, TokenData +from .exceptions import AuthenticationFailedError + +JWT_ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 +# I know refresh tokens are supposed to be long-lived, but 6 hours for a +# sensitive application, seems reasonable. +REFRESH_TOKEN_EXPIRE_HOURS = 6 + +LOG = logging.getLogger(__name__) + + +def create_token( + settings: AdminServerSettings, + data: dict[str, Any], + expires_delta: timedelta, +) -> str: + """Create access token.""" + to_encode = data.copy() + expire = datetime.now(timezone.utc) + expires_delta + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=JWT_ALGORITHM) + return str(encoded_jwt) + + +def create_access_token( + settings: AdminServerSettings, + data: dict[str, Any], + expires_delta: timedelta | None = None, +) -> str: + """Create access token.""" + if not expires_delta: + expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + return create_token(settings, data, expires_delta) + + +def create_refresh_token( + settings: AdminServerSettings, + data: dict[str, Any], + expires_delta: timedelta | None = None, +) -> str: + """Create access token.""" + if not expires_delta: + expires_delta = timedelta(hours=REFRESH_TOKEN_EXPIRE_HOURS) + return create_token(settings, data, expires_delta) + + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """Verify password against stored hash.""" + return bcrypt.checkpw(plain_password.encode(), hashed_password.encode()) + + +def check_password(plain_password: str, hashed_password: str) -> None: + """Check password. + + If password doesn't match, throw AuthenticationFailedError. + """ + if not verify_password(plain_password, hashed_password): + raise AuthenticationFailedError() + + +def authenticate_user(session: Session, username: str, password: str) -> User | None: + """Authenticate user.""" + user = session.exec(select(User).where(User.username == username)).first() + if not user: + return None + if not verify_password(password, user.hashed_password): + return None + return user + + +def decode_token(settings: AdminServerSettings, token: str) -> TokenData | None: + """Decode token.""" + try: + payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) + username = cast("str | None", payload.get("sub")) + if not username: + return None + + token_data = TokenData(username=username) + return token_data + except jwt.InvalidTokenError as e: + LOG.debug("Could not decode token: %s", e, exc_info=True) + return None diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py b/packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py new file mode 100644 index 0000000..0acb34d --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py @@ -0,0 +1,30 @@ +"""Authentication related exceptions.""" +from typing import override + +from .models import LoginError + + +class AuthenticationFailedError(Exception): + """Authentication failed.""" + + @override + def __init__(self, message: str | None = None) -> None: + """Initialize exception class.""" + if not message: + message = "Invalid user or password." + super().__init__(message) + self.login_error: LoginError = LoginError( + title="Authentication Failed", message=message + ) + + +class AuthenticationNeededError(Exception): + """Authentication needed error.""" + + @override + def __init__(self, message: str | None = None) -> None: + """Initialize exception class.""" + if not message: + message = "You need to be logged in to continue." + super().__init__(message) + self.login_error: LoginError = LoginError(title="Unauthorized", message=message) diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/models.py b/packages/sshecret-admin/src/sshecret_admin/auth/models.py new file mode 100644 index 0000000..7a25d84 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/models.py @@ -0,0 +1,71 @@ +"""Models for authentication.""" + +from datetime import datetime +import sqlalchemy as sa +from sqlmodel import SQLModel, Field + + +JWT_ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 +# I know refresh tokens are supposed to be long-lived, but 6 hours for a +# sensitive application, seems reasonable. +REFRESH_TOKEN_EXPIRE_HOURS = 6 + + +class User(SQLModel, table=True): + """Users.""" + + username: str = Field(unique=True, primary_key=True) + hashed_password: str + disabled: bool = Field(default=False) + created_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"server_default": sa.func.now()}, + nullable=False, + ) + + +class PasswordDB(SQLModel, table=True): + """Password database.""" + + id: int | None = Field(default=None, primary_key=True) + encrypted_password: str + + created_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"server_default": sa.func.now()}, + nullable=False, + ) + + updated_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"onupdate": sa.func.now(), "server_default": sa.func.now()}, + ) + + +def init_db(engine: sa.Engine) -> None: + """Create database.""" + SQLModel.metadata.create_all(engine) + + +class TokenData(SQLModel): + """Token data.""" + + username: str | None = None + + +class Token(SQLModel): + access_token: str + token_type: str + + +class LoginError(SQLModel): + """Login Error model.""" + # TODO: Remove this. + + title: str + message: str + diff --git a/packages/sshecret-admin/src/sshecret_admin/auth_models.py b/packages/sshecret-admin/src/sshecret_admin/auth_models.py deleted file mode 100644 index 87f861b..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/auth_models.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Models for authentication.""" - -from datetime import datetime, timedelta, timezone -import bcrypt -import sqlalchemy as sa -from typing import Any, override -import jwt -from sqlmodel import SQLModel, Field -from sshecret_admin.settings import AdminServerSettings - - -JWT_ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 - - -class User(SQLModel, table=True): - """Users.""" - - username: str = Field(unique=True, primary_key=True) - hashed_password: str - disabled: bool = Field(default=False) - created_at: datetime | None = Field( - default=None, - sa_type=sa.DateTime(timezone=True), - sa_column_kwargs={"server_default": sa.func.now()}, - nullable=False, - ) - - -class PasswordDB(SQLModel, table=True): - """Password database.""" - - id: int | None = Field(default=None, primary_key=True) - encrypted_password: str - - created_at: datetime | None = Field( - default=None, - sa_type=sa.DateTime(timezone=True), - sa_column_kwargs={"server_default": sa.func.now()}, - nullable=False, - ) - - updated_at: datetime | None = Field( - default=None, - sa_type=sa.DateTime(timezone=True), - sa_column_kwargs={"onupdate": sa.func.now(), "server_default": sa.func.now()}, - ) - - -def init_db(engine: sa.Engine) -> None: - """Create database.""" - SQLModel.metadata.create_all(engine) - - -class TokenData(SQLModel): - """Token data.""" - - username: str | None = None - - -class Token(SQLModel): - access_token: str - token_type: str - - -def create_access_token( - settings: AdminServerSettings, - data: dict[str, Any], - expires_delta: timedelta | None = None, -) -> str: - """Create access token.""" - to_encode = data.copy() - expire = datetime.now(timezone.utc) + timedelta(minutes=15) - if expires_delta: - expire = datetime.now(timezone.utc) + expires_delta - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=JWT_ALGORITHM) - return encoded_jwt - - -def verify_password(plain_password: str, hashed_password: str) -> bool: - """Verify password against stored hash.""" - return bcrypt.checkpw(plain_password.encode(), hashed_password.encode()) - - -def check_password(plain_password: str, hashed_password: str) -> None: - """Check password. - - If password doesn't match, throw AuthenticationFailedError. - """ - if not verify_password(plain_password, hashed_password): - raise AuthenticationFailedError() - - -class LoginError(SQLModel): - """Login Error model.""" - - title: str - message: str - - -class AuthenticationFailedError(Exception): - """Authentication failed.""" - - @override - def __init__(self, message: str | None = None) -> None: - """Initialize exception class.""" - if not message: - message = "Invalid user or password." - super().__init__(message) - self.login_error: LoginError = LoginError( - title="Authentication Failed", message=message - ) - - -class AuthenticationNeededError(Exception): - """Authentication needed error.""" - - @override - def __init__(self, message: str | None = None) -> None: - """Initialize exception class.""" - if not message: - message = "You need to be logged in to continue." - super().__init__(message) - self.login_error: LoginError = LoginError(title="Unauthorized", message=message) diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/widgets/clients.html b/packages/sshecret-admin/src/sshecret_admin/core/__init__.py similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/widgets/clients.html rename to packages/sshecret-admin/src/sshecret_admin/core/__init__.py diff --git a/packages/sshecret-admin/src/sshecret_admin/app.py b/packages/sshecret-admin/src/sshecret_admin/core/app.py similarity index 68% rename from packages/sshecret-admin/src/sshecret_admin/app.py rename to packages/sshecret-admin/src/sshecret_admin/core/app.py index 79cbfc3..06cc166 100644 --- a/packages/sshecret-admin/src/sshecret_admin/app.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/app.py @@ -5,24 +5,22 @@ import logging import os from contextlib import asynccontextmanager - from pathlib import Path -from fastapi import FastAPI, Request, status +from fastapi import FastAPI, Request, Response, status from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles - from sqlmodel import Session, select +from sshecret_admin import api, frontend +from sshecret_admin.auth.models import PasswordDB, init_db +from sshecret_admin.core.db import setup_database +from sshecret_admin.frontend.exceptions import RedirectException +from sshecret_admin.services.master_password import setup_master_password -from .admin_api import get_admin_api -from .auth_models import init_db, PasswordDB, AuthenticationFailedError, AuthenticationNeededError -from .db import setup_database -from .master_password import setup_master_password +from .dependencies import BaseDependencies from .settings import AdminServerSettings -from .frontend import create_frontend -from .types import DBSessionDep LOG = logging.getLogger(__name__) @@ -30,15 +28,14 @@ LOG = logging.getLogger(__name__) def setup_frontend( - app: FastAPI, settings: AdminServerSettings, get_db_session: DBSessionDep + app: FastAPI, dependencies: BaseDependencies ) -> None: """Setup frontend.""" script_path = Path(os.path.dirname(os.path.realpath(__file__))) - static_path = script_path / "static" + static_path = script_path.parent / "static" app.mount("/static", StaticFiles(directory=static_path), name="static") - frontend = create_frontend(settings, get_db_session) - app.include_router(frontend) + app.include_router(frontend.create_frontend_router(dependencies)) def create_admin_app( @@ -88,19 +85,15 @@ def create_admin_app( content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}), ) - @app.exception_handler(AuthenticationNeededError) - async def authentication_needed_handler( - request: Request, exc: AuthenticationNeededError, - ): - qs = f"error_title={exc.login_error.title}&error_message={exc.login_error.message}" - return RedirectResponse(f"/?{qs}") + @app.exception_handler(RedirectException) + async def redirect_handler(request: Request, exc: RedirectException) -> Response: + """Handle redirect exceptions.""" + if "hx-request" in request.headers: + response = Response() + response.headers["HX-Redirect"] = str(exc.to) + return response + return RedirectResponse(url=str(exc.to)) - @app.exception_handler(AuthenticationFailedError) - async def authentication_failed_handler( - request: Request, exc: AuthenticationNeededError, - ): - qs = f"error_title={exc.login_error.title}&error_message={exc.login_error.message}" - return RedirectResponse(f"/?{qs}") @app.get("/health") async def get_health() -> JSONResponse: @@ -109,10 +102,11 @@ def create_admin_app( status_code=status.HTTP_200_OK, content=jsonable_encoder({"status": "LIVE"}) ) - admin_api = get_admin_api(get_db_session, settings) + dependencies = BaseDependencies(settings, get_db_session) - app.include_router(admin_api) + + app.include_router(api.create_api_router(dependencies)) if with_frontend: - setup_frontend(app, settings, get_db_session) + setup_frontend(app, dependencies) return app diff --git a/packages/sshecret-admin/src/sshecret_admin/cli.py b/packages/sshecret-admin/src/sshecret_admin/core/cli.py similarity index 85% rename from packages/sshecret-admin/src/sshecret_admin/cli.py rename to packages/sshecret-admin/src/sshecret_admin/core/cli.py index b45bcc4..bbad59f 100644 --- a/packages/sshecret-admin/src/sshecret_admin/cli.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/cli.py @@ -7,29 +7,30 @@ import logging from typing import Any, cast import bcrypt import click -from sshecret_admin.admin_backend import AdminBackend +from sshecret_admin.services.admin_backend import AdminBackend import uvicorn from pydantic import ValidationError from sqlmodel import Session, create_engine, select -from .auth_models import init_db, User, PasswordDB -from .settings import AdminServerSettings +from sshecret_admin.auth.models import init_db, User, PasswordDB +from sshecret_admin.core.settings import AdminServerSettings handler = logging.StreamHandler() -formatter = logging.Formatter("%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s") +formatter = logging.Formatter( + "%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s" +) handler.setFormatter(formatter) LOG = logging.getLogger() LOG.addHandler(handler) LOG.setLevel(logging.INFO) - - def hash_password(password: str) -> str: """Hash password.""" salt = bcrypt.gensalt() hashed_password = bcrypt.hashpw(password.encode(), salt) return hashed_password.decode() + def create_user(session: Session, username: str, password: str) -> None: """Create a user.""" hashed_password = hash_password(password) @@ -48,7 +49,9 @@ def cli(ctx: click.Context, debug: bool) -> None: try: settings = AdminServerSettings() # pyright: ignore[reportCallIssue] except ValidationError as e: - raise click.ClickException("Error: One or more required environment options are missing.") from e + raise click.ClickException( + "Error: One or more required environment options are missing." + ) from e ctx.obj = settings @@ -66,6 +69,7 @@ def cli_create_user(ctx: click.Context, username: str, password: str) -> None: click.echo("User created.") + @cli.command("passwd") @click.argument("username") @click.password_option() @@ -85,6 +89,7 @@ def cli_change_user_passwd(ctx: click.Context, username: str, password: str) -> session.commit() click.echo("Password updated.") + @cli.command("deluser") @click.argument("username") @click.confirmation_option() @@ -112,7 +117,9 @@ def cli_delete_user(ctx: click.Context, username: str) -> None: @click.option("--workers", type=click.INT) def cli_run(host: str, port: int, dev: bool, workers: int | None) -> None: """Run the server.""" - uvicorn.run("sshecret_admin.main:app", host=host, port=port, reload=dev, workers=workers) + uvicorn.run( + "sshecret_admin.core.main:app", host=host, port=port, reload=dev, workers=workers + ) @cli.command("repl") @@ -126,7 +133,9 @@ def cli_repl(ctx: click.Context) -> None: password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() if not password_db: - raise click.ClickException("Error: Password database has not yet been setup. Start the server to finish setup.") + raise click.ClickException( + "Error: Password database has not yet been setup. Start the server to finish setup." + ) def run(func: Awaitable[Any]) -> Any: """Run an async function.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/db.py b/packages/sshecret-admin/src/sshecret_admin/core/db.py similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/db.py rename to packages/sshecret-admin/src/sshecret_admin/core/db.py diff --git a/packages/sshecret-admin/src/sshecret_admin/core/dependencies.py b/packages/sshecret-admin/src/sshecret_admin/core/dependencies.py new file mode 100644 index 0000000..d2d2137 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/core/dependencies.py @@ -0,0 +1,37 @@ +"""Common type definitions.""" + +from collections.abc import AsyncGenerator, Callable, Generator +from dataclasses import dataclass +from typing import Self + +from sqlmodel import Session +from sshecret_admin.services import AdminBackend +from sshecret_admin.core.settings import AdminServerSettings + + +DBSessionDep = Callable[[], Generator[Session, None, None]] + +AdminDep = Callable[[Session], AsyncGenerator[AdminBackend, None]] + + +@dataclass +class BaseDependencies: + """Base level dependencies.""" + + settings: AdminServerSettings + get_db_session: DBSessionDep + +@dataclass +class AdminDependencies(BaseDependencies): + """Dependency class with admin.""" + + get_admin_backend: AdminDep + + @classmethod + def create(cls, deps: BaseDependencies, get_admin_backend: AdminDep) -> Self: + """Create from base dependencies.""" + return cls( + settings=deps.settings, + get_db_session=deps.get_db_session, + get_admin_backend=get_admin_backend, + ) diff --git a/packages/sshecret-admin/src/sshecret_admin/main.py b/packages/sshecret-admin/src/sshecret_admin/core/main.py similarity index 97% rename from packages/sshecret-admin/src/sshecret_admin/main.py rename to packages/sshecret-admin/src/sshecret_admin/core/main.py index c610df4..1bc005b 100644 --- a/packages/sshecret-admin/src/sshecret_admin/main.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/main.py @@ -1,6 +1,5 @@ """Main server app.""" import sys -import uvicorn import click from pydantic import ValidationError diff --git a/packages/sshecret-admin/src/sshecret_admin/settings.py b/packages/sshecret-admin/src/sshecret_admin/core/settings.py similarity index 64% rename from packages/sshecret-admin/src/sshecret_admin/settings.py rename to packages/sshecret-admin/src/sshecret_admin/core/settings.py index 571288f..ab3af40 100644 --- a/packages/sshecret-admin/src/sshecret_admin/settings.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/settings.py @@ -2,11 +2,12 @@ from pydantic import AnyHttpUrl, Field from pydantic_settings import BaseSettings, SettingsConfigDict +from sqlalchemy import URL DEFAULT_LISTEN_PORT = 8822 -DEFAULT_DATABASE = "sqlite:///ssh_admin.db" +DEFAULT_DATABASE = "ssh_admin.db" class AdminServerSettings(BaseSettings): @@ -21,5 +22,12 @@ class AdminServerSettings(BaseSettings): listen_address: str = Field(default="") secret_key: str port: int = DEFAULT_LISTEN_PORT - admin_db: str = Field(default=DEFAULT_DATABASE) + + database: str = Field(default=DEFAULT_DATABASE) + #admin_db: str = Field(default=DEFAULT_DATABASE) debug: bool = False + + @property + def admin_db(self) -> URL: + """Construct database url.""" + return URL.create(drivername="sqlite", database=self.database) diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend.py b/packages/sshecret-admin/src/sshecret_admin/frontend.py deleted file mode 100644 index 40296aa..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Frontend methods.""" - -# pyright: reportUnusedFunction=false -import logging -import os -from datetime import timedelta -from pathlib import Path -from typing import Annotated - -import jwt -from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response, status -from fastapi.responses import RedirectResponse -from jinja2_fragments.fastapi import Jinja2Blocks -from sqlmodel import Session, select -from sshecret_admin.settings import AdminServerSettings -from sshecret.backend import SshecretBackend -from .admin_backend import AdminBackend -from .auth_models import ( - JWT_ALGORITHM, - AuthenticationFailedError, - AuthenticationNeededError, - LoginError, - PasswordDB, - User, - TokenData, - create_access_token, - verify_password, -) -from .types import DBSessionDep -from .views import create_audit_view, create_client_view, create_secrets_view - - -ACCESS_TOKEN_EXPIRE_MINUTES = 45 -LOG = logging.getLogger(__name__) - - -def login_error(templates: Jinja2Blocks, request: Request): - """Return a login error.""" - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login Page", - "error": "Invalid Login.", - }, - ) - - -def create_frontend( - settings: AdminServerSettings, get_db_session: DBSessionDep -) -> APIRouter: - """Create frontend.""" - app = APIRouter(include_in_schema=False) - - script_path = Path(os.path.dirname(os.path.realpath(__file__))) - - template_path = script_path / "templates" - - templates = Jinja2Blocks(directory=template_path) - - # @app.exception_handler(AuthenticationFailedError) - # async def handle_authentication_failed(request: Request, exc: AuthenticationFailedError): - # """Handle authentication failed error.""" - # return templates.TemplateResponse(request, "login.html") - - async def get_backend(): - """Get backend client.""" - backend_client = SshecretBackend( - str(settings.backend_url), settings.backend_token - ) - yield backend_client - - async def get_admin_backend(session: Annotated[Session, Depends(get_db_session)]): - """Get admin backend API.""" - password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() - if not password_db: - raise HTTPException( - 500, detail="Error: The password manager has not yet been set up." - ) - admin = AdminBackend(settings, password_db.encrypted_password) - yield admin - - async def get_login_status( - request: Request, session: Annotated[Session, Depends(get_db_session)] - ) -> bool: - """Get login status.""" - token = request.cookies.get("access_token") - if not token: - return False - try: - payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - if not username: - return False - except jwt.InvalidTokenError: - return False - token_data = TokenData(username=username) - user = session.exec( - select(User).where(User.username == token_data.username) - ).first() - if not user: - return False - return True - - async def get_current_user_from_token( - request: Request, session: Annotated[Session, Depends(get_db_session)] - ) -> User: - credentials_exception = AuthenticationNeededError() - """Get current user from token.""" - token = request.cookies.get("access_token") - if not token: - raise credentials_exception - try: - payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - if not username: - raise credentials_exception - except jwt.InvalidTokenError: - raise credentials_exception - token_data = TokenData(username=username) - user = session.exec( - select(User).where(User.username == token_data.username) - ).first() - if not user: - raise credentials_exception - - return user - - @app.get("/") - async def get_index( - request: Request, - login_status: Annotated[bool, Depends(get_login_status)], - error_title: str | None = None, - error_message: str | None = None, - ): - """Get index.""" - if login_status: - return RedirectResponse("/dashboard") - login_error: LoginError | None = None - if error_title and error_message: - login_error = LoginError(title=error_title, message=error_message) - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login page.", - "login_error": login_error, - }, - ) - - @app.post("/") - async def post_index( - request: Request, - error_title: str | None = None, - error_message: str | None = None, - ): - """Get index.""" - login_error: LoginError | None = None - if error_title and error_message: - login_error = LoginError(title=error_title, message=error_message) - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login page.", - "login_error": login_error, - }, - ) - - @app.post("/login") - async def login_user( - response: Response, - request: Request, - session: Annotated[Session, Depends(get_db_session)], - username: Annotated[str, Form()], - password: Annotated[str, Form()], - ): - """Log in user.""" - user = session.exec(select(User).where(User.username == username)).first() - auth_error = AuthenticationFailedError() - if not user: - raise auth_error - - if not verify_password(password, user.hashed_password): - raise auth_error - - token_data = {"sub": user.username} - expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) - token = create_access_token(settings, token_data, expires_delta=expires) - response = RedirectResponse(url="/dashboard", status_code=status.HTTP_302_FOUND) - response.set_cookie( - key="access_token", value=token, httponly=True, secure=False, samesite="lax" - ) - return response - - @app.get("/success") - async def success_page( - request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - ): - """Display a success page.""" - return templates.TemplateResponse( - request, "success.html", {"page_title": "Success!", "user": current_user} - ) - - @app.get("/dashboard") - async def get_dashboard( - request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ): - """Dashboard for mocking up the dashboard.""" - # secrets = await admin.get_secrets() - return templates.TemplateResponse( - request, - "dashboard.html", - { - "page_title": "sshecret", - "user": current_user.username, - }, - ) - - # Stop adding routes here. - - app.include_router( - create_client_view(templates, get_current_user_from_token, get_admin_backend) - ) - - app.include_router( - create_secrets_view(templates, get_current_user_from_token, get_admin_backend) - ) - - app.include_router( - create_audit_view(templates, get_current_user_from_token, get_admin_backend) - ) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py b/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py new file mode 100644 index 0000000..c527245 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py @@ -0,0 +1,5 @@ +"""Frontend app.""" + +from .router import create_router as create_frontend_router + +__all__ = ["create_frontend_router"] diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py b/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py new file mode 100644 index 0000000..7ac883a --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py @@ -0,0 +1,7 @@ +"""Custom oauth2 class.""" + +from fastapi.security import OAuth2 + + +class Oauth2TokenInCookies(OAuth2): + """TODO: Create this.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py b/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py new file mode 100644 index 0000000..1424653 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py @@ -0,0 +1,48 @@ +"""Frontend dependencies.""" + +from dataclasses import dataclass +from collections.abc import Callable, Awaitable +from typing import Self + +from jinja2_fragments.fastapi import Jinja2Blocks +from fastapi import Request +from sqlmodel import Session + +from sshecret_admin.core.dependencies import AdminDep, BaseDependencies + +from sshecret_admin.auth.models import User + +UserTokenDep = Callable[[Request, Session], Awaitable[User]] +UserLoginDep = Callable[[Request, Session], Awaitable[bool]] + + +@dataclass +class FrontendDependencies(BaseDependencies): + """Frontend dependencies.""" + + get_admin_backend: AdminDep + templates: Jinja2Blocks + get_user_from_access_token: UserTokenDep + get_user_from_refresh_token: UserTokenDep + get_login_status: UserLoginDep + + @classmethod + def create( + cls, + deps: BaseDependencies, + get_admin_backend: AdminDep, + templates: Jinja2Blocks, + get_user_from_access_token: UserTokenDep, + get_user_from_refresh_token: UserTokenDep, + get_login_status: UserLoginDep, + ) -> Self: + """Create from base dependencies.""" + return cls( + settings=deps.settings, + get_db_session=deps.get_db_session, + get_admin_backend=get_admin_backend, + templates=templates, + get_user_from_access_token=get_user_from_access_token, + get_user_from_refresh_token=get_user_from_refresh_token, + get_login_status=get_login_status, + ) diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py b/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py new file mode 100644 index 0000000..4c92ff4 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py @@ -0,0 +1,13 @@ +"""Frontend exceptions.""" +from starlette.datastructures import URL + + +class RedirectException(Exception): + """Exception that initiates a redirect flow.""" + + def __init__(self, to: str | URL) -> None: # pyright: ignore[reportMissingSuperCall] + """Raise exception that redirects.""" + if isinstance(to, str): + to = URL(to) + + self.to: URL = to diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/router.py b/packages/sshecret-admin/src/sshecret_admin/frontend/router.py new file mode 100644 index 0000000..5b83067 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/router.py @@ -0,0 +1,133 @@ +"""Frontend router.""" + +# pyright: reportUnusedFunction=false + +import logging +import os +from pathlib import Path +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Request + +from jinja2_fragments.fastapi import Jinja2Blocks + +from sqlmodel import Session, select +from starlette.datastructures import URL + + +from sshecret_admin.auth import PasswordDB, User, decode_token +from sshecret_admin.core.dependencies import BaseDependencies +from sshecret_admin.services.admin_backend import AdminBackend + +from .dependencies import FrontendDependencies +from .exceptions import RedirectException +from .views import audit, auth, clients, index, secrets + + +LOG = logging.getLogger(__name__) + + +access_token = "access_token" +refresh_token = "refresh_token" + + +def create_router(dependencies: BaseDependencies) -> APIRouter: + """Create frontend router.""" + + app = APIRouter(include_in_schema=False) + + script_path = Path(os.path.dirname(os.path.realpath(__file__))) + + template_path = script_path / "templates" + + templates = Jinja2Blocks(directory=template_path) + + async def get_admin_backend( + session: Annotated[Session, Depends(dependencies.get_db_session)] + ): + """Get admin backend API.""" + password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() + if not password_db: + raise HTTPException( + 500, detail="Error: The password manager has not yet been set up." + ) + admin = AdminBackend(dependencies.settings, password_db.encrypted_password) + yield admin + + async def get_user_from_token( + token: str, + session: Session, + ) -> User | None: + """Get user from a token.""" + token_data = decode_token(dependencies.settings, token) + if not token_data: + return None + user = session.exec( + select(User).where(User.username == token_data.username) + ).first() + if not user or user.disabled: + return None + return user + + async def get_user_from_refresh_token( + request: Request, + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> User: + """Get user from refresh token.""" + next = URL("/login").include_query_params(next=request.url.path) + credentials_error = RedirectException(to=next) + token = request.cookies.get("refresh_token") + if not token: + raise credentials_error + + user = await get_user_from_token(token, session) + if not user: + raise credentials_error + return user + + async def get_user_from_access_token( + request: Request, + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> User: + """Get user from access token.""" + token = request.cookies.get("access_token") + next = URL("/refresh").include_query_params(next=request.url.path) + credentials_error = RedirectException(to=next) + if not token: + raise credentials_error + + user = await get_user_from_token(token, session) + if not user: + raise credentials_error + return user + + async def get_login_status( + request: Request, + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> bool: + """Get login status.""" + token = request.cookies.get("access_token") + if not token: + return False + + user = await get_user_from_token(token, session) + if not user: + return False + return True + + view_dependencies = FrontendDependencies.create( + dependencies, + get_admin_backend, + templates, + get_user_from_access_token, + get_user_from_refresh_token, + get_login_status, + ) + + app.include_router(audit.create_router(view_dependencies)) + app.include_router(auth.create_router(view_dependencies)) + app.include_router(clients.create_router(view_dependencies)) + app.include_router(index.create_router(view_dependencies)) + app.include_router(secrets.create_router(view_dependencies)) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/entry.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/entry.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/index.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/index.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/index.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/inner.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/inner.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/inner_save.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner_save.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/inner_save.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner_save.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/pagination.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/pagination.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/pagination.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/pagination.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/client.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/client.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/client.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/client.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_create.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_create.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_create.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_create.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_delete.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_delete.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_delete.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_delete.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_update.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_update.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_update.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_update.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/dynamic.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/dynamic.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/dynamic.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/dynamic.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/field_invalid.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_invalid.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/field_invalid.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_invalid.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/field_valid.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/field_valid.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/index.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/inner.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/inner.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/inner.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/inner.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html new file mode 100644 index 0000000..1d719bd --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html @@ -0,0 +1,89 @@ +{% extends "/dashboard/_base.html" %} {% block content %} + +
+

Welcome to Sshecret

+ +
+ +
+
+
+

Clients

+ {{ stats.clients }} +
+
+
+ + + +
+
+
+

New products

+ 2,340 +

+ + + 12.5% + + Since last month +

+
+
+
+
+
+

Users

+ 2,340 +

+ + + 3,4% + + Since last month +

+
+
+
+
+
+

Audience by age

+
+
50+
+
+
+
+
+
+
40+
+
+
+
+
+
+
30+
+
+
+
+
+
+
20+
+
+
+
+
+
+
+
+
+ + + + +{% endblock %} diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_base.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_base.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_base.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_base.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_favicons.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_favicons.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_favicons.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_favicons.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_header.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_header.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_header.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_header.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_scripts.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_scripts.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_scripts.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_scripts.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_stylesheet.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_stylesheet.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/_stylesheet.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/_stylesheet.html diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/index.html.j2 new file mode 100644 index 0000000..e69de29 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/navbar.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/navbar.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/navbar.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/navbar.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard/sidebar.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/sidebar.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard/sidebar.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard/sidebar.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard_old.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard_old.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/dashboard_old.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard_old.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/fragments/error.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/error.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/fragments/error.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/error.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/fragments/ok.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/ok.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/fragments/ok.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/fragments/ok.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/login.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/login.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/login.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/login.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/secrets/client_options.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/client_options.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/secrets/client_options.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/client_options.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/secrets/drawer_secret_create.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/drawer_secret_create.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/secrets/drawer_secret_create.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/drawer_secret_create.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/secrets/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/index.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/secrets/index.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/index.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/secrets/inner.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/inner.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/secrets/inner.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/inner.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/secrets/modal_client_secret.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/modal_client_secret.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/secrets/modal_client_secret.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/modal_client_secret.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/secrets/secret.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/secret.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/secrets/secret.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/secrets/secret.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/shared/_base.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/shared/_base.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/shared/_base.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/shared/_base.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/shared/_dashboard.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/shared/_dashboard.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/shared/_dashboard.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/shared/_dashboard.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/shared/_dashboard_save.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/shared/_dashboard_save.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/shared/_dashboard_save.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/shared/_dashboard_save.html diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/success.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/success.html similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/success.html rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/success.html diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/widgets/clients.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/widgets/clients.html new file mode 100644 index 0000000..e69de29 diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py new file mode 100644 index 0000000..f74c036 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/__init__.py @@ -0,0 +1 @@ +"""Frontend views.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/views/audit.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/audit.py similarity index 68% rename from packages/sshecret-admin/src/sshecret_admin/views/audit.py rename to packages/sshecret-admin/src/sshecret_admin/frontend/views/audit.py index a9333de..0efcfae 100644 --- a/packages/sshecret-admin/src/sshecret_admin/views/audit.py +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/audit.py @@ -1,19 +1,20 @@ -"""Audit view.""" -# pyright: reportUnusedFunction=false +"""Audit view factory.""" -import math +# pyright: reportUnusedFunction=false import logging +import math from typing import Annotated from fastapi import APIRouter, Depends, Request, Response -from jinja2_fragments.fastapi import Jinja2Blocks from pydantic import BaseModel -from sshecret_admin.admin_backend import AdminBackend -from sshecret_admin.types import UserTokenDep, AdminDep -from sshecret_admin.auth_models import User +from sshecret_admin.auth import User +from sshecret_admin.services import AdminBackend + +from ..dependencies import FrontendDependencies LOG = logging.getLogger(__name__) + class PagingInfo(BaseModel): page: int @@ -36,20 +37,15 @@ class PagingInfo(BaseModel): """Return total pages.""" return math.ceil(self.total / self.limit) -def create_audit_view( - templates: Jinja2Blocks, - get_current_user_from_token: UserTokenDep, - get_admin_backend: AdminDep, -) -> APIRouter: - """Create client view.""" + +def create_router(dependencies: FrontendDependencies) -> APIRouter: + """Create clients router.""" app = APIRouter() + templates = dependencies.templates async def resolve_audit_entries( - request: Request, - current_user: User, - admin: AdminBackend, - page: int + request: Request, current_user: User, admin: AdminBackend, page: int ) -> Response: """Resolve audit entries.""" LOG.info("Page: %r", page) @@ -61,7 +57,9 @@ def create_audit_view( entries = await admin.get_audit_log(offset=offset, limit=per_page) LOG.info("Entries: %r", entries) - page_info = PagingInfo(page=page, limit=per_page, total=total_messages, offset=offset) + page_info = PagingInfo( + page=page, limit=per_page, total=total_messages, offset=offset + ) if request.headers.get("HX-Request"): return templates.TemplateResponse( request, @@ -69,8 +67,7 @@ def create_audit_view( { "entries": entries, "page_info": page_info, - } - + }, ) return templates.TemplateResponse( request, @@ -80,34 +77,27 @@ def create_audit_view( "entries": entries, "user": current_user.username, "page_info": page_info, - - } + }, ) - @app.get("/audit/") async def get_audit_entries( request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ): + current_user: Annotated[User, Depends(dependencies.get_user_from_access_token)], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Response: """Get audit entries.""" return await resolve_audit_entries(request, current_user, admin, 1) @app.get("/audit/page/{page}") async def get_audit_entries_page( request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + current_user: Annotated[User, Depends(dependencies.get_user_from_access_token)], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], page: int, - ): + ) -> Response: """Get audit entries.""" LOG.info("Get audit entries page: %r", page) return await resolve_audit_entries(request, current_user, admin, page) - - - # --------------# - # END OF ROUTES # - # --------------# return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py new file mode 100644 index 0000000..fbf896a --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/auth.py @@ -0,0 +1,143 @@ +"""Authentication related views factory.""" + +# pyright: reportUnusedFunction=false +import logging +from pydantic import BaseModel +from typing import Annotated +from fastapi import APIRouter, Depends, Query, Request, Response, status +from fastapi.responses import RedirectResponse +from fastapi.security import OAuth2PasswordRequestForm +from sqlmodel import Session +from starlette.datastructures import URL + +from sshecret_admin.auth import ( + User, + authenticate_user, + create_access_token, + create_refresh_token, +) + +from ..dependencies import FrontendDependencies +from ..exceptions import RedirectException + +LOG = logging.getLogger(__name__) + + +class LoginError(BaseModel): + """Login error.""" + + title: str + message: str + + +def create_router(dependencies: FrontendDependencies) -> APIRouter: + """Create auth router.""" + + app = APIRouter() + templates = dependencies.templates + + @app.get("/login") + async def get_login( + request: Request, + login_status: Annotated[bool, Depends(dependencies.get_login_status)], + error_title: str | None = None, + error_message: str | None = None, + ): + """Get index.""" + if login_status: + return RedirectResponse("/dashboard") + login_error: LoginError | None = None + if error_title and error_message: + login_error = LoginError(title=error_title, message=error_message) + return templates.TemplateResponse( + request, + "login.html", + { + "page_title": "Login", + "page_description": "Login page.", + "login_error": login_error, + }, + ) + + @app.post("/login") + async def login_user( + request: Request, + response: Response, + session: Annotated[Session, Depends(dependencies.get_db_session)], + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + next: Annotated[str, Query()] = "/dashboard", + error_title: str | None = None, + error_message: str | None = None, + ): + """Log in user.""" + if error_title and error_message: + login_error = LoginError(title=error_title, message=error_message) + return templates.TemplateResponse( + request, + "login.html", + { + "page_title": "Login", + "page_description": "Login page.", + "login_error": login_error, + }, + ) + + user = authenticate_user(session, form_data.username, form_data.password) + login_failed = RedirectException( + to=URL("/login").include_query_params( + error_title="Login Error", error_message="Invalid username or password" + ) + ) + if not user: + raise login_failed + token_data: dict[str, str] = {"sub": user.username} + access_token = create_access_token(dependencies.settings, data=token_data) + refresh_token = create_refresh_token(dependencies.settings, data=token_data) + response = RedirectResponse(url=next, status_code=status.HTTP_302_FOUND) + response.set_cookie( + "access_token", + value=access_token, + httponly=True, + secure=False, + samesite="strict", + ) + response.set_cookie( + "refresh_token", + value=refresh_token, + httponly=True, + secure=False, + samesite="strict", + ) + return response + + @app.get("/refresh") + async def get_refresh_token( + response: Response, + user: Annotated[User, Depends(dependencies.get_user_from_refresh_token)], + next: Annotated[str, Query()], + ): + """Refresh tokens. + + We might as well refresh the long-lived one here. + """ + token_data: dict[str, str] = {"sub": user.username} + access_token = create_access_token(dependencies.settings, data=token_data) + refresh_token = create_refresh_token(dependencies.settings, data=token_data) + response = RedirectResponse(url=next, status_code=status.HTTP_302_FOUND) + response.set_cookie( + "access_token", + value=access_token, + httponly=True, + secure=False, + samesite="strict", + ) + response.set_cookie( + "refresh_token", + value=refresh_token, + httponly=True, + secure=False, + samesite="strict", + ) + return response + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/views/clients.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py similarity index 78% rename from packages/sshecret-admin/src/sshecret_admin/views/clients.py rename to packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py index 431f183..81fa535 100644 --- a/packages/sshecret-admin/src/sshecret_admin/views/clients.py +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py @@ -1,21 +1,20 @@ -"""Client views.""" +"""clients view factory.""" # pyright: reportUnusedFunction=false - import ipaddress import logging import uuid from typing import Annotated -from fastapi import APIRouter, Depends, Request, Form -from jinja2_fragments.fastapi import Jinja2Blocks - +from fastapi import APIRouter, Depends, Form, Request, Response from pydantic import BaseModel, IPvAnyAddress, IPvAnyNetwork -from sshecret_admin.admin_backend import AdminBackend + from sshecret.backend import ClientFilter from sshecret.backend.models import FilterType from sshecret.crypto import validate_public_key -from sshecret_admin.types import UserTokenDep, AdminDep -from sshecret_admin.auth_models import User +from sshecret_admin.auth import User +from sshecret_admin.services import AdminBackend + +from ..dependencies import FrontendDependencies LOG = logging.getLogger(__name__) @@ -37,21 +36,18 @@ class ClientCreate(BaseModel): sources: str | None -def create_client_view( - templates: Jinja2Blocks, - get_current_user_from_token: UserTokenDep, - get_admin_backend: AdminDep, -) -> APIRouter: - """Create client view.""" +def create_router(dependencies: FrontendDependencies) -> APIRouter: + """Create clients router.""" app = APIRouter() + templates = dependencies.templates @app.get("/clients") async def get_clients( request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ): + current_user: Annotated[User, Depends(dependencies.get_user_from_access_token)], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Response: """Get clients.""" clients = await admin.get_clients() LOG.info("Clients %r", clients) @@ -68,10 +64,12 @@ def create_client_view( @app.post("/clients/query") async def query_clients( request: Request, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], query: Annotated[str, Form()], - ): + ) -> Response: """Query for a client.""" query_filter: ClientFilter | None = None if query: @@ -90,8 +88,10 @@ def create_client_view( async def update_client( request: Request, id: str, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], client: Annotated[ClientUpdate, Form()], ): """Update a client.""" @@ -135,9 +135,11 @@ def create_client_view( async def delete_client( request: Request, id: str, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ): + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Response: """Delete a client.""" await admin.delete_client(id) clients = await admin.get_clients() @@ -154,10 +156,12 @@ def create_client_view( @app.post("/clients/") async def create_client( request: Request, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], client: Annotated[ClientCreate, Form()], - ): + ) -> Response: """Create client.""" sources: list[str] | None = None if client.sources: @@ -179,9 +183,11 @@ def create_client_view( @app.post("/clients/validate/source") async def validate_client_source( request: Request, - _current_user: Annotated[User, Depends(get_current_user_from_token)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], sources: Annotated[str, Form()], - ): + ) -> Response: """Validate source.""" source_str = sources.split(",") for source in source_str: @@ -211,9 +217,11 @@ def create_client_view( @app.post("/clients/validate/public_key") async def validate_client_public_key( request: Request, - _current_user: Annotated[User, Depends(get_current_user_from_token)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], public_key: Annotated[str, Form()], - ): + ) -> Response: """Validate source.""" if validate_public_key(public_key.rstrip()): return templates.TemplateResponse( diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py new file mode 100644 index 0000000..8404233 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/index.py @@ -0,0 +1,70 @@ +"""Front page view factory.""" + +# pyright: reportUnusedFunction=false +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, Request +from fastapi.responses import RedirectResponse +from pydantic import BaseModel +from sshecret_admin.auth import User +from sshecret_admin.services import AdminBackend + +from ..dependencies import FrontendDependencies + +LOG = logging.getLogger(__name__) + +START_PAGE = "/dashboard" +LOGIN_PAGE = "/login" + + +class StatsView(BaseModel): + """Stats for the frontend.""" + + clients: int = 0 + secrets: int = 0 + audit_events: int = 0 + + +async def get_stats(admin: AdminBackend) -> StatsView: + """Get stats for the frontpage.""" + clients = await admin.get_clients() + secrets = await admin.get_secrets() + audit = await admin.get_audit_log_count() + return StatsView(clients=len(clients), secrets=len(secrets), audit_events=audit) + + +def create_router(dependencies: FrontendDependencies) -> APIRouter: + """Create auth router.""" + + app = APIRouter() + templates = dependencies.templates + + @app.get("/") + def get_index(logged_in: Annotated[bool, Depends(dependencies.get_login_status)]): + """Get the index.""" + next = LOGIN_PAGE + if logged_in: + next = START_PAGE + + return RedirectResponse(url=next) + + @app.get("/dashboard") + async def get_dashboard( + request: Request, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + current_user: Annotated[User, Depends(dependencies.get_user_from_access_token)], + ): + """Dashboard for mocking up the dashboard.""" + stats = await get_stats(admin) + + return templates.TemplateResponse( + request, + "dashboard.html", + { + "page_title": "sshecret", + "user": current_user.username, + "stats": stats, + }, + ) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/views/secrets.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py similarity index 72% rename from packages/sshecret-admin/src/sshecret_admin/views/secrets.py rename to packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py index 4d75729..82e1691 100644 --- a/packages/sshecret-admin/src/sshecret_admin/views/secrets.py +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/secrets.py @@ -1,24 +1,25 @@ -"""Secrets view.""" +#!/usr/bin/env python3 + # pyright: reportUnusedFunction=false import logging import secrets as pysecrets from typing import Annotated, Any -from fastapi import APIRouter, Depends, Request, Form -from jinja2_fragments.fastapi import Jinja2Blocks - +from fastapi import APIRouter, Depends, Form, Request from pydantic import BaseModel, BeforeValidator, Field -from sshecret_admin.admin_backend import AdminBackend -from sshecret_admin.types import UserTokenDep, AdminDep -from sshecret_admin.auth_models import User + +from sshecret_admin.auth import User +from sshecret_admin.services import AdminBackend + +from ..dependencies import FrontendDependencies LOG = logging.getLogger(__name__) -def split_clients(clients: Any) -> Any: +def split_clients(clients: Any) -> Any: # pyright: ignore[reportAny] """Split clients.""" if isinstance(clients, list): - return clients + return clients # pyright: ignore[reportUnknownVariableType] if not isinstance(clients, str): raise ValueError("Invalid type for clients.") if not clients: @@ -26,7 +27,7 @@ def split_clients(clients: Any) -> Any: return [client.rstrip() for client in clients.split(",")] -def handle_select_bool(value: Any) -> Any: +def handle_select_bool(value: Any) -> Any: # pyright: ignore[reportAny] """Handle boolean from select.""" if isinstance(value, bool): return value @@ -47,20 +48,17 @@ class CreateSecret(BaseModel): ) -def create_secrets_view( - templates: Jinja2Blocks, - get_current_user_from_token: UserTokenDep, - get_admin_backend: AdminDep, -) -> APIRouter: - """Create secrets view.""" +def create_router(dependencies: FrontendDependencies) -> APIRouter: + """Create secrets router.""" app = APIRouter() + templates = dependencies.templates @app.get("/secrets/") async def get_secrets( request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + current_user: Annotated[User, Depends(dependencies.get_user_from_access_token)], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ): """Get secrets index page.""" secrets = await admin.get_detailed_secrets() @@ -79,8 +77,10 @@ def create_secrets_view( @app.post("/secrets/") async def add_secret( request: Request, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], secret: Annotated[CreateSecret, Form()], ): """Add secret.""" @@ -108,8 +108,10 @@ def create_secrets_view( request: Request, name: str, id: str, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ): """Remove a client's access to a secret.""" await admin.delete_client_secret(id, name) @@ -130,8 +132,10 @@ def create_secrets_view( request: Request, name: str, client: Annotated[str, Form()], - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ): """Add a secret to a client.""" await admin.create_client_secret(client, name) @@ -153,8 +157,10 @@ def create_secrets_view( async def delete_secret( request: Request, name: str, - _current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], + _current_user: Annotated[ + User, Depends(dependencies.get_user_from_access_token) + ], + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ): """Delete a secret.""" await admin.delete_secret(name) @@ -172,7 +178,4 @@ def create_secrets_view( headers=headers, ) - # --------------# - # END OF ROUTES # - # --------------# return app diff --git a/packages/sshecret-admin/src/sshecret_admin/services/__init__.py b/packages/sshecret-admin/src/sshecret_admin/services/__init__.py new file mode 100644 index 0000000..8cbc690 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/services/__init__.py @@ -0,0 +1,8 @@ +"""Services module. + +This module contains business logic. +""" + +from .admin_backend import AdminBackend + +__all__ = ["AdminBackend"] diff --git a/packages/sshecret-admin/src/sshecret_admin/admin_backend.py b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py similarity index 92% rename from packages/sshecret-admin/src/sshecret_admin/admin_backend.py rename to packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py index 7a7c5cc..f320b24 100644 --- a/packages/sshecret-admin/src/sshecret_admin/admin_backend.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py @@ -7,13 +7,14 @@ import logging from collections.abc import Iterator from contextlib import contextmanager -from sshecret.backend import AuditLog, Client, ClientFilter, Secret, SshecretBackend +from sshecret.backend import AuditLog, Client, ClientFilter, Secret, SshecretBackend, Operation, SubSystem from sshecret.backend.models import DetailedSecrets +from sshecret.backend.api import AuditAPI from sshecret.crypto import encrypt_string, load_public_key from .keepass import PasswordContext, load_password_manager -from .settings import AdminServerSettings -from .view_models import SecretView +from sshecret_admin.core.settings import AdminServerSettings +from .models import SecretView class ClientManagementError(Exception): @@ -381,6 +382,11 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e + @property + def audit(self) -> AuditAPI: + """Resolve audit API.""" + return self.backend.audit(SubSystem.ADMIN) + async def get_audit_log( self, offset: int = 0, @@ -389,14 +395,36 @@ class AdminBackend: subsystem: str | None = None, ) -> list[AuditLog]: """Get audit log from backend.""" - return await self.backend.get_audit_log(offset, limit, client_name, subsystem) + return await self.audit.get(offset, limit, client_name, subsystem) + + async def write_audit_message( + self, + operation: Operation, + message: str, + origin: str, + client: Client | None = None, + secret_name: str | None = None, + **data: str, + ) -> None: + """Write an audit message.""" + await self.audit.write_async( + operation=operation, + message=message, + origin=origin, + client=client, + secret=None, + secret_name=secret_name, + **data, + ) async def write_audit_log(self, entry: AuditLog) -> None: """Write to the audit log.""" if not entry.subsystem: - entry.subsystem = "admin" - await self.backend.add_audit_log(entry) + entry.subsystem = SubSystem.ADMIN + + await self.audit.write_model_async(entry) + #await self.backend.add_audit_log(entry) async def get_audit_log_count(self) -> int: """Get audit log count.""" - return await self.backend.get_audit_log_count() + return await self.audit.count() diff --git a/packages/sshecret-admin/src/sshecret_admin/keepass.py b/packages/sshecret-admin/src/sshecret_admin/services/keepass.py similarity index 98% rename from packages/sshecret-admin/src/sshecret_admin/keepass.py rename to packages/sshecret-admin/src/sshecret_admin/services/keepass.py index a60ca15..f8682a3 100644 --- a/packages/sshecret-admin/src/sshecret_admin/keepass.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/keepass.py @@ -8,7 +8,7 @@ from typing import cast import pykeepass from .master_password import decrypt_master_password -from .settings import AdminServerSettings +from sshecret_admin.core.settings import AdminServerSettings LOG = logging.getLogger(__name__) diff --git a/packages/sshecret-admin/src/sshecret_admin/master_password.py b/packages/sshecret-admin/src/sshecret_admin/services/master_password.py similarity index 97% rename from packages/sshecret-admin/src/sshecret_admin/master_password.py rename to packages/sshecret-admin/src/sshecret_admin/services/master_password.py index 68f3e41..165812a 100644 --- a/packages/sshecret-admin/src/sshecret_admin/master_password.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/master_password.py @@ -8,7 +8,7 @@ from sshecret.crypto import ( encrypt_string, decode_string, ) -from .settings import AdminServerSettings +from sshecret_admin.core.settings import AdminServerSettings KEY_FILENAME = "sshecret-admin-key" diff --git a/packages/sshecret-admin/src/sshecret_admin/view_models.py b/packages/sshecret-admin/src/sshecret_admin/services/models.py similarity index 97% rename from packages/sshecret-admin/src/sshecret_admin/view_models.py rename to packages/sshecret-admin/src/sshecret_admin/services/models.py index 21e40b8..2b8c073 100644 --- a/packages/sshecret-admin/src/sshecret_admin/view_models.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/models.py @@ -1,7 +1,7 @@ """Models for the API.""" import secrets -from typing import Annotated, Literal, Self, Union +from typing import Annotated, Literal from pydantic import ( AfterValidator, BaseModel, @@ -9,7 +9,6 @@ from pydantic import ( Field, IPvAnyAddress, IPvAnyNetwork, - model_validator, ) from sshecret.crypto import validate_public_key diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard.html b/packages/sshecret-admin/src/sshecret_admin/templates/dashboard.html deleted file mode 100644 index 0a19283..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/templates/dashboard.html +++ /dev/null @@ -1,10 +0,0 @@ -{% extends "/dashboard/_base.html" %} {% block content %} - -
-

Welcome to Sshecret

-
- - -{% endblock %} diff --git a/packages/sshecret-admin/src/sshecret_admin/testing.py b/packages/sshecret-admin/src/sshecret_admin/testing.py index d180e1c..d81ae4d 100644 --- a/packages/sshecret-admin/src/sshecret_admin/testing.py +++ b/packages/sshecret-admin/src/sshecret_admin/testing.py @@ -4,8 +4,7 @@ import os import bcrypt -from sqlalchemy import Engine -from sqlmodel import Session, select +from sqlmodel import Session from .auth_models import User diff --git a/packages/sshecret-admin/src/sshecret_admin/types.py b/packages/sshecret-admin/src/sshecret_admin/types.py index 15973fd..41a1485 100644 --- a/packages/sshecret-admin/src/sshecret_admin/types.py +++ b/packages/sshecret-admin/src/sshecret_admin/types.py @@ -6,16 +6,10 @@ from fastapi import Request from sqlmodel import Session from sshecret_admin.admin_backend import AdminBackend from sshecret_admin.auth_models import User -from sshecret.backend import SshecretBackend -from . import keepass DBSessionDep = Callable[[], Generator[Session, None, None]] -BackendDep = Callable[[], AsyncGenerator[SshecretBackend, None]] - -PasswdCtxDep = Callable[[DBSessionDep], AsyncGenerator[keepass.PasswordContext, None]] - AdminDep = Callable[[Session], AsyncGenerator[AdminBackend, None]] UserTokenDep = Callable[[Request, Session], Awaitable[User]] diff --git a/packages/sshecret-admin/src/sshecret_admin/views/__init__.py b/packages/sshecret-admin/src/sshecret_admin/views/__init__.py deleted file mode 100644 index 7b27f66..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/views/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .audit import create_audit_view -from .clients import create_client_view -from .secrets import create_secrets_view - -__all__ = ["create_audit_view", "create_client_view", "create_secrets_view"] diff --git a/packages/sshecret-admin/static/index.html b/packages/sshecret-admin/static/index.html deleted file mode 100644 index 5cdf42a..0000000 --- a/packages/sshecret-admin/static/index.html +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - Untitled - - - - - - - - - -

I am outside of the package

- -