125 lines
4.2 KiB
Python
125 lines
4.2 KiB
Python
"""Client-related endpoints factory."""
|
|
|
|
# pyright: reportUnusedFunction=false
|
|
|
|
import logging
|
|
from typing import Annotated
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
|
|
from sshecret.backend import Client
|
|
from sshecret_admin.core.dependencies import AdminDependencies
|
|
from sshecret_admin.services import AdminBackend
|
|
from sshecret_admin.services.models import (
|
|
ClientCreate,
|
|
UpdateKeyModel,
|
|
UpdateKeyResponse,
|
|
UpdatePoliciesRequest,
|
|
)
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def create_router(dependencies: AdminDependencies) -> APIRouter:
    """Create the clients router.

    Every route registered on the returned router depends on
    ``dependencies.get_current_active_user``, and each handler obtains its
    ``AdminBackend`` through ``dependencies.get_admin_backend``.

    Args:
        dependencies: Provider of the user and admin-backend dependencies.

    Returns:
        The ``APIRouter`` carrying all ``/clients/`` endpoints.
    """
    app = APIRouter(dependencies=[Depends(dependencies.get_current_active_user)])

    @app.get("/clients/")
    async def get_clients(
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)]
    ) -> list[Client]:
        """Get clients."""
        return await admin.get_clients()

    @app.post("/clients/")
    async def create_client(
        new_client: ClientCreate,
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ) -> Client:
        """Create a new client."""
        # The backend is handed plain strings; a missing or empty source list
        # is passed on as None.
        sources: list[str] | None = (
            [str(source) for source in new_client.sources]
            if new_client.sources
            else None
        )
        return await admin.create_client(
            new_client.name, new_client.public_key, sources=sources
        )

    @app.delete("/clients/{name}")
    async def delete_client(
        name: str,
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ) -> None:
        """Delete a client."""
        await admin.delete_client(name)

    @app.delete("/clients/{name}/secrets/{secret_name}")
    async def delete_secret_from_client(
        name: str,
        secret_name: str,
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ) -> None:
        """Delete a secret from a client.

        Raises:
            HTTPException: 404 when the client cannot be found.
        """
        client = await admin.get_client(name)
        if not client:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Item not found"
            )

        # Removing a secret the client does not hold is treated as a no-op
        # rather than an error.
        if secret_name not in client.secrets:
            LOG.debug("Client does not have requested secret. No action to perform.")
            return None

        await admin.delete_client_secret(name, secret_name)

    @app.put("/clients/{name}/policies")
    async def update_client_policies(
        name: str,
        updated: UpdatePoliciesRequest,
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ) -> Client:
        """Update the client access policies.

        Returns:
            The client as re-read from the backend after the update.

        Raises:
            HTTPException: 404 when the client cannot be found, 500 when the
                client can no longer be fetched after the update.
        """
        client = await admin.get_client(name)
        if not client:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Item not found"
            )

        LOG.debug("Old policies: %r. New: %r", client.policies, updated.sources)

        addresses: list[str] = [str(source) for source in updated.sources]
        await admin.update_client_sources(name, addresses)
        client = await admin.get_client(name)

        # An ``assert`` is stripped when Python runs with -O, which would let
        # ``None`` leak out of an endpoint declared to return ``Client``.
        # Raise explicitly instead.
        if client is None:
            LOG.error("Critical: The client disappeared after update!")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Client could not be retrieved after update",
            )

        return client

    @app.put("/clients/{name}/public-key")
    async def update_client_public_key(
        name: str,
        updated: UpdateKeyModel,
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ) -> UpdateKeyResponse:
        """Update client public key.

        Updating the public key will invalidate the current secrets, so these will
        be resolved first, and re-encrypted using the new key.
        """
        # The backend performs the key update; whatever it returns is reported
        # back to the caller as ``updated_secrets``.
        updated_secrets = await admin.update_client_public_key(name, updated.public_key)
        return UpdateKeyResponse(
            public_key=updated.public_key, updated_secrets=updated_secrets
        )

    @app.put("/clients/{name}/secrets/{secret_name}")
    async def add_secret_to_client(
        name: str,
        secret_name: str,
        admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
    ) -> None:
        """Add secret to a client."""
        await admin.create_client_secret(name, secret_name)

    return app
|