diff --git a/packages/sshecret-admin/README.md b/packages/sshecret-admin/README.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/sshecret-admin/pyproject.toml b/packages/sshecret-admin/pyproject.toml new file mode 100644 index 0000000..0de01a3 --- /dev/null +++ b/packages/sshecret-admin/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "sshecret-admin" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +authors = [ + { name = "Allan Eising", email = "allan@eising.dk" } +] +requires-python = ">=3.13" +dependencies = [ + "bcrypt>=4.3.0", + "click>=8.1.8", + "cryptography>=44.0.2", + "fastapi[standard]>=0.115.12", + "httpx>=0.28.1", + "pydantic>=2.10.6", + "pyjwt>=2.10.1", + "pykeepass>=4.1.1.post1", + "sqlmodel>=0.0.24", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/packages/sshecret-admin/src/sshecret_admin/__init__.py b/packages/sshecret-admin/src/sshecret_admin/__init__.py new file mode 100644 index 0000000..8903142 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/__init__.py @@ -0,0 +1,5 @@ +"""Sshecret Admin API.""" + +from .app import app + +__all__ = ["app"] diff --git a/packages/sshecret-admin/src/sshecret_admin/admin_api.py b/packages/sshecret-admin/src/sshecret_admin/admin_api.py new file mode 100644 index 0000000..cc4d663 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/admin_api.py @@ -0,0 +1,417 @@ +"""Admin API.""" + +import logging +from collections import defaultdict +from contextlib import asynccontextmanager +from datetime import datetime, timedelta, timezone +from typing import Annotated, Any + +import bcrypt +import jwt + +from fastapi import APIRouter, Depends, FastAPI, HTTPException, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from pydantic import BaseModel +from sqlmodel import SQLModel + +from sqlmodel import Session, select + +from . import keepass +from .auth_models import User, PasswordDB, get_engine +from .backend import BackendClient, Client +from .master_password import setup_master_password +from .settings import ServerSettings +from .view_models import ( + ClientCreate, + SecretListView, + SecretView, + UpdateKeyModel, + UpdateKeyResponse, + UpdatePoliciesRequest, +) + + +LOG = logging.getLogger(__name__) + + +API_VERSION = "v1" +JWT_ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + + +class TokenData(BaseModel): + """Token data.""" + + username: str | None = None + + +class Token(BaseModel): + access_token: str + token_type: str + + +settings = ServerSettings() + +engine = get_engine(settings.admin_db) + + +def init_db() -> None: + """Create database.""" + SQLModel.metadata.create_all(engine) + + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + +async def get_session(): + """Get the session.""" + with Session(engine) as session: + yield session + + +def setup_password_manager() -> None: + """Setup password manager.""" + encr_master_password = setup_master_password(regenerate=False) + if not encr_master_password: + return + + with Session(engine) as session: + pwdb = PasswordDB(id=1, encrypted_password=encr_master_password) + session.add(pwdb) + session.commit() + + +async def get_password_manager(session: Annotated[Session, Depends(get_session)]): + """Get password manager.""" + password_db = session.exec(select(PasswordDB).where(PasswordDB.id == 1)).first() + if not password_db: + raise HTTPException( + 500, detail="Error: The password manager has not yet been set up." + ) + with keepass.load_password_manager(password_db.encrypted_password) as kp: + yield kp + + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """Verify password against stored hash.""" + return bcrypt.checkpw(plain_password.encode(), hashed_password.encode()) + + +def authenticate_user(session: Session, username: str, password: str) -> User | None: + """Authenticate user.""" + user = session.exec(select(User).where(User.username == username)).first() + if not user: + return None + if not verify_password(password, user.hashed_password): + return None + return user + + +async def get_current_user( + token: Annotated[str, Depends(oauth2_scheme)], + session: Annotated[Session, Depends(get_session)], +) -> User: + """Get current user from token.""" + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, settings.secret_key, algorithms=[JWT_ALGORITHM]) + username = payload.get("sub") + if not username: + raise credentials_exception + token_data = TokenData(username=username) + except jwt.InvalidTokenError: + raise credentials_exception + + user = session.exec( + select(User).where(User.username == token_data.username) + ).first() + if not user: + raise credentials_exception + return user + + +async def get_current_active_user( + current_user: Annotated[User, Depends(get_current_user)] +) -> User: + """Get current active user.""" + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive or disabled user") + return current_user + + +def create_access_token( + data: dict[str, Any], expires_delta: timedelta | None = None +) -> str: + """Create access token.""" + to_encode = data.copy() + expire = datetime.now(timezone.utc) + timedelta(minutes=15) + if expires_delta: + expire = datetime.now(timezone.utc) + expires_delta + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=JWT_ALGORITHM) + return encoded_jwt + + +async def get_backend(): + """Get backend client.""" + backend_client = BackendClient(settings) + yield backend_client + + +async def map_secrets_to_clients(backend: BackendClient) -> defaultdict[str, list[str]]: + """Map secrets to clients.""" + clients = await backend.get_clients() + client_secret_map: defaultdict[str, list[str]] = defaultdict(list) + for client in clients: + for secret in client.secrets: + client_secret_map[secret].append(client.name) + return client_secret_map + + +@asynccontextmanager +async def lifespan(_app: FastAPI): + """Create lifespan context for the app.""" + init_db() + setup_password_manager() + yield + + +api = APIRouter( + prefix=f"/api/{API_VERSION}", + lifespan=lifespan, +) + + +@api.post("/token") +async def login_for_access_token( + session: Annotated[Session, Depends(get_session)], + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], +) -> Token: + """Login user and generate token.""" + user = authenticate_user(session, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + return Token(access_token=access_token, token_type="bearer") + + +@api.get("/secrets/") +async def get_secret_names( + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], + password_manager: Annotated[keepass.PasswordContext, Depends(get_password_manager)], +) -> list[SecretListView]: + """Get Secret Names.""" + # We get the list of clients first, so we can resolve access. + LOG.info("User %s requested get_secret_names", current_user.username) + client_secret_map = await map_secrets_to_clients(backend) + secrets = password_manager.get_available_secrets() + results: list[SecretListView] = [] + for secret in secrets: + client_list = client_secret_map.get(secret, []) + results.append(SecretListView(name=secret, clients=client_list)) + + return results + + +@api.get("/secrets/{name}") +async def get_secret( + name: str, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], + password_manager: Annotated[keepass.PasswordContext, Depends(get_password_manager)], +) -> SecretView: + """Get a secret.""" + LOG.info("User %s viewed secret %s", current_user.username, name) + client_secret_map = await map_secrets_to_clients(backend) + secret = password_manager.get_secret(name) + if not secret: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." + ) + clients = client_secret_map[secret] + return SecretView(name=name, secret=secret, clients=clients) + + +@api.get("/clients/") +async def get_clients( + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], +) -> list[Client]: + """Get clients.""" + LOG.info("User %s requested get_clients", current_user.username) + clients = await backend.get_clients() + return clients + + +@api.post("/clients/") +async def create_client( + new_client: ClientCreate, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], +) -> Client: + """Create a new client.""" + LOG.info( + "User %s requested create_client %", current_user.username, new_client.name + ) + await backend.register_client(new_client.name, new_client.public_key) + if new_client.sources: + LOG.debug("Creating policy sources") + sources = [str(source) for source in new_client.sources] + await backend.update_client_sources(new_client.name, sources) + + client = await backend.get_client(new_client.name) + if not client: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="The client could not be created", + ) + return client + + +@api.delete("/clients/{name}") +async def delete_client( + name: str, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], +) -> None: + """Delete a client.""" + LOG.info("User %s requested delete_client %s", current_user.username, name) + await backend.delete_client(name) + + +@api.put("/clients/{name}/public-key") +async def update_client_public_key( + name: str, + updated: UpdateKeyModel, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], + password_manager: Annotated[keepass.PasswordContext, Depends(get_password_manager)], +) -> UpdateKeyResponse: + """Update client public key. + + Updating the public key will invalidate the current secrets, so these well + be resolved first, and re-encrypted using the new key. + """ + LOG.info( + "User %s requested public key update for client %s", current_user.username, name + ) + # Let's first ensure that the key is actually updated. + client = await backend.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found." + ) + + if client.public_key == updated.public_key: + return UpdateKeyResponse( + public_key=updated.public_key, + detail="Updated key identical to existing key. No changes were made.", + ) + LOG.debug("Updating public key on backend.") + await backend.update_client_key(name, updated.public_key) + client.public_key = updated.public_key + response = UpdateKeyResponse(public_key=updated.public_key) + for secret in client.secrets: + LOG.debug("Re-encrypting secret %s for client %s", secret, name) + secret_value = password_manager.get_secret(secret) + if not secret_value: + LOG.warning("Referenced secret %s does not exist! Skipping.", secret_value) + continue + encrypted = client.encrypt(secret_value) + LOG.debug("Sending new encrypted value to backend.") + await backend.create_secret(name, secret, encrypted) + response.updated_secrets.append(secret) + + return response + + +@api.put("/clients/{name}/secrets/{secret_name}") +async def add_secret_to_client( + name: str, + secret_name: str, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], + password_manager: Annotated[keepass.PasswordContext, Depends(get_password_manager)], +) -> None: + """Add secret to a client.""" + LOG.info( + "User %s requested add_secret_to_client for secret %s", + current_user.username, + secret_name, + ) + client = await backend.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + secret = password_manager.get_secret(secret_name) + if not secret: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + await backend.create_secret(name, secret_name, client.encrypt(secret)) + + +@api.delete("/clients/{name}/secrets/{secret_name}") +async def delete_secret_from_client( + name: str, + secret_name: str, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], +) -> None: + """Delete a secret from a client.""" + LOG.info( + "User % requested delete of secret. Client: %s, secret: %s", + current_user.username, + name, + secret_name, + ) + client = await backend.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + + if secret_name not in client.secrets: + LOG.debug("Client does not have requested secret. No action to perform.") + return None + + await backend.delete_client_secret(name, secret_name) + + +@api.put("/clients/{name}/policies") +async def update_client_policies( + name: str, + updated: UpdatePoliciesRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + backend: Annotated[BackendClient, Depends(get_backend)], +) -> Client: + """Update the client access policies.""" + LOG.info("User %s requested update_client_policies.", current_user.username) + client = await backend.get_client(name) + if not client: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Item not found" + ) + + LOG.debug("Old policies: %r. New: %r", client.policies, updated.sources) + + addresses: list[str] = [str(source) for source in updated.sources] + await backend.update_client_sources(name, addresses) + client = await backend.get_client(name) + + assert client is not None, "Critical: The client disappeared after update!" + + return client diff --git a/packages/sshecret-admin/src/sshecret_admin/app.py b/packages/sshecret-admin/src/sshecret_admin/app.py new file mode 100644 index 0000000..a02a624 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/app.py @@ -0,0 +1,21 @@ +"""FastAPI app.""" + +from fastapi import FastAPI, Request, status +from fastapi.encoders import jsonable_encoder +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse + +from .admin_api import api + + +app = FastAPI() + + +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}), + ) + +app.include_router(api) diff --git a/packages/sshecret-admin/src/sshecret_admin/auth_models.py b/packages/sshecret-admin/src/sshecret_admin/auth_models.py new file mode 100644 index 0000000..d55b934 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/auth_models.py @@ -0,0 +1,50 @@ +"""Models for authentication.""" + +from datetime import datetime +from pathlib import Path +import sqlalchemy as sa +from sqlalchemy.engine import URL + +from sqlmodel import SQLModel, Field, create_engine + + +class User(SQLModel, table=True): + """Users.""" + + username: str = Field(unique=True, primary_key=True) + hashed_password: str + disabled: bool = Field(default=False) + created_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"server_default": sa.func.now()}, + nullable=False, + ) + + + +class PasswordDB(SQLModel, table=True): + """Password database.""" + id: int | None = Field(default=None, primary_key=True) + encrypted_password: str + + created_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"server_default": sa.func.now()}, + nullable=False, + ) + + updated_at: datetime | None = Field( + default=None, + sa_type=sa.DateTime(timezone=True), + sa_column_kwargs={"onupdate": sa.func.now(), "server_default": sa.func.now()}, + ) + + +def get_engine(filename: Path, echo: bool = False) -> sa.Engine: + """Initialize the engine.""" + url = URL.create(drivername="sqlite", database=str(filename.absolute())) + engine = create_engine(url, echo=echo) + + return engine diff --git a/packages/sshecret-admin/src/sshecret_admin/backend.py b/packages/sshecret-admin/src/sshecret_admin/backend.py new file mode 100644 index 0000000..201f684 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/backend.py @@ -0,0 +1,156 @@ +"""Backend API. + +TODO: + - Handle exceptions with custom exceptions + - Move to shared library. +""" + +import uuid +import urllib.parse +from datetime import datetime +import httpx +from pydantic import BaseModel, IPvAnyAddress, IPvAnyNetwork, TypeAdapter + +from .crypto import load_public_key, encrypt_string +from .settings import ServerSettings + + +class Client(BaseModel): + """Implementation of the backend class ClientView.""" + + id: uuid.UUID + name: str + public_key: str + secrets: list[str] + policies: list[IPvAnyNetwork | IPvAnyAddress] + created_at: datetime + updated_at: datetime | None + + def encrypt(self, value: str) -> str: + """Encrypt a string.""" + public_key = load_public_key(self.public_key.encode()) + return encrypt_string(value, public_key=public_key) + + +class BackendClient: + """Backend Client.""" + + def __init__(self, settings: ServerSettings | None = None) -> None: + """Initialize backend client.""" + if not settings: + settings = ServerSettings() # pyright: ignore[reportCallIssue] + self.settings: ServerSettings = settings + + @property + def headers(self) -> dict[str, str]: + """Get the headers.""" + return {"X-Api-Token": self.settings.backend_token} + + def _format_url(self, path: str) -> str: + """Format a URL.""" + return urllib.parse.urljoin(str(self.settings.backend_url), path) + + async def request(self, path: str) -> httpx.Response: + """Send a simple GET request.""" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.get(url, headers=self.headers) + return response + + async def register_client(self, username: str, public_key: str) -> None: + """Register a new client.""" + data = { + "name": username, + "public_key": public_key, + } + path = "api/v1/clients/" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.post(url, headers=self.headers, json=data) + + response.raise_for_status() + + async def update_client_key(self, client_name: str, public_key: str) -> None: + """Update the client key.""" + path = f"api/v1/clients/{client_name}/public-key" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.post( + url, headers=self.headers, json={"public_key": public_key} + ) + + response.raise_for_status() + + async def create_secret( + self, client_name: str, secret_name: str, encrypted_secret: str + ) -> None: + """Create a secret. + + This will overwrite any existing secret with that name. + """ + path = f"api/v1/clients/{client_name}/secrets/{secret_name}" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.put( + url, headers=self.headers, json={"value": encrypted_secret} + ) + + response.raise_for_status() + + async def delete_client_secret(self, client_name: str, secret_name: str) -> None: + """Delete a secret from a client.""" + path = f"api/v1/clients/{client_name}/secrets/{secret_name}" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.delete(url, headers=self.headers) + + response.raise_for_status() + + async def get_client(self, name: str) -> Client | None: + """Get a single client.""" + path = f"api/v1/clients/{name}" + response = await self.request(path) + if response.status_code == 404: + return None + response.raise_for_status() + client = Client.model_validate(response.json()) + return client + + async def get_clients(self) -> list[Client]: + """Get all clients.""" + path = "api/v1/clients/" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.get(url, headers=self.headers) + + response.raise_for_status() + client_list_adapter = TypeAdapter(list[Client]) + return client_list_adapter.validate_python(response.json()) + + async def update_client_sources( + self, client_name: str, addresses: list[str] | None + ) -> None: + """Update client source addresses. + + Pass None to sources to allow from all. + """ + if not addresses: + addresses = [] + + path = f"api/v1/clients/{client_name}/policies/" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.put( + url, headers=self.headers, json={"sources": addresses} + ) + + response.raise_for_status() + + async def delete_client(self, client_name: str) -> None: + """Delete a client.""" + path = f"api/v1/clients/{client_name}" + url = self._format_url(path) + async with httpx.AsyncClient() as http_client: + response = await http_client.delete(url, headers=self.headers) + + response.raise_for_status() diff --git a/packages/sshecret-admin/src/sshecret_admin/constants.py b/packages/sshecret-admin/src/sshecret_admin/constants.py new file mode 100644 index 0000000..5b9dd20 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/constants.py @@ -0,0 +1,2 @@ +RSA_PUBLIC_EXPONENT = 65537 +RSA_KEY_SIZE = 2048 diff --git a/packages/sshecret-admin/src/sshecret_admin/crypto.py b/packages/sshecret-admin/src/sshecret_admin/crypto.py new file mode 100644 index 0000000..bc7f48b --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/crypto.py @@ -0,0 +1,125 @@ +"""Encryption related functions. + +Note! Encryption uses the less secure PKCS1v15 padding. This is to allow +decryption via openssl on the command line. + +""" + +import base64 +import logging +from pathlib import Path +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import padding + +from . import constants + +LOG = logging.getLogger(__name__) + + +def load_public_key(keybytes: bytes) -> rsa.RSAPublicKey: + public_key = serialization.load_ssh_public_key(keybytes) + if not isinstance(public_key, rsa.RSAPublicKey): + raise RuntimeError("Only RSA keys are supported.") + return public_key + + +def validate_public_key(key: str) -> bool: + """Check if key provided in a string is valid.""" + valid = False + public_key: rsa.RSAPublicKey | None = None + try: + keybytes = key.encode() + public_key = load_public_key(keybytes) + except Exception as e: + LOG.debug("Validation of public key failed: %s", e, exc_info=True) + else: + valid = True + + if not isinstance(public_key, rsa.RSAPublicKey): + valid = False + + return valid + + +def load_private_key(filename: str, password: str | None = None) -> rsa.RSAPrivateKey: + """Load a private key.""" + password_bytes: bytes | None = None + if password: + password_bytes = password.encode() + with open(filename, "rb") as f: + private_key = serialization.load_ssh_private_key(f.read(), password=password_bytes) + if not isinstance(private_key, rsa.RSAPrivateKey): + raise RuntimeError("Only RSA keys are supported.") + return private_key + + +def encrypt_string(string: str, public_key: rsa.RSAPublicKey) -> str: + """Encrypt string, end return it base64 encoded.""" + message = string.encode() + ciphertext = public_key.encrypt( + message, + padding.PKCS1v15(), + ) + return base64.b64encode(ciphertext).decode() + + +def decode_string(ciphertext: str, private_key: rsa.RSAPrivateKey) -> str: + """Decode a string. String must be base64 encoded.""" + decoded = base64.b64decode(ciphertext) + decrypted = private_key.decrypt( + decoded, + padding.PKCS1v15(), + ) + return decrypted.decode() + + +def generate_private_key() -> rsa.RSAPrivateKey: + """Generate private RSA key.""" + private_key = rsa.generate_private_key( + public_exponent=constants.RSA_PUBLIC_EXPONENT, key_size=constants.RSA_KEY_SIZE + ) + return private_key + + +def generate_pem(private_key: rsa.RSAPrivateKey) -> str: + """Generate PEM.""" + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.OpenSSH, + encryption_algorithm=serialization.NoEncryption(), + ) + return pem.decode() + + +def create_private_rsa_key(filename: Path, password: str | None = None) -> None: + """Create an RSA Private key at the given path. + + A password may be provided for secure storage. + """ + if filename.exists(): + raise RuntimeError("Error: private key file already exists.") + LOG.debug("Generating private RSA key at %s", filename) + private_key = generate_private_key() + encryption_algorithm = serialization.NoEncryption() + if password: + password_bytes = password.encode() + encryption_algorithm = serialization.BestAvailableEncryption(password_bytes) + with open(filename, "wb") as f: + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.OpenSSH, + encryption_algorithm=encryption_algorithm, + ) + lines = f.write(pem) + LOG.debug("Wrote %s lines", lines) + f.flush() + + +def generate_public_key_string(public_key: rsa.RSAPublicKey) -> str: + """Generate public key string.""" + keybytes = public_key.public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH, + ) + return keybytes.decode() diff --git a/packages/sshecret-admin/src/sshecret_admin/keepass.py b/packages/sshecret-admin/src/sshecret_admin/keepass.py new file mode 100644 index 0000000..860a186 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/keepass.py @@ -0,0 +1,101 @@ +"""Keepass password manager.""" + +import logging +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path +from typing import cast + +import pykeepass +from sshecret_admin.master_password import retrieve_master_password + + +LOG = logging.getLogger(__name__) + +NO_USERNAME = "NO_USERNAME" + +settings = ServerSettings() + +DEFAULT_LOCATION = "keepass.kdbx" + +def create_password_db(location: Path, password: str) -> None: + """Create the password database.""" + LOG.info("Creating password database at %s", location) + pykeepass.create_database(str(location.absolute()), password=password) + + +class PasswordContext: + """Password Context class.""" + + def __init__(self, keepass: pykeepass.PyKeePass) -> None: + """Initialize password context.""" + self.keepass: pykeepass.PyKeePass = keepass + + def add_entry(self, entry_name: str, secret: str, overwrite: bool = False) -> None: + """Add an entry. + + Specify overwrite=True to overwrite the existing secret value, if it exists. + """ + entry = cast( + "pykeepass.entry.Entry | None", + self.keepass.find_entries(title=entry_name, first=True), + ) + if entry and overwrite: + entry.password = secret + elif entry: + raise ValueError("Error: A secret with this name already exists.") + LOG.debug("Add secret entry to keepass: %s", entry_name) + entry = self.keepass.add_entry( + destination_group=self.keepass.root_group, + title=entry_name, + username=NO_USERNAME, + password=secret, + ) + self.keepass.save() + + def get_secret(self, entry_name: str) -> str | None: + """Get the secret value.""" + entry = cast( + "pykeepass.entry.Entry | None", + self.keepass.find_entries(title=entry_name, first=True), + ) + if not entry: + return None + + LOG.warning("Secret name %s accessed", entry_name) + if password := cast(str, entry.password): + return str(entry.password) + + raise RuntimeError(f"Cannot get password for entry {entry_name}") + + def get_available_secrets(self) -> list[str]: + """Get the names of all secrets in the database.""" + entries = self.keepass.entries + if not entries: + return [] + return [str(entry.title) for entry in entries] + + +@contextmanager +def _password_context(location: Path, password: str) -> Iterator[PasswordContext]: + """Open the password context.""" + database = pykeepass.PyKeePass(str(location.absolute()), password=password) + context = PasswordContext(database) + yield context + + + +@contextmanager +def load_password_manager(encrypted_password: str, location: str = DEFAULT_LOCATION) -> Iterator[PasswordContext]: + """Load password manager. + + This function decrypts the password, and creates the password database if it + has not yet been created. + """ + db_location = Path(location) + password = retrieve_master_password(encrypted_password) + if not db_location.exists(): + create_password_db(db_location, password) + + with _password_context(db_location, password) as context: + yield context diff --git a/packages/sshecret-admin/src/sshecret_admin/master_password.py b/packages/sshecret-admin/src/sshecret_admin/master_password.py new file mode 100644 index 0000000..de10bfc --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/master_password.py @@ -0,0 +1,76 @@ +"""Functions related to handling the password database master password.""" + +import secrets +import shutil +from pathlib import Path +from sshecret_admin.crypto import ( + create_private_rsa_key, + load_private_key, + encrypt_string, + decode_string, +) +from sshecret_admin.settings import ServerSettings + +KEY_FILENAME = "sshecret-admin-key" + + +settings = ServerSettings() + + +def setup_master_password( + filename: str = KEY_FILENAME, regenerate: bool = False +) -> str | None: + """Setup master password. + + If regenerate is True, a new key will be generated. + + This method should run just after setting up the database. + """ + created = _initial_key_setup(filename, regenerate) + if not created: + return None + + return _generate_master_password(filename) + + +def decrypt_master_password(encrypted: str, filename: str = KEY_FILENAME) -> str: + """Retrieve master password.""" + keyfile = Path(filename) + if not keyfile.exists(): + raise RuntimeError("Error: Private key has not been generated yet.") + + private_key = load_private_key(KEY_FILENAME, password=settings.secret_key) + return decode_string(encrypted, private_key) + + +def _generate_password() -> str: + """Generate a password.""" + return secrets.token_urlsafe(32) + + +def _initial_key_setup(filename: str = KEY_FILENAME, regenerate: bool = False) -> bool: + """Set up initial keys.""" + keyfile = Path(filename) + + if keyfile.exists() and not regenerate: + return False + + assert ( + settings.secret_key is not None + ), "Error: Could not load a secret key from environment." + create_private_rsa_key(keyfile, password=settings.secret_key) + return True + + +def _generate_master_password(filename: str = KEY_FILENAME) -> str: + """Generate master password for password database. + + Returns the encrypted string, base64 encoded. + """ + keyfile = Path(filename) + if not keyfile.exists(): + raise RuntimeError("Error: Private key has not been generated yet.") + private_key = load_private_key(filename, password=settings.secret_key) + public_key = private_key.public_key() + master_password = _generate_password() + return encrypt_string(master_password, public_key) diff --git a/packages/sshecret-admin/src/sshecret_admin/settings.py b/packages/sshecret-admin/src/sshecret_admin/settings.py new file mode 100644 index 0000000..7039f3a --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/settings.py @@ -0,0 +1,23 @@ +"""SSH Server settings.""" + +from pathlib import Path +from pydantic import AnyHttpUrl, Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +DEFAULT_LISTEN_PORT = 8822 + +DEFAULT_DB = "ssh_admin.db" + +class ServerSettings(BaseSettings): + """Server Settings.""" + + model_config = SettingsConfigDict(env_file=".admin.env", env_prefix="sshecret_admin_", secrets_dir='/var/run') + + backend_url: AnyHttpUrl = Field(alias="sshecret_backend_url") + backend_token: str + listen_address: str = Field(default="") + secret_key: str + port: int = DEFAULT_LISTEN_PORT + admin_db: Path = Path(DEFAULT_DB) + debug: bool = False diff --git a/packages/sshecret-admin/src/sshecret_admin/view_models.py b/packages/sshecret-admin/src/sshecret_admin/view_models.py new file mode 100644 index 0000000..9eeff29 --- /dev/null +++ b/packages/sshecret-admin/src/sshecret_admin/view_models.py @@ -0,0 +1,55 @@ +"""Models for the API.""" + +from typing import Annotated +from pydantic import AfterValidator, BaseModel, Field, IPvAnyAddress, IPvAnyNetwork +from .crypto import validate_public_key +from .backend import Client + + +def public_key_validator(value: str) -> str: + """Public key validator.""" + if validate_public_key(value): + return value + raise ValueError("Error: Public key must be a valid RSA public key.") + +class SecretListView(BaseModel): + """Model containing a list of all available secrets.""" + + name: str + clients: list[str] = Field(default_factory=list) # Clients that have access to it. + + +class SecretView(BaseModel): + """Model containing a secret, including its clear-text value.""" + + name: str + secret: str + clients: list[str] = Field(default_factory=list) # Clients that have access to it. + + +class UpdateKeyModel(BaseModel): + """Model for updating client public key.""" + + public_key: Annotated[str, AfterValidator(public_key_validator)] + + +class UpdateKeyResponse(BaseModel): + """Response model after updating the public key.""" + + public_key: str + updated_secrets: list[str] = Field(default_factory=list) + detail: str | None = None + + +class UpdatePoliciesRequest(BaseModel): + """Update policy request.""" + + sources: list[IPvAnyAddress | IPvAnyNetwork] + + +class ClientCreate(BaseModel): + """Model to create a client.""" + + name: str + public_key: Annotated[str, AfterValidator(public_key_validator)] + sources: list[IPvAnyAddress | IPvAnyNetwork] = Field(default_factory=list)