Add audit logging

This commit is contained in:
2025-03-23 21:09:52 +01:00
parent 08d8031d09
commit 8ebd8d50f7
5 changed files with 94 additions and 3 deletions

66
src/sshecret/audit.py Normal file
View File

@ -0,0 +1,66 @@
"""Audit setup."""
import enum
import json
import logging
from typing import Any
from pythonjsonlogger.json import JsonFormatter
from pydantic import BaseModel, ConfigDict
from .constants import AUDIT_LOG_NAME
AUDIT_LOG = logging.getLogger(AUDIT_LOG_NAME)
class AuditMessageType(enum.StrEnum):
"""Audit Message Type."""
ACCESS = enum.auto() # Someone accessed something
SECURITY = enum.auto() # A message related to security
INFORMATIONAL = enum.auto() # other informational messages
class AuditMessage(BaseModel):
"""Audit message."""
model_config = ConfigDict(use_enum_values=True)
type: AuditMessageType
message: str
client_name: str | None = None
source_address: str | None = None
secret_name: str | None = None
details: str | None = None
def __str__(self) -> str:
"""Stringify object as JSON."""
return self.model_dump_json()
def audit_message(
message: str,
audit_type: AuditMessageType | str | None = None,
client_name: str | None = None,
secret_name: str | None = None,
source_address: str | None = None,
details: str | None = None,
) -> None:
"""Create an audit message."""
if not audit_type:
audit_type = AuditMessageType.INFORMATIONAL
if audit_type not in list(AuditMessageType):
audit_type = AuditMessageType.INFORMATIONAL
audit_message = AuditMessage(
type=audit_type,
message=message,
client_name=client_name,
source_address=source_address,
secret_name=secret_name,
details=details,
)
AUDIT_LOG.info(audit_message.model_dump(exclude_none=True))

View File

@ -13,3 +13,5 @@ ERROR_SOURCE_IP_NOT_ALLOWED = (
RSA_PUBLIC_EXPONENT = 65537
RSA_KEY_SIZE = 2048
AUDIT_LOG_NAME = "AUDIT"

View File

@ -10,9 +10,12 @@ import tempfile
import threading
from pathlib import Path
from .server.async_server import start_server
from pythonjsonlogger.json import JsonFormatter
from .server import start_server
from sshecret.backends import FileTableBackend
from .utils import create_client_file, add_secret_to_client_file
from .constants import AUDIT_LOG_NAME
def thread_id_filter(record: logging.LogRecord) -> logging.LogRecord:
@ -31,6 +34,12 @@ handler.setFormatter(formatter)
LOG.addHandler(handler)
LOG.setLevel(logging.DEBUG)
AUDIT_LOG = logging.getLogger(AUDIT_LOG_NAME)
audit_formatter = JsonFormatter()
audit_handler = logging.StreamHandler()
audit_handler.setFormatter(audit_formatter)
AUDIT_LOG.addHandler(audit_handler)
@click.group()
def cli() -> None:

View File

@ -1,5 +1,5 @@
"""Sshecret server module."""
from .server import SshKeyServer
from .async_server import AsshyncServer, start_server
__all__ = ["SshKeyServer"]
__all__ = ["AsshyncServer", "start_server"]

View File

@ -8,6 +8,7 @@ from typing import override
import asyncssh
from sshecret import constants
from sshecret.audit import audit_message
from sshecret.types import ClientSpecification, BaseClientBackend
from sshecret.crypto import create_private_rsa_key
@ -17,26 +18,31 @@ LOG = logging.getLogger(__name__)
def handle_client(process: asyncssh.SSHServerProcess[str]) -> None:
"""Handle client."""
remote_ip = process.get_extra_info("peername")[0]
client_found = process.get_extra_info("client_allowed", False)
if not client_found:
process.stderr.write(constants.ERROR_UKNOWN_CLIENT_OR_SECRET + "\n")
audit_message("Unknown connection", source_address=remote_ip)
process.exit(1)
return
client_allowed = process.get_extra_info("client_allowed", False)
if not client_allowed:
audit_message("Not permitted", "SECURITY", source_address=remote_ip)
process.stderr.write(constants.ERROR_SOURCE_IP_NOT_ALLOWED + "\n")
process.exit(1)
return
client = process.get_extra_info("client")
if not client:
audit_message("Unknown client", source_address=remote_ip)
process.stderr.write(constants.ERROR_UKNOWN_CLIENT_OR_SECRET + "\n")
process.exit(1)
return
secret_name = process.command
if not secret_name:
audit_message("No secret specified", source_address=remote_ip, client_name=client.name)
process.stderr.write(constants.ERROR_UKNOWN_CLIENT_OR_SECRET + "\n")
process.exit(1)
return
@ -45,12 +51,14 @@ def handle_client(process: asyncssh.SSHServerProcess[str]) -> None:
"Client %s successfully connected. Fetching secret %s", client.name, secret_name
)
audit_message(f"Requested secret", client_name=client.name, secret_name=secret_name, source_address=remote_ip)
secret = client.secrets.get(secret_name)
if not secret:
process.stderr.write(constants.ERROR_UKNOWN_CLIENT_OR_SECRET + "\n")
process.exit(1)
return
audit_message("Accessed secret", client.name, secret_name, source_address=remote_ip)
process.stdout.write(secret)
process.exit(0)
@ -66,6 +74,8 @@ class AsshyncServer(asyncssh.SSHServer):
@override
def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
"""Handle incoming connection."""
peername = conn.get_extra_info("peername")
LOG.debug("Connection established from %r", peername)
self._conn = conn
@override
@ -81,6 +91,7 @@ class AsshyncServer(asyncssh.SSHServer):
LOG.debug("Remote_IP: %r", remote_ip)
assert isinstance(remote_ip, str)
if self.check_connection_allowed(client, remote_ip):
audit_message("Authentication requested", "ACCESS", client_name=client.name, source_address=remote_ip)
self._conn.set_extra_info(client_allowed=True)
self._conn.set_extra_info(client=client)
@ -101,10 +112,12 @@ class AsshyncServer(asyncssh.SSHServer):
"""Check if client is allowed to request secrets."""
LOG.debug("Checking if client is allowed to log in from %s", source)
if isinstance(client.allowed_ips, str) and client.allowed_ips == "*":
audit_message("Permitting login", "SECURITY", client_name=client.name, source_address=source)
LOG.debug("Client has no restrictions on source IP address. Permitting.")
return True
if isinstance(client.allowed_ips, str) and "/" not in client.allowed_ips:
if source == client.allowed_ips:
audit_message("Permitting login", "SECURITY", client_name=client.name, source_address=source)
LOG.debug("Client IP matches permitted address")
return True
@ -113,6 +126,7 @@ class AsshyncServer(asyncssh.SSHServer):
client.name,
source,
)
audit_message("REJECTED. Invalid address", "SECURITY", client_name=client.name, source_address=source)
return False