64 lines
1.9 KiB
Python
64 lines
1.9 KiB
Python
"""Get secret."""
|
|
|
|
import logging
|
|
from typing import final, override
|
|
|
|
from sshecret.backend.models import Operation
|
|
from sshecret_sshd import exceptions
|
|
from .base import CommandDispatcher
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
@final
class GetSecret(CommandDispatcher):
    """Command that hands a client one of its encrypted secrets.

    The command takes exactly one mandatory argument: the name of the
    secret to return. The secret is encrypted using the stored RSA public
    key and returned as a base64 encoded string.
    """

    name = "get_secret"
    mandatory_argument = "SECRET"

    @override
    async def exec(self) -> None:
        """Look up the requested secret and print it to the client.

        Raises:
            exceptions.UnknownClientOrSecretError: If the command was not
                given exactly one argument.
            exceptions.SecretNotFoundError: If the secret is not listed for
                this client, or the backend returned a falsy value for it.
            exceptions.BackendError: If the backend lookup raised.
        """
        if len(self.arguments) != 1:
            raise exceptions.UnknownClientOrSecretError()

        requested = self.arguments[0]
        LOG.debug("get_secret called: Argument: %r", requested)

        # Refuse names the client is not associated with before the backend
        # is contacted at all.
        if requested not in self.client.secrets:
            await self._audit_invalid_secret(requested)
            raise exceptions.SecretNotFoundError()

        try:
            payload = await self.backend.get_client_secret(
                self.client.name, requested
            )
        except Exception as err:
            LOG.error(
                "Got exception while getting client %s secret %s: %s",
                self.client.name,
                requested,
                err,
                exc_info=True,
            )
            raise exceptions.BackendError(backend_error=str(err)) from err

        # A falsy backend result is treated the same as an unknown secret.
        if not payload:
            await self._audit_invalid_secret(requested)
            raise exceptions.SecretNotFoundError()

        await self.audit(
            Operation.READ,
            message="Client requested secret",
            secret=requested,
        )
        self.print(payload, newline=False)

    async def _audit_invalid_secret(self, requested: str) -> None:
        """Record a DENY audit entry for a secret the client may not read."""
        await self.audit(
            Operation.DENY,
            message="Client requested invalid secret",
            secret=requested,
        )
|