From b34c49d3e33e03f9457573590770fdbc0dd97b56 Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Sun, 11 May 2025 11:19:59 +0200 Subject: [PATCH] Centralize hashing --- .../src/sshecret_admin/auth/authentication.py | 6 ++++++ .../src/sshecret_admin/auth/models.py | 7 +++---- .../src/sshecret_admin/core/cli.py | 19 +++++++------------ .../src/sshecret_admin/core/db.py | 2 +- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py b/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py index f85eba3..8c584ec 100644 --- a/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py +++ b/packages/sshecret-admin/src/sshecret_admin/auth/authentication.py @@ -93,3 +93,9 @@ def decode_token(settings: AdminServerSettings, token: str) -> TokenData | None: except jwt.InvalidTokenError as e: LOG.debug("Could not decode token: %s", e, exc_info=True) return None + +def hash_password(password: str) -> str: + """Hash password.""" + salt = bcrypt.gensalt() + hashed_password = bcrypt.hashpw(password.encode(), salt) + return hashed_password.decode() diff --git a/packages/sshecret-admin/src/sshecret_admin/auth/models.py b/packages/sshecret-admin/src/sshecret_admin/auth/models.py index 7a25d84..5d2ab86 100644 --- a/packages/sshecret-admin/src/sshecret_admin/auth/models.py +++ b/packages/sshecret-admin/src/sshecret_admin/auth/models.py @@ -46,10 +46,6 @@ class PasswordDB(SQLModel, table=True): ) -def init_db(engine: sa.Engine) -> None: - """Create database.""" - SQLModel.metadata.create_all(engine) - class TokenData(SQLModel): """Token data.""" @@ -69,3 +65,6 @@ class LoginError(SQLModel): title: str message: str +def init_db(engine: sa.Engine) -> None: + """Create database.""" + SQLModel.metadata.create_all(engine) diff --git a/packages/sshecret-admin/src/sshecret_admin/core/cli.py b/packages/sshecret-admin/src/sshecret_admin/core/cli.py index bbad59f..2e9df1a 100644 --- a/packages/sshecret-admin/src/sshecret_admin/core/cli.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/cli.py @@ -5,13 +5,13 @@ import code from collections.abc import Awaitable import logging from typing import Any, cast -import bcrypt import click from sshecret_admin.services.admin_backend import AdminBackend import uvicorn from pydantic import ValidationError from sqlmodel import Session, create_engine, select from sshecret_admin.auth.models import init_db, User, PasswordDB +from sshecret_admin.auth.authentication import hash_password from sshecret_admin.core.settings import AdminServerSettings handler = logging.StreamHandler() @@ -19,17 +19,6 @@ formatter = logging.Formatter( "%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s" ) handler.setFormatter(formatter) -LOG = logging.getLogger() -LOG.addHandler(handler) -LOG.setLevel(logging.INFO) - - -def hash_password(password: str) -> str: - """Hash password.""" - salt = bcrypt.gensalt() - hashed_password = bcrypt.hashpw(password.encode(), salt) - return hashed_password.decode() - def create_user(session: Session, username: str, password: str) -> None: """Create a user.""" @@ -44,8 +33,14 @@ def create_user(session: Session, username: str, password: str) -> None: @click.pass_context def cli(ctx: click.Context, debug: bool) -> None: """Sshecret Admin.""" + LOG = logging.getLogger() + LOG.addHandler(handler) + if debug: + click.echo("Setting logging to debug level") LOG.setLevel(logging.DEBUG) + else: + LOG.setLevel(logging.INFO) try: settings = AdminServerSettings() # pyright: ignore[reportCallIssue] except ValidationError as e: diff --git a/packages/sshecret-admin/src/sshecret_admin/core/db.py b/packages/sshecret-admin/src/sshecret_admin/core/db.py index 7a74b0d..bd55f3e 100644 --- a/packages/sshecret-admin/src/sshecret_admin/core/db.py +++ b/packages/sshecret-admin/src/sshecret_admin/core/db.py @@ -12,7 +12,7 @@ def setup_database( ) -> tuple[sa.Engine, Callable[[], Generator[Session, None, None]]]: """Setup database.""" - engine = create_engine(db_url, echo=True) + engine = create_engine(db_url, echo=False) def get_db_session() -> Generator[Session, None, None]: """Get DB Session."""