Support unmanaged secrets
This commit is contained in:
@ -26,6 +26,7 @@ from .models import (
|
||||
ClientSecretGroup,
|
||||
ClientSecretGroupList,
|
||||
SecretClientMapping,
|
||||
SecretListView,
|
||||
SecretGroup,
|
||||
SecretView,
|
||||
)
|
||||
@ -269,23 +270,32 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _get_secrets(self) -> list[Secret]:
|
||||
async def _get_secrets(self) -> list[SecretListView]:
|
||||
"""Get secrets.
|
||||
|
||||
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
|
||||
"""
|
||||
backend_secrets = await self.backend.get_secrets()
|
||||
with self.password_manager() as password_manager:
|
||||
all_secrets = password_manager.get_available_secrets()
|
||||
admin_secrets = password_manager.get_available_secrets()
|
||||
|
||||
secrets = await self.backend.get_secrets()
|
||||
backend_secret_names = [secret.name for secret in secrets]
|
||||
for secret in all_secrets:
|
||||
if secret not in backend_secret_names:
|
||||
secrets.append(Secret(name=secret, clients=[]))
|
||||
secrets: dict[str, SecretListView] = {}
|
||||
for secret in backend_secrets:
|
||||
secrets[secret.name] = SecretListView(
|
||||
name=secret.name, unmanaged=True, clients=secret.clients
|
||||
)
|
||||
|
||||
return secrets
|
||||
for secret_name in admin_secrets:
|
||||
if secret_name in secrets:
|
||||
secrets[secret_name].unmanaged = False
|
||||
continue
|
||||
secrets[secret_name] = SecretListView(
|
||||
name=secret_name, unmanaged=False, clients=[]
|
||||
)
|
||||
|
||||
async def get_secrets(self) -> list[Secret]:
|
||||
return list(secrets.values())
|
||||
|
||||
async def get_secrets(self) -> list[SecretListView]:
|
||||
"""Get secrets from backend."""
|
||||
try:
|
||||
return await self._get_secrets()
|
||||
@ -385,6 +395,8 @@ class AdminBackend:
|
||||
)
|
||||
ungrouped = password_manager.get_ungrouped_secrets()
|
||||
|
||||
all_admin_secrets = password_manager.get_available_secrets()
|
||||
|
||||
group_result: list[ClientSecretGroup] = []
|
||||
for group in all_groups:
|
||||
# We have to do this recursively.
|
||||
@ -401,7 +413,19 @@ class AdminBackend:
|
||||
mapping.clients = client_mapping.clients
|
||||
ungrouped_clients.append(mapping)
|
||||
|
||||
# We need to process unmanaged secrets too.
|
||||
unmanaged_secrets = [
|
||||
secret for secret in all_secrets if secret.name not in all_admin_secrets
|
||||
]
|
||||
for secret in unmanaged_secrets:
|
||||
ungrouped_clients.append(
|
||||
SecretClientMapping(
|
||||
name=secret.name, unmanaged=True, clients=secret.clients
|
||||
)
|
||||
)
|
||||
|
||||
result.ungrouped = ungrouped_clients
|
||||
|
||||
return result
|
||||
|
||||
async def get_secret_group(self, name: str) -> ClientSecretGroup | None:
|
||||
@ -436,13 +460,13 @@ class AdminBackend:
|
||||
self, name: str, secret_id: str | None = None
|
||||
) -> SecretView | None:
|
||||
"""Get a secret, including the actual unencrypted value and clients."""
|
||||
secret: str | None = None
|
||||
with self.password_manager() as password_manager:
|
||||
secret = password_manager.get_secret(name)
|
||||
secret_group = password_manager.get_entry_group(name)
|
||||
|
||||
if not secret:
|
||||
return None
|
||||
secret_view = SecretView(name=name, secret=secret, group=secret_group)
|
||||
|
||||
idname: KeySpec = name
|
||||
if secret_id:
|
||||
idname = ("id", secret_id)
|
||||
@ -529,11 +553,13 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _create_client_secret(self, client_name: str, secret_name: str) -> None:
|
||||
async def _create_client_secret(
|
||||
self, client_idname: KeySpec, secret_name: str
|
||||
) -> None:
|
||||
"""Create client secret."""
|
||||
client = await self.get_client(client_name)
|
||||
client = await self.get_client(client_idname)
|
||||
if not client:
|
||||
raise ClientNotFoundError()
|
||||
raise ClientNotFoundError(client_idname)
|
||||
|
||||
with self.password_manager() as password_manager:
|
||||
secret = password_manager.get_secret(secret_name)
|
||||
@ -542,12 +568,14 @@ class AdminBackend:
|
||||
|
||||
public_key = load_public_key(client.public_key.encode())
|
||||
encrypted = encrypt_string(secret, public_key)
|
||||
await self.backend.create_client_secret(client_name, secret_name, encrypted)
|
||||
await self.backend.create_client_secret(client_idname, secret_name, encrypted)
|
||||
|
||||
async def create_client_secret(self, client_name: str, secret_name: str) -> None:
|
||||
async def create_client_secret(
|
||||
self, client_idname: KeySpec, secret_name: str
|
||||
) -> None:
|
||||
"""Create client secret."""
|
||||
try:
|
||||
await self._create_client_secret(client_name, secret_name)
|
||||
await self._create_client_secret(client_idname, secret_name)
|
||||
except ClientManagementError:
|
||||
raise
|
||||
except Exception as e:
|
||||
|
||||
@ -25,6 +25,7 @@ class SecretListView(BaseModel):
|
||||
"""Model containing a list of all available secrets."""
|
||||
|
||||
name: str
|
||||
unmanaged: bool = False
|
||||
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
||||
|
||||
|
||||
@ -32,7 +33,7 @@ class SecretView(BaseModel):
|
||||
"""Model containing a secret, including its clear-text value."""
|
||||
|
||||
name: str
|
||||
secret: str
|
||||
secret: str | None
|
||||
group: str | None = None
|
||||
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
||||
|
||||
@ -134,6 +135,7 @@ class SecretClientMapping(BaseModel):
|
||||
"""Secret name with list of clients."""
|
||||
|
||||
name: str # name of secret
|
||||
unmanaged: bool = False
|
||||
clients: list[ClientReference] = Field(default_factory=list)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user