"""Podman Shelldriver compatible commands.""" import logging from typing import final, override import asyncssh from sshecret.backend.models import Operation from sshecret.crypto import encrypt_string, load_public_key from .base import CommandDispatcher LOG = logging.getLogger(__name__) # These error messages are taken verbatim from podman, and while they don't seem # to make complete sense, they will be used regardless. ERR_SECRET_NOT_FOUND = "no such secret" ERR_SECRET_EXISTS = "secret data with ID already exists" ERR_INVALID_SECRET = "invalid key" @final class ShellListSecrets(CommandDispatcher): """List secrets. This command lists secrets in a format compatible with podman's ShellDriver. """ name = "list" @override async def exec(self) -> None: """List secrets.""" LOG.debug("ShellListSecret called.") await self.audit(Operation.READ, "Listed available secret names") for secret_name in self.client.secrets: self.print(secret_name) @final class ShellDeleteSecret(CommandDispatcher): """Delete a secret. If the identifier for a secret does not exist, an error will be printed. """ name = "delete" mandatory_argument = "KEY" @override async def exec(self) -> None: """Delete a secret.""" secret_name = self.arguments[0] LOG.debug("ShellDeleteSecret called withg arguments %r.", self.arguments) await self.audit( operation=Operation.DELETE, message="ClientSecret deleted", secret=secret_name, ) await self.backend.delete_client_secret( ("id", str(self.client.id)), ("name", secret_name) ) @final class ShellLookupSecret(CommandDispatcher): """Look up a secret. The identifier for the secret must be provided as the argument. """ name = "lookup" mandatory_argument = "KEY" @override async def exec(self) -> None: """Lookup secret.""" LOG.debug("ShellLookupSecret called with arguments %r", self.arguments) secret_name = self.arguments[0] secret = await self.backend.get_client_secret( ("id", str(self.client.id)), secret_name ) if not secret: LOG.debug( "Secret %s not found for client %s (%s)", secret_name, self.client.id, self.client.name, ) self.print(ERR_SECRET_NOT_FOUND, stderr=True) return await self.audit( Operation.READ, message="Client requested secret", secret=secret_name ) self.print(secret) @final class ShellStoreSecret(CommandDispatcher): """Store a secret. Secret will be read from command argument, or via STDIN. """ name = "store" mandatory_argument = "KEY" @override async def exec(self) -> None: """Store a secret.""" LOG.debug("ShellStoreSecret called with arguments %r", self.arguments) secret_name = self.arguments[0] if secret_name in self.client.secrets: self.print(ERR_SECRET_EXISTS, stderr=True) return secret_data: str | None = None if len(self.arguments) == 2: secret_data = self.arguments[1] if not secret_data: LOG.debug("No secret set as input, trying stdin.") secret_data = await self.get_secret_on_stdin() if not secret_data: self.print(ERR_INVALID_SECRET, stderr=True) return # Encrypt secret encrypted = self.encrypt_secret(secret_data) await self.backend.create_client_secret( ("id", str(self.client.id)), secret_name, encrypted ) await self.audit( operation=Operation.CREATE, message="Secret created from 'store' command", secret=secret_name, ) await self.store_managed_secret(secret_name, secret_data) def encrypt_secret(self, value: str) -> str: """Encrypt a secret.""" public_key = load_public_key(self.client.public_key.encode()) return encrypt_string(value, public_key) async def store_managed_secret(self, secret_name: str, secret_data: str) -> None: """Store managed secret.""" system_client = await self.backend.get_system_client() if not system_client: return public_key = load_public_key(system_client.public_key.encode()) encrypted = encrypt_string(secret_data, public_key) await self.backend.create_client_secret(("id", str(system_client.id)), secret_name, encrypted) await self.audit(operation=Operation.CREATE, message="Managed secret entry created.", secret=secret_name) async def get_secret_on_stdin(self) -> str | None: """Get secret from stdin.""" secret_data = "" try: async for line in self.process.stdin: if self.process.stdin.at_eof(): break if not line: break secret_data += line.rstrip() except asyncssh.BreakReceived: pass if not secret_data: return None return secret_data