Files
sshecret/packages/sshecret-backend/src/sshecret_backend/audit.py

236 lines
6.0 KiB
Python

"""Audit methods."""
from collections.abc import Sequence
from fastapi import Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from .models import AuditLog, Client, ClientSecret, ClientAccessPolicy, Operation, SubSystem
def _get_origin(request: Request) -> str | None:
"""Resolve the request origin."""
origin: str | None = None
if request.client:
origin = request.client.host
return origin
def _write_audit_log(
session: Session, request: Request, entry: AuditLog, commit: bool = True
) -> None:
"""Write the audit log."""
origin = _get_origin(request)
entry.origin = origin
entry.subsystem = SubSystem.BACKEND
session.add(entry)
if commit:
session.commit()
def audit_create_client(
session: Session, request: Request, client: Client, commit: bool = True
) -> None:
"""Log the creation of a client."""
entry = AuditLog(
operation=Operation.CREATE,
client_id=client.id,
client_name=client.name,
message="Client Created",
)
_write_audit_log(session, request, entry, commit)
def audit_delete_client(
session: Session, request: Request, client: Client, commit: bool = True
) -> None:
"""Log the creation of a client."""
entry = AuditLog(
operation=Operation.CREATE,
client_id=client.id,
client_name=client.name,
message="Client deleted",
)
_write_audit_log(session, request, entry, commit)
def audit_create_secret(
session: Session,
request: Request,
client: Client,
secret: ClientSecret,
commit: bool = True,
) -> None:
"""Audit a create secret event."""
entry = AuditLog(
operation=Operation.CREATE,
secret_id=secret.id,
secret_name=secret.name,
client_id=client.id,
client_name=client.name,
message="Added secret to client",
)
_write_audit_log(session, request, entry, commit)
def audit_remove_policy(
session: Session,
request: Request,
client: Client,
policy: ClientAccessPolicy,
commit: bool = True,
) -> None:
"""Audit removal of policy."""
data = {"object": "ClientAccessPolicy", "object_id": str(policy.id)}
entry = AuditLog(
operation=Operation.DELETE,
client_id=client.id,
client_name=client.name,
message="Deleted client policy",
data=data,
)
_write_audit_log(session, request, entry, commit)
def audit_update_policy(
session: Session,
request: Request,
client: Client,
policy: ClientAccessPolicy,
commit: bool = True,
) -> None:
"""Audit update of policy."""
data: dict[str, str] = {"object": "ClientAccessPolicy", "object_id": str(policy.id)}
entry = AuditLog(
operation=Operation.CREATE,
client_name=client.name,
client_id=client.id,
message="Updated client policy",
data=data,
)
_write_audit_log(session, request, entry, commit)
def audit_update_client(
session: Session,
request: Request,
client: Client,
commit: bool = True,
) -> None:
"""Audit an update secret event."""
entry = AuditLog(
operation=Operation.UPDATE,
client_id=client.id,
client_name=client.name,
message="Client data updated",
)
_write_audit_log(session, request, entry, commit)
def audit_update_secret(
session: Session,
request: Request,
client: Client,
secret: ClientSecret,
commit: bool = True,
) -> None:
"""Audit an update secret event."""
entry = AuditLog(
operation=Operation.UPDATE,
client_id=client.id,
client_name=client.name,
secret_name=secret.name,
secret_id=secret.id,
message="Secret value updated",
)
_write_audit_log(session, request, entry, commit)
def audit_invalidate_secrets(
session: Session,
request: Request,
client: Client,
commit: bool = True,
) -> None:
"""Audit Invalidate client secrets."""
entry = AuditLog(
operation=Operation.UPDATE,
client_name=client.name,
client_id=client.id,
message="Client public-key changed. All secrets invalidated.",
)
_write_audit_log(session, request, entry, commit)
def audit_delete_secret(
session: Session,
request: Request,
client: Client,
secret: ClientSecret,
commit: bool = True,
) -> None:
"""Audit Delete client secrets."""
entry = AuditLog(
operation=Operation.DELETE,
secret_name=secret.name,
secret_id=secret.id,
client_name=client.name,
client_id=client.id,
message="Deleted secret.",
)
_write_audit_log(session, request, entry, commit)
def audit_access_secrets(
session: Session,
request: Request,
client: Client,
secrets: Sequence[ClientSecret] | None = None,
commit: bool = True,
) -> None:
"""Audit that multiple secrets were accessed.
With no secrets provided, all secrets of the client will be resolved.
"""
if not secrets:
secrets = session.scalars(
select(ClientSecret).where(ClientSecret.client_id == client.id)
).all()
for secret in secrets:
audit_access_secret(session, request, client, secret, False)
if commit:
session.commit()
def audit_access_secret(
session: Session,
request: Request,
client: Client,
secret: ClientSecret,
commit: bool = True,
) -> None:
"""Audit that someone accessed one secrets."""
entry = AuditLog(
operation=Operation.READ,
message="Secret was viewed",
secret_name=secret.name,
secret_id=secret.id,
client_id=client.id,
client_name=client.name,
)
_write_audit_log(session, request, entry, commit)
def audit_client_secret_list(
session: Session, request: Request, commit: bool = True
) -> None:
"""Audit a list of all secrets."""
entry = AuditLog(
operation=Operation.READ,
message="All secret names and their clients was viewed",
)
_write_audit_log(session, request, entry, commit)