Files
sshecret/packages/sshecret-backend/src/sshecret_backend/models.py

136 lines
3.8 KiB
Python

"""Database models.
TODO:
We might want to pass on audit information from the SSH server.
This might require some changes to these schemas.
"""
import logging
import uuid
from datetime import datetime
import sqlalchemy as sa
from sqlmodel import Field, Relationship, SQLModel
LOG = logging.getLogger(__name__)
class Client(SQLModel, table=True):
    """A registered client.

    A client has a unique ``name``, a ``public_key`` and two child
    collections: ``secrets`` and ``policies``. Both child tables declare
    their ``client_id`` foreign key with ``ON DELETE CASCADE``, so both
    relationships are configured to leave child clean-up to the database.
    """

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str = Field(unique=True)
    description: str | None = None
    public_key: str

    # Optional on the Python side because the database fills the value in
    # through the server default; the column itself is NOT NULL.
    created_at: datetime | None = Field(
        default=None,
        sa_type=sa.DateTime(timezone=True),
        sa_column_kwargs={"server_default": sa.func.now()},
        nullable=False,
    )
    # Set by the server default on insert and refreshed through ``onupdate``.
    updated_at: datetime | None = Field(
        default=None,
        sa_type=sa.DateTime(timezone=True),
        sa_column_kwargs={"onupdate": sa.func.now(), "server_default": sa.func.now()},
    )

    # passive_deletes="all": the ORM does not touch the children when a
    # client is deleted; the foreign key's ON DELETE CASCADE removes them.
    secrets: list["ClientSecret"] = Relationship(
        back_populates="client", passive_deletes="all"
    )
    # Same setting as ``secrets``. Without it the ORM would set
    # ``client_id`` to NULL on every policy before deleting the client,
    # leaving orphaned policy rows instead of cascading the delete.
    policies: list["ClientAccessPolicy"] = Relationship(
        back_populates="client", passive_deletes="all"
    )
class ClientAccessPolicy(SQLModel, table=True):
    """Access policy entry attached to a client.

    Each row holds one ``source`` string and refers to its client through
    ``client_id``; that foreign key is declared with ``ON DELETE CASCADE``.
    """

    id: uuid.UUID = Field(primary_key=True, default_factory=uuid.uuid4)
    # NOTE(review): presumably the origin the client may connect from -
    # confirm the expected format against the callers.
    source: str

    # Link back to the client this policy belongs to.
    client_id: uuid.UUID | None = Field(
        ondelete="CASCADE",
        foreign_key="client.id",
    )
    client: Client | None = Relationship(back_populates="policies")

    # Both timestamps are produced by the database, so they default to None
    # on the Python side.
    created_at: datetime | None = Field(
        sa_column_kwargs={"server_default": sa.func.now()},
        sa_type=sa.DateTime(timezone=True),
        nullable=False,
        default=None,
    )
    updated_at: datetime | None = Field(
        sa_column_kwargs={"server_default": sa.func.now(), "onupdate": sa.func.now()},
        sa_type=sa.DateTime(timezone=True),
        default=None,
    )
class ClientSecret(SQLModel, table=True):
    """A named secret stored for a client.

    The row refers to its client through ``client_id``, a foreign key
    declared with ``ON DELETE CASCADE``. ``invalidated`` starts out as
    ``False``.
    """

    id: uuid.UUID = Field(primary_key=True, default_factory=uuid.uuid4)
    name: str
    description: str | None = None

    # Link back to the client this secret belongs to.
    client_id: uuid.UUID | None = Field(
        ondelete="CASCADE",
        foreign_key="client.id",
    )
    client: Client | None = Relationship(back_populates="secrets")

    # NOTE(review): stored as a plain string column; whether the value is
    # encrypted before it reaches this model is not visible here - confirm.
    secret: str
    invalidated: bool = Field(default=False)

    # Both timestamps are produced by the database, so they default to None
    # on the Python side.
    created_at: datetime | None = Field(
        sa_column_kwargs={"server_default": sa.func.now()},
        sa_type=sa.DateTime(timezone=True),
        nullable=False,
        default=None,
    )
    updated_at: datetime | None = Field(
        sa_column_kwargs={"server_default": sa.func.now(), "onupdate": sa.func.now()},
        sa_type=sa.DateTime(timezone=True),
        default=None,
    )
class AuditLog(SQLModel, table=True):
    """A single audit log entry.

    The table deliberately has no foreign keys: entries must survive the
    deletion of the rows they describe. ``client_id`` and ``client_name``
    are therefore plain columns rather than references.
    """

    id: uuid.UUID = Field(primary_key=True, default_factory=uuid.uuid4)

    # Optional context describing what was touched.
    subsystem: str | None = None
    object: str | None = None
    object_id: str | None = None

    # Required: what happened.
    operation: str

    # Optional client context, copied by value (see class docstring).
    client_id: uuid.UUID | None = None
    client_name: str | None = None

    message: str
    origin: str | None = None

    # Populated by the database through the server default; NOT NULL.
    timestamp: datetime | None = Field(
        sa_column_kwargs={"server_default": sa.func.now()},
        sa_type=sa.DateTime(timezone=True),
        nullable=False,
        default=None,
    )
class APIClient(SQLModel, table=True):
    """API key record.

    Each row stores one ``token`` together with a ``read_write`` flag.
    """

    id: uuid.UUID = Field(primary_key=True, default_factory=uuid.uuid4)
    token: str
    # NOTE(review): presumably True grants write access in addition to
    # read access - confirm against the code that checks this flag.
    read_write: bool

    # Populated by the database through the server default; NOT NULL.
    created_at: datetime | None = Field(
        sa_column_kwargs={"server_default": sa.func.now()},
        sa_type=sa.DateTime(timezone=True),
        nullable=False,
        default=None,
    )
def init_db(engine: sa.Engine) -> None:
    """Create the tables of all models registered on ``SQLModel.metadata``.

    Args:
        engine: SQLAlchemy engine connected to the target database.
    """
    LOG.info("Starting init_db")
    metadata = SQLModel.metadata
    metadata.create_all(engine)