diff --git a/packages/sshecret-admin/pyproject.toml b/packages/sshecret-admin/pyproject.toml index 73c1c3c..17abda3 100644 --- a/packages/sshecret-admin/pyproject.toml +++ b/packages/sshecret-admin/pyproject.toml @@ -19,10 +19,14 @@ dependencies = [ "pyjwt>=2.10.1", "pykeepass>=4.1.1.post1", "sqlmodel>=0.0.24", + "sshecret", ] +[tool.uv.sources] +sshecret = { workspace = true } + [project.scripts] -sshecret-admin = "sshecret_admin.cli:cli" +sshecret-admin = "sshecret_admin.core.cli:cli" [build-system] requires = ["hatchling"] @@ -31,4 +35,5 @@ build-backend = "hatchling.build" [dependency-groups] dev = [ "pytailwindcss>=0.2.0", + "types-pyjwt>=1.7.1", ] diff --git a/packages/sshecret-admin/src/sshecret_admin/admin_api.py b/packages/sshecret-admin/src/sshecret_admin/admin_api.py deleted file mode 100644 index 1cf9d9a..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/admin_api.py +++ /dev/null @@ -1,284 +0,0 @@ -"""Admin API.""" - -# pyright: reportUnusedFunction=false - -import logging -from collections import defaultdict -from datetime import timedelta -from typing import Annotated - -import jwt -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm -from sqlmodel import Session, select -from sshecret.backend import Client, SshecretBackend -from sshecret.backend.models import Secret - -from .admin_backend import AdminBackend -from .auth_models import ( - PasswordDB, - Token, - TokenData, - User, - create_access_token, - verify_password, -) -from .settings import AdminServerSettings -from .types import DBSessionDep -from .view_models import ( - ClientCreate, - SecretCreate, - SecretUpdate, - SecretView, - UpdateKeyModel, - UpdateKeyResponse, - UpdatePoliciesRequest, -) - -LOG = logging.getLogger(__name__) - - -API_VERSION = "v1" -JWT_ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 - - -def authenticate_user(session: Session, username: str, password: str) -> User | None: - """Authenticate user.""" - user = session.exec(select(User).where(User.username == username)).first() - if not user: - return None - if not verify_password(password, user.hashed_password): - return None - return user - - -async def map_secrets_to_clients( - backend: SshecretBackend, -) -> defaultdict[str, list[str]]: - """Map secrets to clients.""" - clients = await backend.get_clients() - client_secret_map: defaultdict[str, list[str]] = defaultdict(list) - for client in clients: - for secret in client.secrets: - client_secret_map[secret].append(client.name) - return client_secret_map - - -def get_admin_api( - get_db_session: DBSessionDep, settings: AdminServerSettings -) -> APIRouter: - """Get Admin API.""" - oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") - - async def get_admin_backend(session: Annotated[Session, Depends(get_db_session)]): - """Get admin backend API.""" - password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() - if not password_db: - raise HTTPException( - 500, detail="Error: The password manager has not yet been set up." - ) - admin = AdminBackend(settings, password_db.encrypted_password) - yield admin - - async def get_current_user( - token: Annotated[str, Depends(oauth2_scheme)], - session: Annotated[Session, Depends(get_db_session)], - ) -> User: - """Get current user from token.""" - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - try: - payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - if not username: - raise credentials_exception - token_data = TokenData(username=username) - except jwt.InvalidTokenError: - raise credentials_exception - - user = session.exec( - select(User).where(User.username == token_data.username) - ).first() - if not user: - raise credentials_exception - return user - - async def get_current_active_user( - current_user: Annotated[User, Depends(get_current_user)], - ) -> User: - """Get current active user.""" - if current_user.disabled: - raise HTTPException(status_code=400, detail="Inactive or disabled user") - return current_user - - app = APIRouter( - prefix=f"/api/{API_VERSION}", dependencies=[Depends(get_current_active_user)] - ) - - @app.post("/token") - async def login_for_access_token( - session: Annotated[Session, Depends(get_db_session)], - form_data: Annotated[OAuth2PasswordRequestForm, Depends()], - ) -> Token: - """Login user and generate token.""" - user = authenticate_user(session, form_data.username, form_data.password) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) - access_token = create_access_token( - settings, - data={"sub": user.username}, - expires_delta=access_token_expires, - ) - return Token(access_token=access_token, token_type="bearer") - - @app.get("/clients/") - async def get_clients( - admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> list[Client]: - """Get clients.""" - clients = await admin.get_clients() - return clients - - @app.post("/clients/") - async def create_client( - new_client: ClientCreate, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> Client: - """Create a new client.""" - sources: list[str] | None = None - if new_client.sources: - sources = [str(source) for source in new_client.sources] - client = await admin.create_client( - new_client.name, new_client.public_key, sources - ) - return client - - @app.delete("/clients/{name}") - async def delete_client( - name: str, admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> None: - """Delete a client.""" - await admin.delete_client(name) - - @app.delete("/clients/{name}/secrets/{secret_name}") - async def delete_secret_from_client( - name: str, - secret_name: str, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - """Delete a secret from a client.""" - client = await admin.get_client(name) - if not client: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" - ) - - if secret_name not in client.secrets: - LOG.debug("Client does not have requested secret. No action to perform.") - return None - - await admin.delete_client_secret(name, secret_name) - - @app.put("/clients/{name}/policies") - async def update_client_policies( - name: str, - updated: UpdatePoliciesRequest, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> Client: - """Update the client access policies.""" - client = await admin.get_client(name) - if not client: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" - ) - - LOG.debug("Old policies: %r. New: %r", client.policies, updated.sources) - - addresses: list[str] = [str(source) for source in updated.sources] - await admin.update_client_sources(name, addresses) - client = await admin.get_client(name) - - assert client is not None, "Critical: The client disappeared after update!" - - return client - - @app.get("/secrets/") - async def get_secret_names( - admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> list[Secret]: - """Get Secret Names.""" - return await admin.get_secrets() - - @app.post("/secrets/") - async def add_secret( - secret: SecretCreate, admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> None: - """Create a secret.""" - await admin.add_secret(secret.name, secret.get_secret(), secret.clients) - - @app.get("/secrets/{name}") - async def get_secret( - name: str, admin: Annotated[AdminBackend, Depends(get_admin_backend)] - ) -> SecretView: - """Get a secret.""" - secret_view = await admin.get_secret(name) - - if not secret_view: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." - ) - return secret_view - - @app.put("/secrets/{name}") - async def update_secret( - name: str, - value: SecretUpdate, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - new_value = value.get_secret() - await admin.update_secret(name, new_value) - - @app.delete("/secrets/{name}") - async def delete_secret( - name: str, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - """Delete secret.""" - await admin.delete_secret(name) - - @app.put("/clients/{name}/public-key") - async def update_client_public_key( - name: str, - updated: UpdateKeyModel, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> UpdateKeyResponse: - """Update client public key. - - Updating the public key will invalidate the current secrets, so these well - be resolved first, and re-encrypted using the new key. - """ - # Let's first ensure that the key is actually updated. - updated_secrets = await admin.update_client_public_key(name, updated.public_key) - return UpdateKeyResponse( - public_key=updated.public_key, updated_secrets=updated_secrets - ) - - @app.put("/clients/{name}/secrets/{secret_name}") - async def add_secret_to_client( - name: str, - secret_name: str, - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ) -> None: - """Add secret to a client.""" - await admin.create_client_secret(name, secret_name) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/__init__.py b/packages/sshecret-admin/src/sshecret_admin/api/__init__.py new file mode 100644 index 0000000..d4840e7 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/__init__.py @@ -0,0 +1,5 @@ +"""Admin REST API.""" + +from .router import create_router as create_api_router + +__all__ = ["create_api_router"] diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py new file mode 100644 index 0000000..13d4058 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/__init__.py @@ -0,0 +1 @@ +"""API Endpoints.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py new file mode 100644 index 0000000..0e73eae --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/auth.py @@ -0,0 +1,39 @@ +"""Authentication related endpoints factory.""" + +# pyright: reportUnusedFunction=false +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordRequestForm +from sqlmodel import Session + +from sshecret_admin.auth import Token, authenticate_user, create_access_token +from sshecret_admin.core.dependencies import AdminDependencies + +LOG = logging.getLogger(__name__) + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create auth router.""" + app = APIRouter() + + @app.post("/token") + async def login_for_access_token( + session: Annotated[Session, Depends(dependencies.get_db_session)], + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + ) -> Token: + """Login user and generate token.""" + user = authenticate_user(session, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token = create_access_token( + dependencies.settings, + data={"sub": user.username}, + ) + return Token(access_token=access_token, token_type="bearer") + + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py new file mode 100644 index 0000000..c8384f4 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/clients.py @@ -0,0 +1,124 @@ +"""Client-related endpoints factory.""" + +# pyright: reportUnusedFunction=false + +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status + +from sshecret.backend import Client +from sshecret_admin.core.dependencies import AdminDependencies +from sshecret_admin.services import AdminBackend +from sshecret_admin.services.models import ( + ClientCreate, + UpdateKeyModel, + UpdateKeyResponse, + UpdatePoliciesRequest, +) + +LOG = logging.getLogger(__name__) + + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create clients router.""" + app = APIRouter() + + @app.get("/clients/") + async def get_clients( + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)] + ) -> list[Client]: + """Get clients.""" + clients = await admin.get_clients() + return clients + + @app.post("/clients/") + async def create_client( + new_client: ClientCreate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Client: + """Create a new client.""" + sources: list[str] | None = None + if new_client.sources: + sources = [str(source) for source in new_client.sources] + client = await admin.create_client( + new_client.name, new_client.public_key, sources=sources + ) + return client + + @app.delete("/clients/{name}") + async def delete_client( + name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Delete a client.""" + await admin.delete_client(name) + + @app.delete("/clients/{name}/secrets/{secret_name}") + async def delete_secret_from_client( + name: str, + secret_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Delete a secret from a client.""" + client = await admin.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + + if secret_name not in client.secrets: + LOG.debug("Client does not have requested secret. No action to perform.") + return None + + await admin.delete_client_secret(name, secret_name) + + @app.put("/clients/{name}/policies") + async def update_client_policies( + name: str, + updated: UpdatePoliciesRequest, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> Client: + """Update the client access policies.""" + client = await admin.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + + LOG.debug("Old policies: %r. New: %r", client.policies, updated.sources) + + addresses: list[str] = [str(source) for source in updated.sources] + await admin.update_client_sources(name, addresses) + client = await admin.get_client(name) + + assert client is not None, "Critical: The client disappeared after update!" + + return client + + @app.put("/clients/{name}/public-key") + async def update_client_public_key( + name: str, + updated: UpdateKeyModel, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> UpdateKeyResponse: + """Update client public key. + + Updating the public key will invalidate the current secrets, so these well + be resolved first, and re-encrypted using the new key. + """ + # Let's first ensure that the key is actually updated. + updated_secrets = await admin.update_client_public_key(name, updated.public_key) + return UpdateKeyResponse( + public_key=updated.public_key, updated_secrets=updated_secrets + ) + + @app.put("/clients/{name}/secrets/{secret_name}") + async def add_secret_to_client( + name: str, + secret_name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Add secret to a client.""" + await admin.create_client_secret(name, secret_name) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py new file mode 100644 index 0000000..01ee01d --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/endpoints/secrets.py @@ -0,0 +1,70 @@ +"""Secrets related endpoints factory.""" + +# pyright: reportUnusedFunction=false +import logging +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status + +from sshecret.backend.models import Secret +from sshecret_admin.core.dependencies import AdminDependencies +from sshecret_admin.services import AdminBackend +from sshecret_admin.services.models import ( + SecretCreate, + SecretUpdate, + SecretView, +) + +LOG = logging.getLogger(__name__) + + +def create_router(dependencies: AdminDependencies) -> APIRouter: + """Create secrets router.""" + app = APIRouter() + + @app.get("/secrets/") + async def get_secret_names( + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)] + ) -> list[Secret]: + """Get Secret Names.""" + return await admin.get_secrets() + + @app.post("/secrets/") + async def add_secret( + secret: SecretCreate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Create a secret.""" + await admin.add_secret(secret.name, secret.get_secret(), secret.clients) + + @app.get("/secrets/{name}") + async def get_secret( + name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> SecretView: + """Get a secret.""" + secret_view = await admin.get_secret(name) + + if not secret_view: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." + ) + return secret_view + + @app.put("/secrets/{name}") + async def update_secret( + name: str, + value: SecretUpdate, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + new_value = value.get_secret() + await admin.update_secret(name, new_value) + + @app.delete("/secrets/{name}") + async def delete_secret( + name: str, + admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], + ) -> None: + """Delete secret.""" + await admin.delete_secret(name) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/api/router.py b/packages/sshecret-admin/src/sshecret_admin/api/router.py new file mode 100644 index 0000000..ae939fb --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/api/router.py @@ -0,0 +1,78 @@ +"""Main API Router.""" + +# pyright: reportUnusedFunction=false + +import logging +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer + +from sqlmodel import Session, select + +from sshecret_admin.services.admin_backend import AdminBackend +from sshecret_admin.core.dependencies import BaseDependencies, AdminDependencies +from sshecret_admin.auth import PasswordDB, User, decode_token + +from .endpoints import auth, clients, secrets + +LOG = logging.getLogger(__name__) + +API_VERSION = "v1" + + +def create_router(dependencies: BaseDependencies) -> APIRouter: + """Create clients router.""" + + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + async def get_current_user( + token: Annotated[str, Depends(oauth2_scheme)], + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> User: + """Get current user from token.""" + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + token_data = decode_token(dependencies.settings, token) + if not token_data: + raise credentials_exception + + user = session.exec( + select(User).where(User.username == token_data.username) + ).first() + if not user: + raise credentials_exception + return user + + async def get_current_active_user( + current_user: Annotated[User, Depends(get_current_user)], + ) -> User: + """Get current active user.""" + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive or disabled user") + return current_user + + async def get_admin_backend(session: Annotated[Session, Depends(dependencies.get_db_session)]): + """Get admin backend API.""" + password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() + if not password_db: + raise HTTPException( + 500, detail="Error: The password manager has not yet been set up." + ) + admin = AdminBackend(dependencies.settings, password_db.encrypted_password) + yield admin + + app = APIRouter( + prefix=f"/api/{API_VERSION}", dependencies=[Depends(get_current_active_user)] + ) + + endpoint_deps = AdminDependencies.create(dependencies, get_admin_backend) + + app.include_router(auth.create_router(endpoint_deps)) + app.include_router(clients.create_router(endpoint_deps)) + app.include_router(secrets.create_router(endpoint_deps)) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/__init__.py b/packages/sshecret-admin/src/sshecret_admin/auth/__init__.py new file mode 100644 index 0000000..e83c7f5 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/__init__.py @@ -0,0 +1,24 @@ +"""Authentication related module.""" + +from .authentication import ( + authenticate_user, + create_access_token, + create_refresh_token, + check_password, + decode_token, + verify_password, +) +from .models import User, Token, PasswordDB + + +__all__ = [ + "PasswordDB", + "Token", + "User", + "authenticate_user", + "check_password", + "create_access_token", + "create_refresh_token", + "decode_token", + "verify_password", +] diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py b/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py new file mode 100644 index 0000000..f85eba3 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py @@ -0,0 +1,95 @@ +"""Authentication utilities.""" + +import logging +from datetime import datetime, timezone, timedelta +from typing import cast, Any + +import bcrypt +import jwt +from sqlmodel import Session, select + +from sshecret_admin.core.settings import AdminServerSettings +from .models import User, TokenData +from .exceptions import AuthenticationFailedError + +JWT_ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 +# I know refresh tokens are supposed to be long-lived, but 6 hours for a +# sensitive application, seems reasonable. +REFRESH_TOKEN_EXPIRE_HOURS = 6 + +LOG = logging.getLogger(__name__) + + +def create_token( + settings: AdminServerSettings, + data: dict[str, Any], + expires_delta: timedelta, +) -> str: + """Create access token.""" + to_encode = data.copy() + expire = datetime.now(timezone.utc) + expires_delta + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=JWT_ALGORITHM) + return str(encoded_jwt) + + +def create_access_token( + settings: AdminServerSettings, + data: dict[str, Any], + expires_delta: timedelta | None = None, +) -> str: + """Create access token.""" + if not expires_delta: + expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + return create_token(settings, data, expires_delta) + + +def create_refresh_token( + settings: AdminServerSettings, + data: dict[str, Any], + expires_delta: timedelta | None = None, +) -> str: + """Create access token.""" + if not expires_delta: + expires_delta = timedelta(hours=REFRESH_TOKEN_EXPIRE_HOURS) + return create_token(settings, data, expires_delta) + + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """Verify password against stored hash.""" + return bcrypt.checkpw(plain_password.encode(), hashed_password.encode()) + + +def check_password(plain_password: str, hashed_password: str) -> None: + """Check password. + + If password doesn't match, throw AuthenticationFailedError. + """ + if not verify_password(plain_password, hashed_password): + raise AuthenticationFailedError() + + +def authenticate_user(session: Session, username: str, password: str) -> User | None: + """Authenticate user.""" + user = session.exec(select(User).where(User.username == username)).first() + if not user: + return None + if not verify_password(password, user.hashed_password): + return None + return user + + +def decode_token(settings: AdminServerSettings, token: str) -> TokenData | None: + """Decode token.""" + try: + payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) + username = cast("str | None", payload.get("sub")) + if not username: + return None + + token_data = TokenData(username=username) + return token_data + except jwt.InvalidTokenError as e: + LOG.debug("Could not decode token: %s", e, exc_info=True) + return None diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py b/packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py new file mode 100644 index 0000000..0acb34d --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/exceptions.py @@ -0,0 +1,30 @@ +"""Authentication related exceptions.""" +from typing import override + +from .models import LoginError + + +class AuthenticationFailedError(Exception): + """Authentication failed.""" + + @override + def __init__(self, message: str | None = None) -> None: + """Initialize exception class.""" + if not message: + message = "Invalid user or password." + super().__init__(message) + self.login_error: LoginError = LoginError( + title="Authentication Failed", message=message + ) + + +class AuthenticationNeededError(Exception): + """Authentication needed error.""" + + @override + def __init__(self, message: str | None = None) -> None: + """Initialize exception class.""" + if not message: + message = "You need to be logged in to continue." + super().__init__(message) + self.login_error: LoginError = LoginError(title="Unauthorized", message=message) diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/models.py b/packages/sshecret-admin/src/sshecret_admin/auth/models.py new file mode 100644 index 0000000..7a25d84 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth/models.py @@ -0,0 +1,71 @@ +"""Models for authentication.""" + +from datetime import datetime +import sqlalchemy as sa +from sqlmodel import SQLModel, Field + + +JWT_ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 +# I know refresh tokens are supposed to be long-lived, but 6 hours for a +# sensitive application, seems reasonable. +REFRESH_TOKEN_EXPIRE_HOURS = 6 + + +class User(SQLModel, table=True): + """Users.""" + + username: str = Field(unique=True, primary_key=True) + hashed_password: str + disabled: bool = Field(default=False) + created_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"server_default": sa.func.now()}, + nullable=False, + ) + + +class PasswordDB(SQLModel, table=True): + """Password database.""" + + id: int | None = Field(default=None, primary_key=True) + encrypted_password: str + + created_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"server_default": sa.func.now()}, + nullable=False, + ) + + updated_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"onupdate": sa.func.now(), "server_default": sa.func.now()}, + ) + + +def init_db(engine: sa.Engine) -> None: + """Create database.""" + SQLModel.metadata.create_all(engine) + + +class TokenData(SQLModel): + """Token data.""" + + username: str | None = None + + +class Token(SQLModel): + access_token: str + token_type: str + + +class LoginError(SQLModel): + """Login Error model.""" + # TODO: Remove this. + + title: str + message: str + diff --git a/packages/sshecret-admin/src/sshecret_admin/auth_models.py b/packages/sshecret-admin/src/sshecret_admin/auth_models.py deleted file mode 100644 index 87f861b..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/auth_models.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Models for authentication.""" - -from datetime import datetime, timedelta, timezone -import bcrypt -import sqlalchemy as sa -from typing import Any, override -import jwt -from sqlmodel import SQLModel, Field -from sshecret_admin.settings import AdminServerSettings - - -JWT_ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 - - -class User(SQLModel, table=True): - """Users.""" - - username: str = Field(unique=True, primary_key=True) - hashed_password: str - disabled: bool = Field(default=False) - created_at: datetime | None = Field( - default=None, - sa_type=sa.DateTime(timezone=True), - sa_column_kwargs={"server_default": sa.func.now()}, - nullable=False, - ) - - -class PasswordDB(SQLModel, table=True): - """Password database.""" - - id: int | None = Field(default=None, primary_key=True) - encrypted_password: str - - created_at: datetime | None = Field( - default=None, - sa_type=sa.DateTime(timezone=True), - sa_column_kwargs={"server_default": sa.func.now()}, - nullable=False, - ) - - updated_at: datetime | None = Field( - default=None, - sa_type=sa.DateTime(timezone=True), - sa_column_kwargs={"onupdate": sa.func.now(), "server_default": sa.func.now()}, - ) - - -def init_db(engine: sa.Engine) -> None: - """Create database.""" - SQLModel.metadata.create_all(engine) - - -class TokenData(SQLModel): - """Token data.""" - - username: str | None = None - - -class Token(SQLModel): - access_token: str - token_type: str - - -def create_access_token( - settings: AdminServerSettings, - data: dict[str, Any], - expires_delta: timedelta | None = None, -) -> str: - """Create access token.""" - to_encode = data.copy() - expire = datetime.now(timezone.utc) + timedelta(minutes=15) - if expires_delta: - expire = datetime.now(timezone.utc) + expires_delta - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=JWT_ALGORITHM) - return encoded_jwt - - -def verify_password(plain_password: str, hashed_password: str) -> bool: - """Verify password against stored hash.""" - return bcrypt.checkpw(plain_password.encode(), hashed_password.encode()) - - -def check_password(plain_password: str, hashed_password: str) -> None: - """Check password. - - If password doesn't match, throw AuthenticationFailedError. - """ - if not verify_password(plain_password, hashed_password): - raise AuthenticationFailedError() - - -class LoginError(SQLModel): - """Login Error model.""" - - title: str - message: str - - -class AuthenticationFailedError(Exception): - """Authentication failed.""" - - @override - def __init__(self, message: str | None = None) -> None: - """Initialize exception class.""" - if not message: - message = "Invalid user or password." - super().__init__(message) - self.login_error: LoginError = LoginError( - title="Authentication Failed", message=message - ) - - -class AuthenticationNeededError(Exception): - """Authentication needed error.""" - - @override - def __init__(self, message: str | None = None) -> None: - """Initialize exception class.""" - if not message: - message = "You need to be logged in to continue." - super().__init__(message) - self.login_error: LoginError = LoginError(title="Unauthorized", message=message) diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/widgets/clients.html b/packages/sshecret-admin/src/sshecret_admin/core/__init__.py similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/widgets/clients.html rename to packages/sshecret-admin/src/sshecret_admin/core/__init__.py diff --git a/packages/sshecret-admin/src/sshecret_admin/app.py b/packages/sshecret-admin/src/sshecret_admin/core/app.py similarity index 68% rename from packages/sshecret-admin/src/sshecret_admin/app.py rename to packages/sshecret-admin/src/sshecret_admin/core/app.py index 79cbfc3..06cc166 100644 --- a/packages/sshecret-admin/src/sshecret_admin/app.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/app.py @@ -5,24 +5,22 @@ import logging import os from contextlib import asynccontextmanager - from pathlib import Path -from fastapi import FastAPI, Request, status +from fastapi import FastAPI, Request, Response, status from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles - from sqlmodel import Session, select +from sshecret_admin import api, frontend +from sshecret_admin.auth.models import PasswordDB, init_db +from sshecret_admin.core.db import setup_database +from sshecret_admin.frontend.exceptions import RedirectException +from sshecret_admin.services.master_password import setup_master_password -from .admin_api import get_admin_api -from .auth_models import init_db, PasswordDB, AuthenticationFailedError, AuthenticationNeededError -from .db import setup_database -from .master_password import setup_master_password +from .dependencies import BaseDependencies from .settings import AdminServerSettings -from .frontend import create_frontend -from .types import DBSessionDep LOG = logging.getLogger(__name__) @@ -30,15 +28,14 @@ LOG = logging.getLogger(__name__) def setup_frontend( - app: FastAPI, settings: AdminServerSettings, get_db_session: DBSessionDep + app: FastAPI, dependencies: BaseDependencies ) -> None: """Setup frontend.""" script_path = Path(os.path.dirname(os.path.realpath(__file__))) - static_path = script_path / "static" + static_path = script_path.parent / "static" app.mount("/static", StaticFiles(directory=static_path), name="static") - frontend = create_frontend(settings, get_db_session) - app.include_router(frontend) + app.include_router(frontend.create_frontend_router(dependencies)) def create_admin_app( @@ -88,19 +85,15 @@ def create_admin_app( content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}), ) - @app.exception_handler(AuthenticationNeededError) - async def authentication_needed_handler( - request: Request, exc: AuthenticationNeededError, - ): - qs = f"error_title={exc.login_error.title}&error_message={exc.login_error.message}" - return RedirectResponse(f"/?{qs}") + @app.exception_handler(RedirectException) + async def redirect_handler(request: Request, exc: RedirectException) -> Response: + """Handle redirect exceptions.""" + if "hx-request" in request.headers: + response = Response() + response.headers["HX-Redirect"] = str(exc.to) + return response + return RedirectResponse(url=str(exc.to)) - @app.exception_handler(AuthenticationFailedError) - async def authentication_failed_handler( - request: Request, exc: AuthenticationNeededError, - ): - qs = f"error_title={exc.login_error.title}&error_message={exc.login_error.message}" - return RedirectResponse(f"/?{qs}") @app.get("/health") async def get_health() -> JSONResponse: @@ -109,10 +102,11 @@ def create_admin_app( status_code=status.HTTP_200_OK, content=jsonable_encoder({"status": "LIVE"}) ) - admin_api = get_admin_api(get_db_session, settings) + dependencies = BaseDependencies(settings, get_db_session) - app.include_router(admin_api) + + app.include_router(api.create_api_router(dependencies)) if with_frontend: - setup_frontend(app, settings, get_db_session) + setup_frontend(app, dependencies) return app diff --git a/packages/sshecret-admin/src/sshecret_admin/cli.py b/packages/sshecret-admin/src/sshecret_admin/core/cli.py similarity index 85% rename from packages/sshecret-admin/src/sshecret_admin/cli.py rename to packages/sshecret-admin/src/sshecret_admin/core/cli.py index b45bcc4..bbad59f 100644 --- a/packages/sshecret-admin/src/sshecret_admin/cli.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/cli.py @@ -7,29 +7,30 @@ import logging from typing import Any, cast import bcrypt import click -from sshecret_admin.admin_backend import AdminBackend +from sshecret_admin.services.admin_backend import AdminBackend import uvicorn from pydantic import ValidationError from sqlmodel import Session, create_engine, select -from .auth_models import init_db, User, PasswordDB -from .settings import AdminServerSettings +from sshecret_admin.auth.models import init_db, User, PasswordDB +from sshecret_admin.core.settings import AdminServerSettings handler = logging.StreamHandler() -formatter = logging.Formatter("%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s") +formatter = logging.Formatter( + "%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s" +) handler.setFormatter(formatter) LOG = logging.getLogger() LOG.addHandler(handler) LOG.setLevel(logging.INFO) - - def hash_password(password: str) -> str: """Hash password.""" salt = bcrypt.gensalt() hashed_password = bcrypt.hashpw(password.encode(), salt) return hashed_password.decode() + def create_user(session: Session, username: str, password: str) -> None: """Create a user.""" hashed_password = hash_password(password) @@ -48,7 +49,9 @@ def cli(ctx: click.Context, debug: bool) -> None: try: settings = AdminServerSettings() # pyright: ignore[reportCallIssue] except ValidationError as e: - raise click.ClickException("Error: One or more required environment options are missing.") from e + raise click.ClickException( + "Error: One or more required environment options are missing." + ) from e ctx.obj = settings @@ -66,6 +69,7 @@ def cli_create_user(ctx: click.Context, username: str, password: str) -> None: click.echo("User created.") + @cli.command("passwd") @click.argument("username") @click.password_option() @@ -85,6 +89,7 @@ def cli_change_user_passwd(ctx: click.Context, username: str, password: str) -> session.commit() click.echo("Password updated.") + @cli.command("deluser") @click.argument("username") @click.confirmation_option() @@ -112,7 +117,9 @@ def cli_delete_user(ctx: click.Context, username: str) -> None: @click.option("--workers", type=click.INT) def cli_run(host: str, port: int, dev: bool, workers: int | None) -> None: """Run the server.""" - uvicorn.run("sshecret_admin.main:app", host=host, port=port, reload=dev, workers=workers) + uvicorn.run( + "sshecret_admin.core.main:app", host=host, port=port, reload=dev, workers=workers + ) @cli.command("repl") @@ -126,7 +133,9 @@ def cli_repl(ctx: click.Context) -> None: password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() if not password_db: - raise click.ClickException("Error: Password database has not yet been setup. Start the server to finish setup.") + raise click.ClickException( + "Error: Password database has not yet been setup. Start the server to finish setup." + ) def run(func: Awaitable[Any]) -> Any: """Run an async function.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/db.py b/packages/sshecret-admin/src/sshecret_admin/core/db.py similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/db.py rename to packages/sshecret-admin/src/sshecret_admin/core/db.py diff --git a/packages/sshecret-admin/src/sshecret_admin/core/dependencies.py b/packages/sshecret-admin/src/sshecret_admin/core/dependencies.py new file mode 100644 index 0000000..d2d2137 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/core/dependencies.py @@ -0,0 +1,37 @@ +"""Common type definitions.""" + +from collections.abc import AsyncGenerator, Callable, Generator +from dataclasses import dataclass +from typing import Self + +from sqlmodel import Session +from sshecret_admin.services import AdminBackend +from sshecret_admin.core.settings import AdminServerSettings + + +DBSessionDep = Callable[[], Generator[Session, None, None]] + +AdminDep = Callable[[Session], AsyncGenerator[AdminBackend, None]] + + +@dataclass +class BaseDependencies: + """Base level dependencies.""" + + settings: AdminServerSettings + get_db_session: DBSessionDep + +@dataclass +class AdminDependencies(BaseDependencies): + """Dependency class with admin.""" + + get_admin_backend: AdminDep + + @classmethod + def create(cls, deps: BaseDependencies, get_admin_backend: AdminDep) -> Self: + """Create from base dependencies.""" + return cls( + settings=deps.settings, + get_db_session=deps.get_db_session, + get_admin_backend=get_admin_backend, + ) diff --git a/packages/sshecret-admin/src/sshecret_admin/main.py b/packages/sshecret-admin/src/sshecret_admin/core/main.py similarity index 97% rename from packages/sshecret-admin/src/sshecret_admin/main.py rename to packages/sshecret-admin/src/sshecret_admin/core/main.py index c610df4..1bc005b 100644 --- a/packages/sshecret-admin/src/sshecret_admin/main.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/main.py @@ -1,6 +1,5 @@ """Main server app.""" import sys -import uvicorn import click from pydantic import ValidationError diff --git a/packages/sshecret-admin/src/sshecret_admin/settings.py b/packages/sshecret-admin/src/sshecret_admin/core/settings.py similarity index 64% rename from packages/sshecret-admin/src/sshecret_admin/settings.py rename to packages/sshecret-admin/src/sshecret_admin/core/settings.py index 571288f..ab3af40 100644 --- a/packages/sshecret-admin/src/sshecret_admin/settings.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/settings.py @@ -2,11 +2,12 @@ from pydantic import AnyHttpUrl, Field from pydantic_settings import BaseSettings, SettingsConfigDict +from sqlalchemy import URL DEFAULT_LISTEN_PORT = 8822 -DEFAULT_DATABASE = "sqlite:///ssh_admin.db" +DEFAULT_DATABASE = "ssh_admin.db" class AdminServerSettings(BaseSettings): @@ -21,5 +22,12 @@ class AdminServerSettings(BaseSettings): listen_address: str = Field(default="") secret_key: str port: int = DEFAULT_LISTEN_PORT - admin_db: str = Field(default=DEFAULT_DATABASE) + + database: str = Field(default=DEFAULT_DATABASE) + #admin_db: str = Field(default=DEFAULT_DATABASE) debug: bool = False + + @property + def admin_db(self) -> URL: + """Construct database url.""" + return URL.create(drivername="sqlite", database=self.database) diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend.py b/packages/sshecret-admin/src/sshecret_admin/frontend.py deleted file mode 100644 index 40296aa..0000000 --- a/packages/sshecret-admin/src/sshecret_admin/frontend.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Frontend methods.""" - -# pyright: reportUnusedFunction=false -import logging -import os -from datetime import timedelta -from pathlib import Path -from typing import Annotated - -import jwt -from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response, status -from fastapi.responses import RedirectResponse -from jinja2_fragments.fastapi import Jinja2Blocks -from sqlmodel import Session, select -from sshecret_admin.settings import AdminServerSettings -from sshecret.backend import SshecretBackend -from .admin_backend import AdminBackend -from .auth_models import ( - JWT_ALGORITHM, - AuthenticationFailedError, - AuthenticationNeededError, - LoginError, - PasswordDB, - User, - TokenData, - create_access_token, - verify_password, -) -from .types import DBSessionDep -from .views import create_audit_view, create_client_view, create_secrets_view - - -ACCESS_TOKEN_EXPIRE_MINUTES = 45 -LOG = logging.getLogger(__name__) - - -def login_error(templates: Jinja2Blocks, request: Request): - """Return a login error.""" - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login Page", - "error": "Invalid Login.", - }, - ) - - -def create_frontend( - settings: AdminServerSettings, get_db_session: DBSessionDep -) -> APIRouter: - """Create frontend.""" - app = APIRouter(include_in_schema=False) - - script_path = Path(os.path.dirname(os.path.realpath(__file__))) - - template_path = script_path / "templates" - - templates = Jinja2Blocks(directory=template_path) - - # @app.exception_handler(AuthenticationFailedError) - # async def handle_authentication_failed(request: Request, exc: AuthenticationFailedError): - # """Handle authentication failed error.""" - # return templates.TemplateResponse(request, "login.html") - - async def get_backend(): - """Get backend client.""" - backend_client = SshecretBackend( - str(settings.backend_url), settings.backend_token - ) - yield backend_client - - async def get_admin_backend(session: Annotated[Session, Depends(get_db_session)]): - """Get admin backend API.""" - password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() - if not password_db: - raise HTTPException( - 500, detail="Error: The password manager has not yet been set up." - ) - admin = AdminBackend(settings, password_db.encrypted_password) - yield admin - - async def get_login_status( - request: Request, session: Annotated[Session, Depends(get_db_session)] - ) -> bool: - """Get login status.""" - token = request.cookies.get("access_token") - if not token: - return False - try: - payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - if not username: - return False - except jwt.InvalidTokenError: - return False - token_data = TokenData(username=username) - user = session.exec( - select(User).where(User.username == token_data.username) - ).first() - if not user: - return False - return True - - async def get_current_user_from_token( - request: Request, session: Annotated[Session, Depends(get_db_session)] - ) -> User: - credentials_exception = AuthenticationNeededError() - """Get current user from token.""" - token = request.cookies.get("access_token") - if not token: - raise credentials_exception - try: - payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - if not username: - raise credentials_exception - except jwt.InvalidTokenError: - raise credentials_exception - token_data = TokenData(username=username) - user = session.exec( - select(User).where(User.username == token_data.username) - ).first() - if not user: - raise credentials_exception - - return user - - @app.get("/") - async def get_index( - request: Request, - login_status: Annotated[bool, Depends(get_login_status)], - error_title: str | None = None, - error_message: str | None = None, - ): - """Get index.""" - if login_status: - return RedirectResponse("/dashboard") - login_error: LoginError | None = None - if error_title and error_message: - login_error = LoginError(title=error_title, message=error_message) - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login page.", - "login_error": login_error, - }, - ) - - @app.post("/") - async def post_index( - request: Request, - error_title: str | None = None, - error_message: str | None = None, - ): - """Get index.""" - login_error: LoginError | None = None - if error_title and error_message: - login_error = LoginError(title=error_title, message=error_message) - return templates.TemplateResponse( - request, - "login.html", - { - "page_title": "Login", - "page_description": "Login page.", - "login_error": login_error, - }, - ) - - @app.post("/login") - async def login_user( - response: Response, - request: Request, - session: Annotated[Session, Depends(get_db_session)], - username: Annotated[str, Form()], - password: Annotated[str, Form()], - ): - """Log in user.""" - user = session.exec(select(User).where(User.username == username)).first() - auth_error = AuthenticationFailedError() - if not user: - raise auth_error - - if not verify_password(password, user.hashed_password): - raise auth_error - - token_data = {"sub": user.username} - expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) - token = create_access_token(settings, token_data, expires_delta=expires) - response = RedirectResponse(url="/dashboard", status_code=status.HTTP_302_FOUND) - response.set_cookie( - key="access_token", value=token, httponly=True, secure=False, samesite="lax" - ) - return response - - @app.get("/success") - async def success_page( - request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - ): - """Display a success page.""" - return templates.TemplateResponse( - request, "success.html", {"page_title": "Success!", "user": current_user} - ) - - @app.get("/dashboard") - async def get_dashboard( - request: Request, - current_user: Annotated[User, Depends(get_current_user_from_token)], - admin: Annotated[AdminBackend, Depends(get_admin_backend)], - ): - """Dashboard for mocking up the dashboard.""" - # secrets = await admin.get_secrets() - return templates.TemplateResponse( - request, - "dashboard.html", - { - "page_title": "sshecret", - "user": current_user.username, - }, - ) - - # Stop adding routes here. - - app.include_router( - create_client_view(templates, get_current_user_from_token, get_admin_backend) - ) - - app.include_router( - create_secrets_view(templates, get_current_user_from_token, get_admin_backend) - ) - - app.include_router( - create_audit_view(templates, get_current_user_from_token, get_admin_backend) - ) - - return app diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py b/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py new file mode 100644 index 0000000..c527245 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/__init__.py @@ -0,0 +1,5 @@ +"""Frontend app.""" + +from .router import create_router as create_frontend_router + +__all__ = ["create_frontend_router"] diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py b/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py new file mode 100644 index 0000000..7ac883a --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/auth.py @@ -0,0 +1,7 @@ +"""Custom oauth2 class.""" + +from fastapi.security import OAuth2 + + +class Oauth2TokenInCookies(OAuth2): + """TODO: Create this.""" diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py b/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py new file mode 100644 index 0000000..1424653 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/dependencies.py @@ -0,0 +1,48 @@ +"""Frontend dependencies.""" + +from dataclasses import dataclass +from collections.abc import Callable, Awaitable +from typing import Self + +from jinja2_fragments.fastapi import Jinja2Blocks +from fastapi import Request +from sqlmodel import Session + +from sshecret_admin.core.dependencies import AdminDep, BaseDependencies + +from sshecret_admin.auth.models import User + +UserTokenDep = Callable[[Request, Session], Awaitable[User]] +UserLoginDep = Callable[[Request, Session], Awaitable[bool]] + + +@dataclass +class FrontendDependencies(BaseDependencies): + """Frontend dependencies.""" + + get_admin_backend: AdminDep + templates: Jinja2Blocks + get_user_from_access_token: UserTokenDep + get_user_from_refresh_token: UserTokenDep + get_login_status: UserLoginDep + + @classmethod + def create( + cls, + deps: BaseDependencies, + get_admin_backend: AdminDep, + templates: Jinja2Blocks, + get_user_from_access_token: UserTokenDep, + get_user_from_refresh_token: UserTokenDep, + get_login_status: UserLoginDep, + ) -> Self: + """Create from base dependencies.""" + return cls( + settings=deps.settings, + get_db_session=deps.get_db_session, + get_admin_backend=get_admin_backend, + templates=templates, + get_user_from_access_token=get_user_from_access_token, + get_user_from_refresh_token=get_user_from_refresh_token, + get_login_status=get_login_status, + ) diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py b/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py new file mode 100644 index 0000000..4c92ff4 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/exceptions.py @@ -0,0 +1,13 @@ +"""Frontend exceptions.""" +from starlette.datastructures import URL + + +class RedirectException(Exception): + """Exception that initiates a redirect flow.""" + + def __init__(self, to: str | URL) -> None: # pyright: ignore[reportMissingSuperCall] + """Raise exception that redirects.""" + if isinstance(to, str): + to = URL(to) + + self.to: URL = to diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/router.py b/packages/sshecret-admin/src/sshecret_admin/frontend/router.py new file mode 100644 index 0000000..5b83067 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/router.py @@ -0,0 +1,133 @@ +"""Frontend router.""" + +# pyright: reportUnusedFunction=false + +import logging +import os +from pathlib import Path +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Request + +from jinja2_fragments.fastapi import Jinja2Blocks + +from sqlmodel import Session, select +from starlette.datastructures import URL + + +from sshecret_admin.auth import PasswordDB, User, decode_token +from sshecret_admin.core.dependencies import BaseDependencies +from sshecret_admin.services.admin_backend import AdminBackend + +from .dependencies import FrontendDependencies +from .exceptions import RedirectException +from .views import audit, auth, clients, index, secrets + + +LOG = logging.getLogger(__name__) + + +access_token = "access_token" +refresh_token = "refresh_token" + + +def create_router(dependencies: BaseDependencies) -> APIRouter: + """Create frontend router.""" + + app = APIRouter(include_in_schema=False) + + script_path = Path(os.path.dirname(os.path.realpath(__file__))) + + template_path = script_path / "templates" + + templates = Jinja2Blocks(directory=template_path) + + async def get_admin_backend( + session: Annotated[Session, Depends(dependencies.get_db_session)] + ): + """Get admin backend API.""" + password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() + if not password_db: + raise HTTPException( + 500, detail="Error: The password manager has not yet been set up." + ) + admin = AdminBackend(dependencies.settings, password_db.encrypted_password) + yield admin + + async def get_user_from_token( + token: str, + session: Session, + ) -> User | None: + """Get user from a token.""" + token_data = decode_token(dependencies.settings, token) + if not token_data: + return None + user = session.exec( + select(User).where(User.username == token_data.username) + ).first() + if not user or user.disabled: + return None + return user + + async def get_user_from_refresh_token( + request: Request, + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> User: + """Get user from refresh token.""" + next = URL("/login").include_query_params(next=request.url.path) + credentials_error = RedirectException(to=next) + token = request.cookies.get("refresh_token") + if not token: + raise credentials_error + + user = await get_user_from_token(token, session) + if not user: + raise credentials_error + return user + + async def get_user_from_access_token( + request: Request, + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> User: + """Get user from access token.""" + token = request.cookies.get("access_token") + next = URL("/refresh").include_query_params(next=request.url.path) + credentials_error = RedirectException(to=next) + if not token: + raise credentials_error + + user = await get_user_from_token(token, session) + if not user: + raise credentials_error + return user + + async def get_login_status( + request: Request, + session: Annotated[Session, Depends(dependencies.get_db_session)], + ) -> bool: + """Get login status.""" + token = request.cookies.get("access_token") + if not token: + return False + + user = await get_user_from_token(token, session) + if not user: + return False + return True + + view_dependencies = FrontendDependencies.create( + dependencies, + get_admin_backend, + templates, + get_user_from_access_token, + get_user_from_refresh_token, + get_login_status, + ) + + app.include_router(audit.create_router(view_dependencies)) + app.include_router(auth.create_router(view_dependencies)) + app.include_router(clients.create_router(view_dependencies)) + app.include_router(index.create_router(view_dependencies)) + app.include_router(secrets.create_router(view_dependencies)) + + return app diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/entry.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/entry.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/entry.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/index.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/index.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/index.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/inner.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/inner.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/inner_save.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner_save.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/inner_save.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/inner_save.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/audit/pagination.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/pagination.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/audit/pagination.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/audit/pagination.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/client.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/client.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/client.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/client.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_create.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_create.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_create.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_create.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_delete.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_delete.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_delete.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_delete.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_update.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_update.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/drawer_client_update.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/drawer_client_update.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/dynamic.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/dynamic.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/dynamic.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/dynamic.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/field_invalid.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_invalid.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/field_invalid.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_invalid.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/field_valid.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/field_valid.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/field_valid.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/index.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/index.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/index.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/templates/clients/inner.html.j2 b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/inner.html.j2 similarity index 100% rename from packages/sshecret-admin/src/sshecret_admin/templates/clients/inner.html.j2 rename to packages/sshecret-admin/src/sshecret_admin/frontend/templates/clients/inner.html.j2 diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html new file mode 100644 index 0000000..1d719bd --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/templates/dashboard.html @@ -0,0 +1,89 @@ +{% extends "/dashboard/_base.html" %} {% block content %} + +
+ + + 12.5% + + Since last month +
++ + + 3,4% + + Since last month +
+I am outside of the package
- -