Centralize hashing

This commit is contained in:
2025-05-11 11:19:59 +02:00
parent d0b92b220e
commit b34c49d3e3
4 changed files with 17 additions and 17 deletions

View File

@ -93,3 +93,9 @@ def decode_token(settings: AdminServerSettings, token: str) -> TokenData | None:
except jwt.InvalidTokenError as e: except jwt.InvalidTokenError as e:
LOG.debug("Could not decode token: %s", e, exc_info=True) LOG.debug("Could not decode token: %s", e, exc_info=True)
return None return None
def hash_password(password: str) -> str:
"""Hash password."""
salt = bcrypt.gensalt()
hashed_password = bcrypt.hashpw(password.encode(), salt)
return hashed_password.decode()

View File

@ -46,10 +46,6 @@ class PasswordDB(SQLModel, table=True):
) )
def init_db(engine: sa.Engine) -> None:
"""Create database."""
SQLModel.metadata.create_all(engine)
class TokenData(SQLModel): class TokenData(SQLModel):
"""Token data.""" """Token data."""
@ -69,3 +65,6 @@ class LoginError(SQLModel):
title: str title: str
message: str message: str
def init_db(engine: sa.Engine) -> None:
"""Create database."""
SQLModel.metadata.create_all(engine)

View File

@ -5,13 +5,13 @@ import code
from collections.abc import Awaitable from collections.abc import Awaitable
import logging import logging
from typing import Any, cast from typing import Any, cast
import bcrypt
import click import click
from sshecret_admin.services.admin_backend import AdminBackend from sshecret_admin.services.admin_backend import AdminBackend
import uvicorn import uvicorn
from pydantic import ValidationError from pydantic import ValidationError
from sqlmodel import Session, create_engine, select from sqlmodel import Session, create_engine, select
from sshecret_admin.auth.models import init_db, User, PasswordDB from sshecret_admin.auth.models import init_db, User, PasswordDB
from sshecret_admin.auth.authentication import hash_password
from sshecret_admin.core.settings import AdminServerSettings from sshecret_admin.core.settings import AdminServerSettings
handler = logging.StreamHandler() handler = logging.StreamHandler()
@ -19,17 +19,6 @@ formatter = logging.Formatter(
"%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s" "%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s"
) )
handler.setFormatter(formatter) handler.setFormatter(formatter)
LOG = logging.getLogger()
LOG.addHandler(handler)
LOG.setLevel(logging.INFO)
def hash_password(password: str) -> str:
"""Hash password."""
salt = bcrypt.gensalt()
hashed_password = bcrypt.hashpw(password.encode(), salt)
return hashed_password.decode()
def create_user(session: Session, username: str, password: str) -> None: def create_user(session: Session, username: str, password: str) -> None:
"""Create a user.""" """Create a user."""
@ -44,8 +33,14 @@ def create_user(session: Session, username: str, password: str) -> None:
@click.pass_context @click.pass_context
def cli(ctx: click.Context, debug: bool) -> None: def cli(ctx: click.Context, debug: bool) -> None:
"""Sshecret Admin.""" """Sshecret Admin."""
LOG = logging.getLogger()
LOG.addHandler(handler)
if debug: if debug:
click.echo("Setting logging to debug level")
LOG.setLevel(logging.DEBUG) LOG.setLevel(logging.DEBUG)
else:
LOG.setLevel(logging.INFO)
try: try:
settings = AdminServerSettings() # pyright: ignore[reportCallIssue] settings = AdminServerSettings() # pyright: ignore[reportCallIssue]
except ValidationError as e: except ValidationError as e:

View File

@ -12,7 +12,7 @@ def setup_database(
) -> tuple[sa.Engine, Callable[[], Generator[Session, None, None]]]: ) -> tuple[sa.Engine, Callable[[], Generator[Session, None, None]]]:
"""Setup database.""" """Setup database."""
engine = create_engine(db_url, echo=True) engine = create_engine(db_url, echo=False)
def get_db_session() -> Generator[Session, None, None]: def get_db_session() -> Generator[Session, None, None]:
"""Get DB Session.""" """Get DB Session."""