From 3719a2611daafa32778e009a0cb9f962b428284f Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Sun, 4 May 2025 09:20:11 +0200 Subject: [PATCH] Check in current backend --- packages/sshecret-backend/migrations/env.py | 17 +++++--- .../versions/a0befb5a74a0_initial_model.py | 33 --------------- .../f30e413c5757_add_subsystem_to_auditlog.py | 33 --------------- packages/sshecret-backend/pyproject.toml | 11 +++++ .../src/sshecret_backend/api/audit.py | 1 - .../src/sshecret_backend/api/policies.py | 2 +- .../src/sshecret_backend/cli.py | 41 +++++++++++++++---- .../src/sshecret_backend/db.py | 3 +- .../src/sshecret_backend/models.py | 11 +++-- .../src/sshecret_backend/settings.py | 36 ++++++++++++++-- 10 files changed, 93 insertions(+), 95 deletions(-) delete mode 100644 packages/sshecret-backend/migrations/versions/a0befb5a74a0_initial_model.py delete mode 100644 packages/sshecret-backend/migrations/versions/f30e413c5757_add_subsystem_to_auditlog.py diff --git a/packages/sshecret-backend/migrations/env.py b/packages/sshecret-backend/migrations/env.py index 91731d7..1e35f40 100644 --- a/packages/sshecret-backend/migrations/env.py +++ b/packages/sshecret-backend/migrations/env.py @@ -1,11 +1,20 @@ +import os from logging.config import fileConfig from sqlalchemy import engine_from_config from sqlalchemy import pool +from sqlmodel import create_engine from alembic import context from sshecret_backend.models import * +def get_database_url() -> str: + """Get database URL.""" + if db_file := os.getenv("SSHECRET_BACKEND_DB"): + return f"sqlite:///{db_file}" + return "sqlite:///sshecret.db" + + # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config @@ -40,7 +49,7 @@ def run_migrations_offline() -> None: script output. """ - url = config.get_main_option("sqlalchemy.url") + url = get_database_url() context.configure( url=url, target_metadata=target_metadata, @@ -59,11 +68,7 @@ def run_migrations_online() -> None: and associate a connection with the context. """ - connectable = engine_from_config( - config.get_section(config.config_ini_section, {}), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) + connectable = create_engine(get_database_url()) with connectable.connect() as connection: context.configure( diff --git a/packages/sshecret-backend/migrations/versions/a0befb5a74a0_initial_model.py b/packages/sshecret-backend/migrations/versions/a0befb5a74a0_initial_model.py deleted file mode 100644 index 08d7be8..0000000 --- a/packages/sshecret-backend/migrations/versions/a0befb5a74a0_initial_model.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Initial model - -Revision ID: a0befb5a74a0 -Revises: -Create Date: 2025-04-28 21:18:59.069323 - -""" -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa -import sqlmodel - - -# revision identifiers, used by Alembic. -revision: str = 'a0befb5a74a0' -down_revision: Union[str, None] = None -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - """Upgrade schema.""" - # ### commands auto generated by Alembic - please adjust! ### - pass - # ### end Alembic commands ### - - -def downgrade() -> None: - """Downgrade schema.""" - # ### commands auto generated by Alembic - please adjust! ### - pass - # ### end Alembic commands ### diff --git a/packages/sshecret-backend/migrations/versions/f30e413c5757_add_subsystem_to_auditlog.py b/packages/sshecret-backend/migrations/versions/f30e413c5757_add_subsystem_to_auditlog.py deleted file mode 100644 index 0d8c1bc..0000000 --- a/packages/sshecret-backend/migrations/versions/f30e413c5757_add_subsystem_to_auditlog.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Add subsystem to auditlog - -Revision ID: f30e413c5757 -Revises: a0befb5a74a0 -Create Date: 2025-04-28 21:21:20.103423 - -""" -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa -import sqlmodel - - -# revision identifiers, used by Alembic. -revision: str = 'f30e413c5757' -down_revision: Union[str, None] = 'a0befb5a74a0' -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - """Upgrade schema.""" - # ### commands auto generated by Alembic - please adjust! ### - op.add_column('auditlog', sa.Column('subsystem', sqlmodel.sql.sqltypes.AutoString(), nullable=True)) - # ### end Alembic commands ### - - -def downgrade() -> None: - """Downgrade schema.""" - # ### commands auto generated by Alembic - please adjust! ### - op.drop_column('auditlog', 'subsystem') - # ### end Alembic commands ### diff --git a/packages/sshecret-backend/pyproject.toml b/packages/sshecret-backend/pyproject.toml index deaaaca..03a5700 100644 --- a/packages/sshecret-backend/pyproject.toml +++ b/packages/sshecret-backend/pyproject.toml @@ -14,8 +14,13 @@ dependencies = [ "pytest>=8.3.5", "python-multipart>=0.0.20", "sqlmodel>=0.0.24", + "sshecret", ] + +[tool.uv.sources] +sshecret = { workspace = true } + [project.scripts] sshecret-backend = "sshecret_backend.cli:cli" @@ -26,3 +31,9 @@ build-backend = "hatchling.build" [tool.pytest.ini_options] log_cli = true log_cli_level = "INFO" + +[tool.pyright] +venvPath = "../.." +venv = ".venv" +strict = ["**/*.py"] +pythonVersion = "3.13" diff --git a/packages/sshecret-backend/src/sshecret_backend/api/audit.py b/packages/sshecret-backend/src/sshecret_backend/api/audit.py index 43a05c1..f429ec6 100644 --- a/packages/sshecret-backend/src/sshecret_backend/api/audit.py +++ b/packages/sshecret-backend/src/sshecret_backend/api/audit.py @@ -11,7 +11,6 @@ from typing import Annotated from sshecret_backend.models import AuditLog from sshecret_backend.types import DBSessionDep -from sshecret_backend import audit from sshecret_backend.view_models import AuditInfo diff --git a/packages/sshecret-backend/src/sshecret_backend/api/policies.py b/packages/sshecret-backend/src/sshecret_backend/api/policies.py index 70b794b..8c7a89e 100644 --- a/packages/sshecret-backend/src/sshecret_backend/api/policies.py +++ b/packages/sshecret-backend/src/sshecret_backend/api/policies.py @@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request from sqlmodel import Session, select from typing import Annotated -from sshecret_backend.models import Client, ClientAccessPolicy +from sshecret_backend.models import ClientAccessPolicy from sshecret_backend.view_models import ( ClientPolicyView, ClientPolicyUpdate, diff --git a/packages/sshecret-backend/src/sshecret_backend/cli.py b/packages/sshecret-backend/src/sshecret_backend/cli.py index 6eaaa15..956bad4 100644 --- a/packages/sshecret-backend/src/sshecret_backend/cli.py +++ b/packages/sshecret-backend/src/sshecret_backend/cli.py @@ -6,12 +6,12 @@ from pathlib import Path from typing import cast from dotenv import load_dotenv import click -from sqlmodel import Session, create_engine, select +from sqlmodel import Session, col, func, select import uvicorn -from .db import create_api_token +from .db import get_engine, create_api_token -from .models import Client, ClientSecret, ClientAccessPolicy, AuditLog, APIClient +from .models import Client, ClientSecret, ClientAccessPolicy, AuditLog, APIClient, init_db from .settings import BackendSettings DEFAULT_LISTEN = "127.0.0.1" @@ -21,6 +21,23 @@ WORKDIR = Path(os.getcwd()) load_dotenv() +def generate_token(settings: BackendSettings) -> str: + """Generate a token.""" + engine = get_engine(settings.db_url) + init_db(engine) + with Session(engine) as session: + token = create_api_token(session, True) + return token + +def count_tokens(settings: BackendSettings) -> int: + """Count the amount of tokens created.""" + engine = get_engine(settings.db_url) + init_db(engine) + with Session(engine) as session: + count = session.exec(select(func.count("*")).select_from(APIClient)).one() + + return count + @click.group() @click.option("--database", help="Path to the sqlite database file.") @@ -28,11 +45,19 @@ load_dotenv() def cli(ctx: click.Context, database: str) -> None: """CLI group.""" if database: - # Hopefully it's enough to set the environment variable as so. - settings = BackendSettings(db_url=f"sqlite:///{Path(database).absolute()}") + settings = BackendSettings(database=str(Path(database).absolute())) else: settings = BackendSettings() + + if settings.generate_initial_tokens: + if count_tokens(settings) == 0: + click.echo("Creating initial tokens for admin and sshd.") + admin_token = generate_token(settings) + sshd_token = generate_token(settings) + click.echo(f"Admin token: {admin_token}") + click.echo(f"SSHD token: {sshd_token}") + ctx.obj = settings @@ -41,9 +66,7 @@ def cli(ctx: click.Context, database: str) -> None: def cli_generate_token(ctx: click.Context) -> None: """Generate a token.""" settings = cast(BackendSettings, ctx.obj) - engine = create_engine(settings.db_url) - with Session(engine) as session: - token = create_api_token(session, True) + token = generate_token(settings) click.echo("Generated api token:") click.echo(token) @@ -61,7 +84,7 @@ def cli_run(host: str, port: int, dev: bool, workers: int | None) -> None: def cli_repl(ctx: click.Context) -> None: """Run an interactive console.""" settings = cast(BackendSettings, ctx.obj) - engine = create_engine(settings.db_url) + engine = get_engine(settings.db_url, True) with Session(engine) as session: locals = { diff --git a/packages/sshecret-backend/src/sshecret_backend/db.py b/packages/sshecret-backend/src/sshecret_backend/db.py index e1b9357..5840def 100644 --- a/packages/sshecret-backend/src/sshecret_backend/db.py +++ b/packages/sshecret-backend/src/sshecret_backend/db.py @@ -34,9 +34,8 @@ def setup_database( return engine, get_db_session -def get_engine(filename: Path, echo: bool = False) -> Engine: +def get_engine(url: URL, echo: bool = False) -> Engine: """Initialize the engine.""" - url = URL.create(drivername="sqlite", database=str(filename.absolute())) engine = create_engine(url, echo=echo) with engine.connect() as connection: connection.execute(text("PRAGMA foreign_keys=ON")) # for SQLite only diff --git a/packages/sshecret-backend/src/sshecret_backend/models.py b/packages/sshecret-backend/src/sshecret_backend/models.py index 15e89ea..4030e82 100644 --- a/packages/sshecret-backend/src/sshecret_backend/models.py +++ b/packages/sshecret-backend/src/sshecret_backend/models.py @@ -11,7 +11,7 @@ import logging import uuid from datetime import datetime import sqlalchemy as sa -from sqlmodel import Field, Relationship, SQLModel +from sqlmodel import JSON, Column, DateTime, Field, Relationship, SQLModel LOG = logging.getLogger(__name__) @@ -98,14 +98,13 @@ class AuditLog(SQLModel, table=True): """ id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) - subsystem: str | None = None - object: str | None = None - object_id: str | None = None + subsystem: str + message: str operation: str client_id: uuid.UUID | None = None client_name: str | None = None - message: str origin: str | None = None + Field(default=None, sa_column=Column(JSON)) timestamp: datetime | None = Field( default=None, @@ -131,5 +130,5 @@ class APIClient(SQLModel, table=True): def init_db(engine: sa.Engine) -> None: """Create database.""" - LOG.info("Starting init_db") + LOG.info("Running init_db") SQLModel.metadata.create_all(engine) diff --git a/packages/sshecret-backend/src/sshecret_backend/settings.py b/packages/sshecret-backend/src/sshecret_backend/settings.py index 286ae35..7159fe5 100644 --- a/packages/sshecret-backend/src/sshecret_backend/settings.py +++ b/packages/sshecret-backend/src/sshecret_backend/settings.py @@ -1,21 +1,49 @@ """Settings management.""" -from pydantic import Field +from pathlib import Path +from typing import Annotated, Any +from pydantic import Field, field_validator from pydantic_settings import ( BaseSettings, SettingsConfigDict, + ForceDecode, ) +from sqlalchemy import URL -DEFAULT_DATABASE = "sqlite:///sshecret.db" +DEFAULT_DATABASE = "sshecret.db" class BackendSettings(BaseSettings): """Backend settings.""" - model_config = SettingsConfigDict(env_file=".backend.env", env_prefix="sshecret_") + model_config = SettingsConfigDict( + env_file=".backend.env", env_prefix="sshecret_backend_" + ) - db_url: str = Field(default=DEFAULT_DATABASE) + database: str = Field(default=DEFAULT_DATABASE) + generate_initial_tokens: Annotated[bool, ForceDecode] = Field(default=False) + + @field_validator("generate_initial_tokens", mode="before") + @classmethod + def cast_bool(cls, value: Any) -> bool: + """Ensure we catch the boolean.""" + if isinstance(value, str): + if value.lower() in ("1", "true", "on"): + return True + if value.lower() in ("0", "false", "off"): + return False + return bool(value) + + @property + def db_url(self) -> URL: + """Construct database url.""" + return URL.create(drivername="sqlite", database=self.database) + + @property + def db_exists(self) -> bool: + """Check if databatase exists.""" + return Path(self.database).exists() def get_settings() -> BackendSettings: