"""Audit methods.""" from collections.abc import Sequence from fastapi import Request from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from .models import ( AuditLog, Client, ClientSecret, ClientAccessPolicy, Operation, SubSystem, ) def _get_origin(request: Request) -> str | None: """Resolve the request origin.""" origin: str | None = None if request.client: origin = request.client.host return origin async def _write_audit_log( session: AsyncSession, request: Request, entry: AuditLog, commit: bool = True ) -> None: """Write the audit log.""" origin = _get_origin(request) entry.origin = origin entry.subsystem = SubSystem.BACKEND session.add(entry) if commit: await session.commit() async def audit_create_client( session: AsyncSession, request: Request, client: Client, commit: bool = True ) -> None: """Log the creation of a client.""" entry = AuditLog( operation=Operation.CREATE, client_id=client.id, client_name=client.name, message="Client Created", ) await _write_audit_log(session, request, entry, commit) async def audit_delete_client( session: AsyncSession, request: Request, client: Client, commit: bool = True ) -> None: """Log the creation of a client.""" entry = AuditLog( operation=Operation.CREATE, client_id=client.id, client_name=client.name, message="Client deleted", ) await _write_audit_log(session, request, entry, commit) async def audit_create_secret( session: AsyncSession, request: Request, client: Client, secret: ClientSecret, commit: bool = True, ) -> None: """Audit a create secret event.""" entry = AuditLog( operation=Operation.CREATE, secret_id=secret.id, secret_name=secret.name, client_id=client.id, client_name=client.name, message="Added secret to client", ) await _write_audit_log(session, request, entry, commit) async def audit_remove_policy( session: AsyncSession, request: Request, client: Client, policy: ClientAccessPolicy, commit: bool = True, ) -> None: """Audit removal of policy.""" data = {"object": "ClientAccessPolicy", "object_id": str(policy.id)} entry = AuditLog( operation=Operation.DELETE, client_id=client.id, client_name=client.name, message="Deleted client policy", data=data, ) await _write_audit_log(session, request, entry, commit) async def audit_update_policy( session: AsyncSession, request: Request, client: Client, policy: ClientAccessPolicy, commit: bool = True, ) -> None: """Audit update of policy.""" data: dict[str, str] = {"object": "ClientAccessPolicy", "object_id": str(policy.id)} entry = AuditLog( operation=Operation.CREATE, client_name=client.name, client_id=client.id, message="Updated client policy", data=data, ) await _write_audit_log(session, request, entry, commit) async def audit_update_client( session: AsyncSession, request: Request, client: Client, commit: bool = True, ) -> None: """Audit an update secret event.""" entry = AuditLog( operation=Operation.UPDATE, client_id=client.id, client_name=client.name, message="Client data updated", ) await _write_audit_log(session, request, entry, commit) async def audit_new_client_version( session: AsyncSession, request: Request, old_client: Client, new_client: Client, commit: bool = True, ) -> None: """Audit an update secret event.""" entry = AuditLog( operation=Operation.UPDATE, client_id=old_client.id, client_name=old_client.name, message="Client data updated", data={ "new_client_id": str(new_client.id), "new_client_version": new_client.version, }, ) await _write_audit_log(session, request, entry, commit) async def audit_update_secret( session: AsyncSession, request: Request, client: Client, secret: ClientSecret, commit: bool = True, ) -> None: """Audit an update secret event.""" entry = AuditLog( operation=Operation.UPDATE, client_id=client.id, client_name=client.name, secret_name=secret.name, secret_id=secret.id, message="Secret value updated", ) await _write_audit_log(session, request, entry, commit) async def audit_invalidate_secrets( session: AsyncSession, request: Request, client: Client, commit: bool = True, ) -> None: """Audit Invalidate client secrets.""" entry = AuditLog( operation=Operation.UPDATE, client_name=client.name, client_id=client.id, message="Client public-key changed. All secrets invalidated.", ) await _write_audit_log(session, request, entry, commit) async def audit_delete_secret( session: AsyncSession, request: Request, client: Client, secret: ClientSecret, commit: bool = True, ) -> None: """Audit Delete client secrets.""" entry = AuditLog( operation=Operation.DELETE, secret_name=secret.name, secret_id=secret.id, client_name=client.name, client_id=client.id, message="Secret removed from client", ) await _write_audit_log(session, request, entry, commit) async def audit_access_secrets( session: AsyncSession, request: Request, client: Client, secrets: Sequence[ClientSecret] | None = None, commit: bool = True, ) -> None: """Audit that multiple secrets were accessed. With no secrets provided, all secrets of the client will be resolved. """ if not secrets: secrets_q = await session.scalars( select(ClientSecret).where(ClientSecret.client_id == client.id) ) secrets = secrets_q.all() for secret in secrets: await audit_access_secret(session, request, client, secret, False) if commit: await session.commit() async def audit_access_secret( session: AsyncSession, request: Request, client: Client, secret: ClientSecret, commit: bool = True, ) -> None: """Audit that someone accessed one secrets.""" entry = AuditLog( operation=Operation.READ, message="Secret was viewed", secret_name=secret.name, secret_id=secret.id, client_id=client.id, client_name=client.name, ) await _write_audit_log(session, request, entry, commit) async def audit_client_secret_list( session: AsyncSession, request: Request, commit: bool = True ) -> None: """Audit a list of all secrets.""" entry = AuditLog( operation=Operation.READ, message="All secret names and their clients was viewed", ) await _write_audit_log(session, request, entry, commit)