"""CLI and main entry point.""" import code import os from pathlib import Path from typing import Literal, cast import click from sshecret_backend.auth import hash_token import uvicorn from dotenv import load_dotenv from sqlalchemy import select from sqlalchemy.orm import Session from .db import create_api_token, get_engine from .models import ( APIClient, AuditLog, Client, ClientAccessPolicy, ClientSecret, SubSystem, init_db, ) from .settings import BackendSettings DEFAULT_LISTEN = "127.0.0.1" DEFAULT_PORT = 8022 WORKDIR = Path(os.getcwd()) load_dotenv() def generate_token( settings: BackendSettings, subsystem: Literal["admin", "sshd"] ) -> str: """Generate a token.""" engine = get_engine(settings.db_url) init_db(engine) with Session(engine) as session: token = create_api_token(session, subsystem) return token def add_system_tokens(settings: BackendSettings) -> None: """Add token for subsystems.""" if not settings.admin_token and not settings.sshd_token: # Tokens should be generated manually. return engine = get_engine(settings.db_url) init_db(engine) tokens: list[tuple[str, SubSystem]] = [] if admin_token := settings.admin_token: tokens.append((admin_token, SubSystem.ADMIN)) if sshd_token := settings.sshd_token: tokens.append((sshd_token, SubSystem.SSHD)) with Session(engine) as session: for token, subsystem in tokens: hashed_token = hash_token(token) if existing := session.scalars( select(APIClient).where(APIClient.subsystem == subsystem) ).first(): existing.token = hashed_token else: new_token = APIClient(token=hashed_token, subsystem=subsystem) session.add(new_token) session.commit() click.echo("Generated system tokens.") @click.group() @click.option("--database", help="Path to the sqlite database file.") @click.pass_context def cli(ctx: click.Context, database: str) -> None: """CLI group.""" if database: settings = BackendSettings(database=str(Path(database).absolute())) else: settings = BackendSettings() add_system_tokens(settings) # if settings.generate_initial_tokens: # if count_tokens(settings) == 0: # click.echo("Creating initial tokens for admin and sshd.") # admin_token = generate_token(settings) # sshd_token = generate_token(settings) # click.echo(f"Admin token: {admin_token}") # click.echo(f"SSHD token: {sshd_token}") ctx.obj = settings @cli.command("generate-token") @click.argument("subsystem", type=click.Choice(["sshd", "admin"])) @click.pass_context def cli_generate_token(ctx: click.Context, subsystem: Literal["sshd", "admin"]) -> None: """Generate a token for a subsystem..""" settings = cast(BackendSettings, ctx.obj) token = generate_token(settings, subsystem) click.echo("Generated api token:") click.echo(token) @cli.command("run") @click.option("--host", default="127.0.0.1") @click.option("--port", default=8022, type=click.INT) @click.option("--dev", is_flag=True) @click.option("--workers", type=click.INT) def cli_run(host: str, port: int, dev: bool, workers: int | None) -> None: """Run the server.""" uvicorn.run( "sshecret_backend.main:app", host=host, port=port, reload=dev, workers=workers ) @cli.command("repl") @click.pass_context def cli_repl(ctx: click.Context) -> None: """Run an interactive console.""" settings = cast(BackendSettings, ctx.obj) engine = get_engine(settings.db_url, True) with Session(engine) as session: locals = { "session": session, "select": select, "Client": Client, "ClientSecret": ClientSecret, "ClientAccessPolicy": ClientAccessPolicy, "APIClient": APIClient, "AuditLog": AuditLog, } console = code.InteractiveConsole(locals=locals, local_exit=True) banner = "Sshecret-backend REPL.\nUse 'session' to interact with the database." console.interact(banner=banner, exitmsg="Bye!")