From 71d877022b838c0fa28557143197d599a82f6cb9 Mon Sep 17 00:00:00 2001 From: Allan Eising Date: Mon, 9 Jun 2025 14:14:53 +0200 Subject: [PATCH] Implement same ID type as backend API --- .../sshecret_admin/frontend/views/clients.py | 4 +- .../sshecret_admin/services/admin_backend.py | 53 +++++++++---------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py b/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py index c4ea06e..630fb48 100644 --- a/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py +++ b/packages/sshecret-admin/src/sshecret_admin/frontend/views/clients.py @@ -88,7 +88,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter: client: Annotated[ClientUpdate, Form()], ): """Update a client.""" - original_client = await admin.get_client(id) + original_client = await admin.get_client(("id", id)) if not original_client: return templates.TemplateResponse( request, "fragments/error.html", {"message": "Client not found"} @@ -131,7 +131,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter: admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)], ) -> Response: """Delete a client.""" - await admin.delete_client(id) + await admin.delete_client(("id", id)) clients = await admin.get_clients() headers = {"Hx-Refresh": "true"} return templates.TemplateResponse( diff --git a/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py index c329201..5a21880 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py @@ -132,13 +132,10 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e - async def get_client(self, name: str, is_id: bool = False) -> Client | None: + async def get_client(self, name: KeySpec) -> Client | None: """Get a client from the backend.""" - key = name - if is_id: - key = ("id", name) try: - return await self._get_client(key) + return await self._get_client(name) except ClientManagementError: raise except Exception as e: @@ -178,20 +175,19 @@ class AdminBackend: raise BackendUnavailableError() from e async def _update_client_public_key( - self, name: str, new_key: str, password_manager: PasswordContext, is_id: bool = False, + self, + name: KeySpec, + new_key: str, + password_manager: PasswordContext, ) -> list[str]: """Update client public key.""" LOG.info( "Updating client %s public key. This will invalidate all existing secrets." ) - client = await self.get_client(name, is_id=is_id) + client = await self.get_client(name) if not client: raise ClientNotFoundError() - idname: KeySpec = name - if is_id: - idname = ("id", name) - - await self.backend.update_client_key(idname, new_key) + await self.backend.update_client_key(name, new_key) updated_secrets: list[str] = [] for secret in client.secrets: LOG.debug("Re-encrypting secret %s for client %s", secret, name) @@ -204,17 +200,17 @@ class AdminBackend: rsa_public_key = load_public_key(client.public_key.encode()) encrypted = encrypt_string(secret_value, rsa_public_key) LOG.debug("Sending new encrypted value to backend.") - await self.backend.create_client_secret(idname, secret, encrypted) + await self.backend.create_client_secret(name, secret, encrypted) updated_secrets.append(secret) return updated_secrets - async def update_client_public_key(self, name: str, new_key: str, is_id: bool = False) -> list[str]: + async def update_client_public_key(self, name: KeySpec, new_key: str) -> list[str]: """Update client public key.""" try: with self.password_manager() as password_manager: return await self._update_client_public_key( - name, new_key, password_manager, is_id=is_id + name, new_key, password_manager ) except ClientManagementError: raise @@ -244,21 +240,18 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e - async def update_client_sources(self, name: str, sources: list[str], is_id: bool = False) -> None: + async def update_client_sources(self, name: KeySpec, sources: list[str]) -> None: """Update client sources.""" - key: KeySpec = name - if is_id: - key = ("id", name) try: - await self.backend.update_client_sources(key, sources) + await self.backend.update_client_sources(name, sources) except Exception as e: raise BackendUnavailableError() from e - async def _delete_client(self, name: str) -> None: + async def _delete_client(self, name: KeySpec) -> None: """Delete client.""" await self.backend.delete_client(name) - async def delete_client(self, name: str) -> None: + async def delete_client(self, name: KeySpec) -> None: """Delete client.""" try: await self._delete_client(name) @@ -267,7 +260,9 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e - async def delete_client_secret(self, client_name: str, secret_name: str) -> None: + async def delete_client_secret( + self, client_name: KeySpec, secret_name: KeySpec + ) -> None: """Delete a secret from a client.""" try: await self.backend.delete_client_secret(client_name, secret_name) @@ -299,7 +294,6 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e - async def _get_detailed_secrets(self) -> list[DetailedSecrets]: """Get detailed secrets. @@ -316,7 +310,6 @@ class AdminBackend: return secrets - async def get_detailed_secrets(self) -> list[DetailedSecrets]: """Get detailed secrets from backend.""" try: @@ -427,7 +420,9 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e - async def _get_secret(self, name: str) -> SecretView | None: + async def _get_secret( + self, name: str, secret_id: str | None = None + ) -> SecretView | None: """Get a secret, including the actual unencrypted value and clients.""" with self.password_manager() as password_manager: secret = password_manager.get_secret(name) @@ -436,7 +431,11 @@ class AdminBackend: if not secret: return None secret_view = SecretView(name=name, secret=secret, group=secret_group) - secret_mapping = await self.backend.get_secret(name) + idname: KeySpec = name + if secret_id: + idname = ("id", secret_id) + + secret_mapping = await self.backend.get_secret(idname) if secret_mapping: secret_view.clients = [ref.name for ref in secret_mapping.clients]