Refactor database layer and auditing
This commit is contained in:
@ -3,15 +3,24 @@
|
||||
import code
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
from dotenv import load_dotenv
|
||||
from typing import Literal, cast
|
||||
|
||||
import click
|
||||
from sqlmodel import Session, col, func, select
|
||||
import uvicorn
|
||||
from dotenv import load_dotenv
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .db import get_engine, create_api_token
|
||||
|
||||
from .models import Client, ClientSecret, ClientAccessPolicy, AuditLog, APIClient, init_db
|
||||
from .db import create_api_token, get_engine, hash_token
|
||||
from .models import (
|
||||
APIClient,
|
||||
AuditLog,
|
||||
Client,
|
||||
ClientAccessPolicy,
|
||||
ClientSecret,
|
||||
SubSystem,
|
||||
init_db,
|
||||
)
|
||||
from .settings import BackendSettings
|
||||
|
||||
DEFAULT_LISTEN = "127.0.0.1"
|
||||
@ -21,22 +30,44 @@ WORKDIR = Path(os.getcwd())
|
||||
|
||||
load_dotenv()
|
||||
|
||||
def generate_token(settings: BackendSettings) -> str:
|
||||
|
||||
def generate_token(
|
||||
settings: BackendSettings, subsystem: Literal["admin", "sshd"]
|
||||
) -> str:
|
||||
"""Generate a token."""
|
||||
engine = get_engine(settings.db_url)
|
||||
init_db(engine)
|
||||
with Session(engine) as session:
|
||||
token = create_api_token(session, True)
|
||||
token = create_api_token(session, subsystem)
|
||||
return token
|
||||
|
||||
def count_tokens(settings: BackendSettings) -> int:
|
||||
"""Count the amount of tokens created."""
|
||||
|
||||
def add_system_tokens(settings: BackendSettings) -> None:
|
||||
"""Add token for subsystems."""
|
||||
if not settings.admin_token and not settings.sshd_token:
|
||||
# Tokens should be generated manually.
|
||||
return
|
||||
|
||||
engine = get_engine(settings.db_url)
|
||||
init_db(engine)
|
||||
tokens: list[tuple[str, SubSystem]] = []
|
||||
if admin_token := settings.admin_token:
|
||||
tokens.append((admin_token, SubSystem.ADMIN))
|
||||
if sshd_token := settings.sshd_token:
|
||||
tokens.append((sshd_token, SubSystem.SSHD))
|
||||
with Session(engine) as session:
|
||||
count = session.exec(select(func.count("*")).select_from(APIClient)).one()
|
||||
for token, subsystem in tokens:
|
||||
hashed_token = hash_token(token)
|
||||
if existing := session.scalars(
|
||||
select(APIClient).where(APIClient.subsystem == subsystem)
|
||||
).first():
|
||||
existing.token = hashed_token
|
||||
else:
|
||||
new_token = APIClient(token=hashed_token, subsystem=subsystem)
|
||||
session.add(new_token)
|
||||
|
||||
return count
|
||||
session.commit()
|
||||
click.echo("Generated system tokens.")
|
||||
|
||||
|
||||
@click.group()
|
||||
@ -49,27 +80,30 @@ def cli(ctx: click.Context, database: str) -> None:
|
||||
else:
|
||||
settings = BackendSettings()
|
||||
|
||||
add_system_tokens(settings)
|
||||
|
||||
if settings.generate_initial_tokens:
|
||||
if count_tokens(settings) == 0:
|
||||
click.echo("Creating initial tokens for admin and sshd.")
|
||||
admin_token = generate_token(settings)
|
||||
sshd_token = generate_token(settings)
|
||||
click.echo(f"Admin token: {admin_token}")
|
||||
click.echo(f"SSHD token: {sshd_token}")
|
||||
# if settings.generate_initial_tokens:
|
||||
# if count_tokens(settings) == 0:
|
||||
# click.echo("Creating initial tokens for admin and sshd.")
|
||||
# admin_token = generate_token(settings)
|
||||
# sshd_token = generate_token(settings)
|
||||
# click.echo(f"Admin token: {admin_token}")
|
||||
# click.echo(f"SSHD token: {sshd_token}")
|
||||
|
||||
ctx.obj = settings
|
||||
|
||||
|
||||
@cli.command("generate-token")
|
||||
@click.argument("subsystem", type=click.Choice(["sshd", "admin"]))
|
||||
@click.pass_context
|
||||
def cli_generate_token(ctx: click.Context) -> None:
|
||||
"""Generate a token."""
|
||||
def cli_generate_token(ctx: click.Context, subsystem: Literal["sshd", "admin"]) -> None:
|
||||
"""Generate a token for a subsystem.."""
|
||||
settings = cast(BackendSettings, ctx.obj)
|
||||
token = generate_token(settings)
|
||||
token = generate_token(settings, subsystem)
|
||||
click.echo("Generated api token:")
|
||||
click.echo(token)
|
||||
|
||||
|
||||
@cli.command("run")
|
||||
@click.option("--host", default="127.0.0.1")
|
||||
@click.option("--port", default=8022, type=click.INT)
|
||||
@ -77,7 +111,10 @@ def cli_generate_token(ctx: click.Context) -> None:
|
||||
@click.option("--workers", type=click.INT)
|
||||
def cli_run(host: str, port: int, dev: bool, workers: int | None) -> None:
|
||||
"""Run the server."""
|
||||
uvicorn.run("sshecret_backend.main:app", host=host, port=port, reload=dev, workers=workers)
|
||||
uvicorn.run(
|
||||
"sshecret_backend.main:app", host=host, port=port, reload=dev, workers=workers
|
||||
)
|
||||
|
||||
|
||||
@cli.command("repl")
|
||||
@click.pass_context
|
||||
|
||||
Reference in New Issue
Block a user