Keep distinction between Secret and DetailedSecret

This commit is contained in:
2025-06-09 09:11:49 +02:00
parent d86d9a9256
commit fed441743e
2 changed files with 64 additions and 19 deletions

View File

@ -16,7 +16,7 @@ from sshecret.backend import (
Operation,
SubSystem,
)
from sshecret.backend.models import DetailedSecrets
from sshecret.backend.models import DetailedSecrets, Secret
from sshecret.backend.api import AuditAPI, KeySpec
from sshecret.crypto import encrypt_string, load_public_key
@ -274,17 +274,8 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def get_secrets(self) -> list[DetailedSecrets]:
"""Get secrets from backend."""
try:
return await self._get_detailed_secrets()
except ClientManagementError:
raise
except Exception as e:
raise BackendUnavailableError() from e
async def _get_detailed_secrets(self) -> list[DetailedSecrets]:
"""Get detailed secrets.
async def _get_secrets(self) -> list[Secret]:
"""Get secrets.
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
"""
@ -293,12 +284,39 @@ class AdminBackend:
secrets = await self.backend.get_secrets()
backend_secret_names = [secret.name for secret in secrets]
for secret in all_secrets:
if secret not in backend_secret_names:
secrets.append(Secret(name=secret, clients=[]))
return secrets
async def get_secrets(self) -> list[Secret]:
"""Get secrets from backend."""
try:
return await self._get_secrets()
except ClientManagementError:
raise
except Exception as e:
raise BackendUnavailableError() from e
async def _get_detailed_secrets(self) -> list[DetailedSecrets]:
"""Get detailed secrets.
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
"""
with self.password_manager() as password_manager:
all_secrets = password_manager.get_available_secrets()
secrets = await self.backend.get_detailed_secrets()
backend_secret_names = [secret.name for secret in secrets]
for secret in all_secrets:
if secret not in backend_secret_names:
secrets.append(DetailedSecrets(name=secret, ids=[], clients=[]))
return secrets
async def get_detailed_secrets(self) -> list[DetailedSecrets]:
"""Get detailed secrets from backend."""
try:
@ -361,7 +379,7 @@ class AdminBackend:
Groups are returned in a tree, unless flat is True.
"""
all_secrets = await self.backend.get_secrets()
all_secrets = await self.backend.get_detailed_secrets()
secrets_mapping = {secret.name: secret for secret in all_secrets}
with self.password_manager() as password_manager:
if flat:

View File

@ -21,6 +21,7 @@ from .models import (
ClientFilter,
DetailedSecrets,
Operation,
Secret,
SubSystem,
)
from .exceptions import BackendValidationError, BackendConnectionError
@ -333,7 +334,6 @@ class SshecretBackend(BaseBackend):
client = Client.model_validate(response.json())
return client
async def delete_client(self, id_or_name: str) -> None:
"""Delete a client."""
key = _key(id_or_name)
@ -352,7 +352,9 @@ class SshecretBackend(BaseBackend):
path = f"api/v1/clients/{client_key}/secrets/{secret_name}"
response = await self._put(path, json={"value": encrypted_secret})
async def get_client_secret(self, client_idname: KeySpec, secret_idname: KeySpec) -> str | None:
async def get_client_secret(
self, client_idname: KeySpec, secret_idname: KeySpec
) -> str | None:
"""Fetch a secret."""
client_key = _key(client_idname)
secret_key = _key(secret_idname)
@ -363,7 +365,9 @@ class SshecretBackend(BaseBackend):
secret = ClientSecret.model_validate(response.json())
return secret.secret
async def delete_client_secret(self, client_idname: KeySpec, secret_idname: KeySpec) -> None:
async def delete_client_secret(
self, client_idname: KeySpec, secret_idname: KeySpec
) -> None:
"""Delete a secret from a client."""
client_key = _key(client_idname)
secret_key = _key(secret_idname)
@ -407,14 +411,38 @@ class SshecretBackend(BaseBackend):
path = f"/api/v1/clients/{client_key}/policies/"
await self._put(path, json={"sources": addresses})
async def get_secrets(self) -> list[DetailedSecrets]:
async def get_detailed_secrets(self) -> list[DetailedSecrets]:
"""Get detailed list of secrets."""
path = "/api/v1/secrets/"
path = "/api/v1/secrets/detailed/"
response = await self._get(path)
secret_list = TypeAdapter(list[DetailedSecrets])
return secret_list.validate_python(response.json())
async def get_secrets(self) -> list[Secret]:
"""Get detailed list of secrets."""
path = "/api/v1/secrets/"
response = await self._get(path)
detailed_secret_list = TypeAdapter(list[DetailedSecrets])
detailed_secrets = detailed_secret_list.validate_python(response.json())
# Convert to list of secrets
secrets: list[Secret] = []
for detail in detailed_secrets:
clients = [ref.name for ref in detail.clients]
secrets.append(Secret(name=detail.name, clients=clients))
return secrets
async def get_detailed_secret(self, name: str) -> DetailedSecrets | None:
"""Get clients mapped to a single secret."""
path = f"/api/v1/secrets/{name}/detailed"
response = await self._get(path)
if response.status_code == 404:
return None
return DetailedSecrets.model_validate(response.json())
async def get_secret(self, idname: KeySpec) -> DetailedSecrets | None:
"""Get clients mapped to a single secret."""
secret_key = _key(idname)
@ -425,7 +453,6 @@ class SshecretBackend(BaseBackend):
return DetailedSecrets.model_validate(response.json())
def audit(self, subsystem: SubSystem) -> AuditAPI:
"""Create the audit API."""
audit = AuditAPI(self._backend_url, self._api_token, subsystem)