diff --git a/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py index d06112b..c329201 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/admin_backend.py @@ -16,7 +16,7 @@ from sshecret.backend import ( Operation, SubSystem, ) -from sshecret.backend.models import DetailedSecrets +from sshecret.backend.models import DetailedSecrets, Secret from sshecret.backend.api import AuditAPI, KeySpec from sshecret.crypto import encrypt_string, load_public_key @@ -274,17 +274,8 @@ class AdminBackend: except Exception as e: raise BackendUnavailableError() from e - async def get_secrets(self) -> list[DetailedSecrets]: - """Get secrets from backend.""" - try: - return await self._get_detailed_secrets() - except ClientManagementError: - raise - except Exception as e: - raise BackendUnavailableError() from e - - async def _get_detailed_secrets(self) -> list[DetailedSecrets]: - """Get detailed secrets. + async def _get_secrets(self) -> list[Secret]: + """Get secrets. This fetches the secret to client mapping from backend, and adds secrets from the password manager. """ @@ -293,12 +284,39 @@ class AdminBackend: secrets = await self.backend.get_secrets() backend_secret_names = [secret.name for secret in secrets] + for secret in all_secrets: + if secret not in backend_secret_names: + secrets.append(Secret(name=secret, clients=[])) + + return secrets + + async def get_secrets(self) -> list[Secret]: + """Get secrets from backend.""" + try: + return await self._get_secrets() + except ClientManagementError: + raise + except Exception as e: + raise BackendUnavailableError() from e + + + async def _get_detailed_secrets(self) -> list[DetailedSecrets]: + """Get detailed secrets. + + This fetches the secret to client mapping from backend, and adds secrets from the password manager. + """ + with self.password_manager() as password_manager: + all_secrets = password_manager.get_available_secrets() + + secrets = await self.backend.get_detailed_secrets() + backend_secret_names = [secret.name for secret in secrets] for secret in all_secrets: if secret not in backend_secret_names: secrets.append(DetailedSecrets(name=secret, ids=[], clients=[])) return secrets + async def get_detailed_secrets(self) -> list[DetailedSecrets]: """Get detailed secrets from backend.""" try: @@ -361,7 +379,7 @@ class AdminBackend: Groups are returned in a tree, unless flat is True. """ - all_secrets = await self.backend.get_secrets() + all_secrets = await self.backend.get_detailed_secrets() secrets_mapping = {secret.name: secret for secret in all_secrets} with self.password_manager() as password_manager: if flat: diff --git a/src/sshecret/backend/api.py b/src/sshecret/backend/api.py index e7ec395..17bc78d 100644 --- a/src/sshecret/backend/api.py +++ b/src/sshecret/backend/api.py @@ -21,6 +21,7 @@ from .models import ( ClientFilter, DetailedSecrets, Operation, + Secret, SubSystem, ) from .exceptions import BackendValidationError, BackendConnectionError @@ -333,7 +334,6 @@ class SshecretBackend(BaseBackend): client = Client.model_validate(response.json()) return client - async def delete_client(self, id_or_name: str) -> None: """Delete a client.""" key = _key(id_or_name) @@ -352,7 +352,9 @@ class SshecretBackend(BaseBackend): path = f"api/v1/clients/{client_key}/secrets/{secret_name}" response = await self._put(path, json={"value": encrypted_secret}) - async def get_client_secret(self, client_idname: KeySpec, secret_idname: KeySpec) -> str | None: + async def get_client_secret( + self, client_idname: KeySpec, secret_idname: KeySpec + ) -> str | None: """Fetch a secret.""" client_key = _key(client_idname) secret_key = _key(secret_idname) @@ -363,7 +365,9 @@ class SshecretBackend(BaseBackend): secret = ClientSecret.model_validate(response.json()) return secret.secret - async def delete_client_secret(self, client_idname: KeySpec, secret_idname: KeySpec) -> None: + async def delete_client_secret( + self, client_idname: KeySpec, secret_idname: KeySpec + ) -> None: """Delete a secret from a client.""" client_key = _key(client_idname) secret_key = _key(secret_idname) @@ -407,14 +411,38 @@ class SshecretBackend(BaseBackend): path = f"/api/v1/clients/{client_key}/policies/" await self._put(path, json={"sources": addresses}) - async def get_secrets(self) -> list[DetailedSecrets]: + async def get_detailed_secrets(self) -> list[DetailedSecrets]: """Get detailed list of secrets.""" - path = "/api/v1/secrets/" + path = "/api/v1/secrets/detailed/" response = await self._get(path) secret_list = TypeAdapter(list[DetailedSecrets]) return secret_list.validate_python(response.json()) + async def get_secrets(self) -> list[Secret]: + """Get detailed list of secrets.""" + path = "/api/v1/secrets/" + response = await self._get(path) + + detailed_secret_list = TypeAdapter(list[DetailedSecrets]) + detailed_secrets = detailed_secret_list.validate_python(response.json()) + # Convert to list of secrets + secrets: list[Secret] = [] + for detail in detailed_secrets: + clients = [ref.name for ref in detail.clients] + secrets.append(Secret(name=detail.name, clients=clients)) + + return secrets + + async def get_detailed_secret(self, name: str) -> DetailedSecrets | None: + """Get clients mapped to a single secret.""" + path = f"/api/v1/secrets/{name}/detailed" + response = await self._get(path) + if response.status_code == 404: + return None + + return DetailedSecrets.model_validate(response.json()) + async def get_secret(self, idname: KeySpec) -> DetailedSecrets | None: """Get clients mapped to a single secret.""" secret_key = _key(idname) @@ -425,7 +453,6 @@ class SshecretBackend(BaseBackend): return DetailedSecrets.model_validate(response.json()) - def audit(self, subsystem: SubSystem) -> AuditAPI: """Create the audit API.""" audit = AuditAPI(self._backend_url, self._api_token, subsystem)