Initial commit

This commit is contained in:
2025-03-17 21:42:56 +01:00
commit bbf2d0b280
30 changed files with 1518 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.13

59
README.md Normal file
View File

@ -0,0 +1,59 @@
# Sshecret - Openssh based secrets management
## Motivation
There are many approaches to managing secrets for services, but a lot of these
either assume you have one of the industry-standard systems like hashicorp vault to manage them centrally.
For enthusiasts or homelabbers this becomes overkill quickly, and end up
consuming a lot more time and energy than what feels justified.
This system has been created to provide a centralized solution that works well-enough.
## Components
This system has been designed with modularity and extensibility in mind. It has the following building blocks:
- Password database
- Password input handler
- Encryption and key management
- Client secret storage backend
- Custom ssh server
### Password database
Currently a single password database is implemented: Keepass.
Sshecret can create a database, and store your secrets in it.
It only uses a master password for protection, so you are responsible for
securing the password database file. In theory, the password database file can
be disconnected after encrypting the passwords for the clients, and these two
components may be disconnected.
### Password input handler
Passwords can be randomly generated, they can be read from stdin, or from environment variables.
Other methods can be implemented in the future.
### Client secret storage backend
So far only a simple JSON file based backend has been implemented. It stores one file per client.
The interface is flexible, and can be extended to databases or anything else really.
### Custom SSH server
A custom SSH based on paramiko is included. This is how the clients receive the encrypted password.
The client must send a single command over the SSH session equal to the name of the secret.
If permitted to access the secret, it will returned encrypted with the client RSA public key of the client, encoded as base64.
This allows the client to decrypt and get the clear text value easily.
## Usage
# Next step
## Rewrite encryption to use age
The RSA implementation works alright, but requires some work on the client side converting back to a readable format.
Age seem better suited, as it can also use ed25519 keys.
## Dedicated client?
If `age` works out, it may be entirely unnecessary to have a dedicated client. Who knows...

2
src/sshecret/__init__.py Normal file
View File

@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from sshecret!"

3
src/sshecret/cli.py Normal file
View File

@ -0,0 +1,3 @@
"""Command Line Interface"""
import click

13
src/sshecret/constants.py Normal file
View File

@ -0,0 +1,13 @@
"""Constants."""
MASTER_PASSWORD = "MASTER_PASSWORD"
NO_USERNAME = "NO_USERNAME"
VAR_PREFIX = "SSHECRET"
ERROR_NO_SECRET_FOUND = "Error: No secret available with the given name."
ERROR_UKNOWN_CLIENT_OR_SECRET = "Error: Invalid client or secret name."
ERROR_NO_COMMAND_RECEIVED = "Error: No command was received from the client."
ERROR_SOURCE_IP_NOT_ALLOWED = "Error: Client not authorized to connect from the given host."
RSA_PUBLIC_EXPONENT = 65537
RSA_KEY_SIZE = 2048

86
src/sshecret/crypto.py Normal file
View File

@ -0,0 +1,86 @@
"""Encryption related functions."""
import base64
import logging
from pathlib import Path
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding
from .types import ClientSpecification
from . import constants
LOG = logging.getLogger(__name__)
def load_client_key(client: ClientSpecification) -> rsa.RSAPublicKey:
"""Load public key."""
keybytes = client.public_key.encode()
return load_public_key(keybytes)
def load_public_key(keybytes: bytes) -> rsa.RSAPublicKey:
public_key = serialization.load_ssh_public_key(keybytes)
if not isinstance(public_key, rsa.RSAPublicKey):
raise RuntimeError("Only RSA keys are supported.")
return public_key
def load_private_key(filename: str) -> rsa.RSAPrivateKey:
"""Load a private key."""
with open(filename, "rb") as f:
private_key = serialization.load_ssh_private_key(f.read(), password=None)
if not isinstance(private_key, rsa.RSAPrivateKey):
raise RuntimeError("Only RSA keys are supported.")
return private_key
def encrypt_string(string: str, public_key: rsa.RSAPublicKey) -> str:
"""Encrypt string, end return it base64 encoded."""
message = string.encode()
ciphertext = public_key.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
)
)
return base64.b64encode(ciphertext).decode()
def decode_string(ciphertext: str, private_key: rsa.RSAPrivateKey) -> str:
"""Decode a string. String must be base64 encoded."""
decoded = base64.b64decode(ciphertext)
decrypted = private_key.decrypt(
decoded,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
))
return decrypted.decode()
def generate_private_key() -> rsa.RSAPrivateKey:
"""Generate private RSA key."""
private_key = rsa.generate_private_key(public_exponent=constants.RSA_PUBLIC_EXPONENT, key_size=constants.RSA_KEY_SIZE)
return private_key
def generate_pem(private_key: rsa.RSAPrivateKey) -> str:
"""Generate PEM."""
pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.OpenSSH, encryption_algorithm=serialization.NoEncryption())
return pem.decode()
def create_private_rsa_key(filename: Path) -> None:
"""Create an RSA Private key at the given path."""
if filename.exists():
raise RuntimeError("Error: private key file already exists.")
LOG.debug("Generating private RSA key at %s", filename)
private_key = generate_private_key()
with open(filename, "wb") as f:
pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.OpenSSH, encryption_algorithm=serialization.NoEncryption())
lines = f.write(pem)
LOG.debug("Wrote %s lines", lines)
f.flush()
def generate_public_key_string(public_key: rsa.RSAPublicKey) -> str:
"""Generate public key string."""
keybytes = public_key.public_bytes(encoding=serialization.Encoding.OpenSSH, format=serialization.PublicFormat.OpenSSH)
return keybytes.decode()

66
src/sshecret/dev_cli.py Normal file
View File

@ -0,0 +1,66 @@
"""Development CLI commands."""
import click
import logging
import tempfile
import threading
from pathlib import Path
from .server import SshKeyServer
from .server.client_loader import FileTableBackend
from .utils import create_client_file, add_secret_to_client_file
def thread_id_filter(record: logging.LogRecord) -> logging.LogRecord:
"""Resolve thread id."""
record.thread_id = threading.get_native_id()
return record
LOG = logging.getLogger()
handler = logging.StreamHandler()
handler.addFilter(thread_id_filter)
formatter = logging.Formatter("%(thread_id)d:%(created)f:%(levelname)s:%(name)s:%(module)s:%(message)s")
handler.setFormatter(formatter)
LOG.addHandler(handler)
LOG.setLevel(logging.DEBUG)
@click.group()
def cli() -> None:
"""Run commands for testing."""
@cli.command("create-client")
@click.argument("name")
@click.argument("filename", type=click.Path(file_okay=True, dir_okay=False, writable=True))
@click.option("--public-key", type=click.Path(file_okay=True))
def create_client(name: str, filename: str, public_key: str | None) -> None:
"""Create a client."""
create_client_file(name, filename, keyfile=public_key)
click.echo(f"Wrote client config to {filename}")
@cli.command("add-secret")
@click.argument("filename", type=click.Path(file_okay=True, dir_okay=False, writable=True))
@click.argument("secret-name")
@click.argument("secret-value")
def add_secret(filename: str, secret_name: str, secret_value: str) -> None:
"""Add secret to client file."""
add_secret_to_client_file(filename, secret_name, secret_value)
click.echo(f"Wrote secret to {filename}")
@cli.command("server")
@click.argument("directory", type=click.Path(file_okay=False, dir_okay=True))
@click.argument("port", type=click.INT)
def run_server(directory: str, port: int) -> None:
"""Run server."""
with tempfile.TemporaryDirectory() as tmpdir:
serverdir = Path(tmpdir)
host_key = serverdir / "hostkey"
clientdir = Path(directory)
backend = FileTableBackend(clientdir)
SshKeyServer.start_server(host_key, clients=backend, port=port, create_key=True)
if __name__ == "__main__":
cli()

80
src/sshecret/keepass.py Normal file
View File

@ -0,0 +1,80 @@
"""Keepass integration."""
import logging
from pathlib import Path
from typing import final, override, Self
import pykeepass
from . import constants
from .types import BasePasswordManager, PasswordContext
from .utils import generate_password
LOG = logging.getLogger(__name__)
@final
class KeepassManager(BasePasswordManager):
"""KeepassXC compatible password manager."""
master_password_identifier = constants.MASTER_PASSWORD
def __init__(self, location: Path) -> None:
"""Initialize password manager."""
self.location = location
self._keepass: pykeepass.PyKeePass | None = None
@property
def keepass(self) -> pykeepass.PyKeePass:
"""Return keepass instance."""
if self._keepass:
return self._keepass
raise RuntimeError("Error: Database has not been opened.")
@keepass.setter
def keepass(self, instance: pykeepass.PyKeePass) -> None:
"""Set the keepass instance."""
self._keepass = instance
@override
@classmethod
def create_database(cls, location: str, reader_context: PasswordContext, overwrite: bool = False) -> Self:
"""Create database."""
if Path(location).exists() and not overwrite:
raise RuntimeError("Error: Database exists.")
master_password = reader_context.get_password(cls.master_password_identifier)
# TODO: should we delete if overwrite is set?
keepass = pykeepass.create_database(location, password=master_password)
instance = cls(Path(location))
instance.keepass = keepass
return instance
@override
def open_database(self, reader: PasswordContext) -> None:
"""Open the database"""
password = reader.get_password(self.master_password_identifier)
instance = pykeepass.PyKeePass(str(self.location.absolute()), password=password)
self.keepass = instance
@override
def close_database(self) -> None:
"""Close the database."""
self._keepass = None
@override
def get_password(self, identifier: str) -> str:
"""Get password."""
if entry := self.keepass.find_entries(title=identifier, first=True):
if password := entry.password:
return str(password)
raise RuntimeError(f"Cannot get password for entry {identifier}")
@override
def generate_password(self, identifier: str) -> str:
"""Generate password."""
# Generate a password.
password = generate_password()
_entry = self.keepass.add_entry(self.keepass.root_group, identifier, constants.NO_USERNAME, password)
self.keepass.save()
LOG.debug("Created Entry %r", _entry)
return password

View File

@ -0,0 +1,67 @@
"""Password reader classes.
This implements two interfaces to read passwords:
InputPasswordReader and EnvironmentPasswordReader.
"""
import re
import os
from typing import override
import click
from .types import BasePasswordReader
from . import constants
RE_VARNAME = re.compile(r"^[a-zA-Z_]+[a-zA-Z0-9_]*$")
class InputPasswordReader(BasePasswordReader):
"""Read a password from stdin."""
@override
@classmethod
def get_password(cls, identifier: str) -> str:
"""Get password."""
if password := click.prompt(
f"Enter password for {identifier}", hide_input=True, type=str
):
return str(password)
raise ValueError("No password received.")
class EnvironmentPasswordReader(BasePasswordReader):
"""Read a password from the environment.
The environemnt variable will be constructured based on the identifier and the prefix.
Final environemnt variable will be validated according to the regex `[a-zA-Z_]+[a-zA-Z0-9_]*`
"""
def __init__(self, identifier: str) -> None:
"""Initialize class."""
self._identifier: str = identifier
def _resolve_var_name(self) -> str:
"""Resolve variable name."""
identifier = self._identifier.replace("-", "_")
fields = [constants.VAR_PREFIX, identifier]
varname = "_".join(fields)
if not RE_VARNAME.fullmatch(varname):
raise ValueError(
f"Cannot generate encode password identifier in variable name. {varname} is not a valid identifier."
)
return varname
def get_password_from_env(self) -> str:
"""Get password from environment."""
varname = self._resolve_var_name()
if password := os.getenv(varname, None):
return password
raise ValueError(f"Error: No variable named {varname} resolved.")
@override
@classmethod
def get_password(cls, identifier: str) -> str:
"""Get password."""
instance = cls(identifier)
return instance.get_password_from_env()

0
src/sshecret/py.typed Normal file
View File

View File

@ -0,0 +1,6 @@
"""Sshecret server module."""
from .server import SshKeyServer
__all__ = ["SshKeyServer"]

View File

@ -0,0 +1,123 @@
"""Client loaders."""
import logging
import os
from pathlib import Path
from typing import override
import littletable as lt
from sshecret.crypto import load_client_key, encrypt_string
from sshecret.types import ClientSpecification
from .types import BaseClientBackend
LOG = logging.getLogger(__name__)
def load_clients_from_dir(directory: Path) -> dict[Path, ClientSpecification]:
"""Load clients from a directory."""
if not directory.exists() or not directory.is_dir():
raise ValueError("Invalid directory specified.")
clients: dict[Path, ClientSpecification] = {}
for client_file in directory.glob("*.json"):
with open(client_file, "r") as f:
client = ClientSpecification.model_validate_json(f.read())
if client_file.name != f"{client.name}.json":
raise RuntimeError(
"Filename scheme of clients does not conform to expected format. Aborting import!"
)
clients[client_file] = client
return clients
class FileTableBackend(BaseClientBackend):
"""In-memory littletable based backend."""
def __init__(self, directory: Path) -> None:
"""Create backend instance."""
LOG.debug("Creating in-memory table to hold clients.")
self._directory: Path = directory
self.table: lt.Table[ClientSpecification] = lt.Table()
self._setup_table()
client_files = load_clients_from_dir(directory)
client_count = len(client_files)
LOG.debug("Loaded %s clients from disk.", client_count)
# self.client_file_map: dict[str, Path] = {client.name: filepath for filepath, client in client_files.items()}
LOG.debug("Inserting clients into table.")
self.table.insert_many(list(client_files.values()))
def _setup_table(self) -> None:
"""Set up the table."""
self.table.create_index("name", unique=True)
@override
def lookup_name(self, name: str) -> ClientSpecification | None:
"""Lookup client by name."""
if result := self.table.by.name.get(name):
if isinstance(result, ClientSpecification):
return result
return None
@override
def add_client(self, spec: ClientSpecification) -> None:
"""Add client."""
self.table.insert(spec)
self._write_spec_file(spec)
def _write_spec_file(self, spec: ClientSpecification) -> None:
"""Write spec file to disk."""
dest_file_name = f"{spec.name}.json"
dest_file = self._directory / dest_file_name
with open(dest_file.absolute(), "w") as f:
f.write(
spec.model_dump_json(exclude_none=True, exclude_unset=True, indent=2)
)
f.flush()
@override
def add_secret(
self,
client_name: str,
secret_name: str,
secret_value: str,
encrypted: bool = False,
) -> None:
"""Add secret."""
client: ClientSpecification = self.table.by.name[
client_name
] # pyright: ignore[reportAssignmentType]
if not encrypted:
public_key = load_client_key(client)
secret_value = encrypt_string(secret_value, public_key)
client.secrets[secret_name] = secret_value
self._update_client_data(client)
self._write_spec_file(client)
@override
def remove_client(self, name: str, persistent: bool = True) -> None:
"""Delete client."""
client = self.lookup_name(name)
if not client:
raise ValueError("Client does not exist!")
self.table.remove(client)
if persistent:
filename = f"{client.name}.json"
filepath = self._directory / filename
filepath.unlink()
@override
def update_client(self, name: str, spec: ClientSpecification) -> None:
"""Update client."""
if not self.lookup_name(name):
raise ValueError("Client does not exist!")
self._update_client_data(spec)
self._write_spec_file(spec)
def _update_client_data(self, spec: ClientSpecification) -> None:
"""Update client data."""
existing = self.lookup_name(spec.name)
if existing:
self.table.remove(existing)
self.add_client(spec)

View File

@ -0,0 +1,20 @@
"""Server errors."""
class BaseSshecretServerError(Exception):
"""Base Sshecret Server Error."""
class UnknownClientError(BaseSshecretServerError):
"""Client was not recognized."""
class AccessDeniedError(BaseSshecretServerError):
"""Client was not authorized to access the resource."""
class AccessPolicyViolationError(BaseSshecretServerError):
"""Client was not authorized to access the secret."""
class UnknownSecretError(BaseSshecretServerError):
"""Error when resolving the secret."""

View File

@ -0,0 +1,309 @@
"""SSH Server implementation."""
import ipaddress
import logging
import threading
import socket
from contextvars import ContextVar, Context, copy_context
from pathlib import Path
from typing import Any, override
import paramiko
from paramiko.common import (
AUTH_SUCCESSFUL,
AUTH_FAILED,
OPEN_SUCCEEDED,
OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED,
)
from sshecret import constants
from sshecret.crypto import create_private_rsa_key
from sshecret.types import ClientSpecification
from .types import BaseClientBackend, BaseServer
from . import errors
CLIENT_STATE = threading.local()
client_secret_request_name: ContextVar[str] = ContextVar("client_secret_request_name")
client_request_name: ContextVar[str] = ContextVar("client_request_name")
LOG = logging.getLogger(__name__)
class TransportContext(paramiko.Transport):
"""Context-aware transport."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.__context: Context | None = None
def snapshot_context(self) -> None:
"""Take a snapshot of the current context."""
LOG.debug("Snapshot!")
self.__context = copy_context()
#object.__setattr__(self, "__context", copy_context())
def get_context(self) -> None:
"""Get the frozen context into our current one."""
if contextobj := self.__context:
for var, value in contextobj.items():
var.set(value)
class SshServerInterface(paramiko.ServerInterface):
"""Define our ssh server interface."""
def __init__(self, clients: BaseClientBackend, client_address: str) -> None:
"""Initialize server interface."""
self.clients: BaseClientBackend = clients
self.client_address: str = client_address
self.event: threading.Event = threading.Event()
@override
def check_auth_publickey(self, username: str, key: paramiko.PKey) -> int:
"""Check if we can authenticate."""
LOG.debug("Verifying public key of username %s", username)
if self.clients.verify_key(username, key):
LOG.debug("Key matches configured key.")
return AUTH_SUCCESSFUL
LOG.warning("Key did not match. Auth denied!")
return AUTH_FAILED
@override
def get_allowed_auths(self, username: str) -> str:
"""Get allowed auth methods."""
return "publickey"
@override
def check_channel_request(self, kind: str, chanid: int) -> int:
LOG.debug("Open channel request received: kind=%s, chanid=%r", kind, chanid)
if kind == "session":
LOG.debug("Session requested.")
return OPEN_SUCCEEDED
LOG.warning("Prohibited channel request received.")
return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
@override
def check_channel_shell_request(self, channel: paramiko.Channel) -> bool:
"""Check shell request."""
# This shouldn't be allowed.
LOG.debug("Channel request received. Channel: %r", channel)
return False
@override
def check_channel_exec_request(
self, channel: paramiko.Channel, command: bytes
) -> bool:
"""Check if the exec request is valid.
This is where we send the password. The command is always the name of
the secret.
"""
LOG.debug("Exec request received: command: %r", command)
# Documentation says command is a string, but typeshed says it's bytes...
command_str = command.decode()
transport = channel.get_transport()
if not transport.is_authenticated():
return False
username = transport.get_username()
LOG.debug("Resolved username: %r", username)
if not username:
return False
if not isinstance(channel.transport, TransportContext):
LOG.critical("Error: Incorrect transport class. Cannot process commands.")
self.event.set()
return False
client_secret_request_name.set(command_str)
client_request_name.set(username)
channel.transport.snapshot_context()
self.event.set()
LOG.debug("Command check completed.")
return True
def check_allowed_client_ip(self, client: ClientSpecification) -> bool:
"""Check if the client is allowed to log in based on source IP."""
LOG.debug(
"Checking if client is allowed to log in from %s", self.client_address
)
if isinstance(client.allowed_ips, str) and client.allowed_ips == "*":
LOG.debug("Client has no restrictions on source IP address. Permitting.")
return True
if isinstance(client.allowed_ips, str):
if self.client_address == client.allowed_ips:
LOG.debug("Client IP matches permitted address")
return True
LOG.warning(
"Connection for client %s received from IP address %s that is not permitted.",
client.name,
self.client_address,
)
return False
client_ip = ipaddress.ip_address(self.client_address)
if client_ip in client.allowed_ips:
LOG.debug("Client IP matches permitted address")
return True
LOG.warning(
"Connection for client %s received from IP address %s that is not permitted.",
client.name,
self.client_address,
)
return False
class SshKeyServer(BaseServer):
"""SSH secrets server."""
def __init__(self, host_key: Path, clients: BaseClientBackend) -> None:
"""Create server instance."""
super().__init__()
self._host_key: paramiko.RSAKey = paramiko.RSAKey.from_private_key_file(
str(host_key), None
)
self.clients: BaseClientBackend = clients
def resolve_client(self) -> ClientSpecification:
"""Resolve client."""
LOG.debug("Looking up client data.")
client_name = client_request_name.get(None)
if not client_name:
LOG.debug("No context data was resolved.")
raise errors.UnknownClientError(constants.ERROR_NO_COMMAND_RECEIVED)
client = self.clients.lookup_name(str(client_name))
if not client:
raise errors.UnknownClientError(constants.ERROR_UKNOWN_CLIENT_OR_SECRET)
return client
def get_secret(self, client: ClientSpecification) -> str:
"""Get command."""
LOG.debug("Looking up secret as requested.")
secret_name = client_secret_request_name.get(None)
if not secret_name:
raise errors.UnknownSecretError(constants.ERROR_UKNOWN_CLIENT_OR_SECRET)
secret = client.secrets.get(str(secret_name))
if not secret:
raise errors.UnknownSecretError(constants.ERROR_NO_SECRET_FOUND)
return secret
def check_connection_allowed(self, client: ClientSpecification, source: str) -> None:
"""Check if client is allowed to request secrets."""
LOG.debug(
"Checking if client is allowed to log in from %s", source
)
if isinstance(client.allowed_ips, str) and client.allowed_ips == "*":
LOG.debug("Client has no restrictions on source IP address. Permitting.")
return
if isinstance(client.allowed_ips, str) and "/" not in client.allowed_ips:
if source == client.allowed_ips:
LOG.debug("Client IP matches permitted address")
return
LOG.warning(
"Connection for client %s received from IP address %s that is not permitted.",
client.name,
source
)
raise errors.AccessPolicyViolationError(constants.ERROR_SOURCE_IP_NOT_ALLOWED)
source_ip = ipaddress.ip_address(source)
permitted = False
for client_ip in client.allowed_ips:
if isinstance(client_ip, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
if source_ip in client_ip:
permitted = True
break
else:
if source_ip == client_ip:
permitted = True
break
if not permitted:
raise errors.AccessPolicyViolationError(constants.ERROR_SOURCE_IP_NOT_ALLOWED)
LOG.debug("Matched client to permitted IP address statement.")
return
@override
def connection_function(
self, client_socket: socket.socket, client_address: str
) -> None:
"""Run on connection."""
LOG.debug("Connection function called by %s", client_address)
try:
session = TransportContext(client_socket)
session.add_server_key(self._host_key)
server = SshServerInterface(self.clients, client_address)
try:
session.start_server(server=server)
except paramiko.SSHException:
return
channel = session.accept(30)
if not channel:
LOG.debug("No channel opened!")
return
LOG.debug("Got channel: %r, transport: %r, ", channel, channel.transport)
server.event.wait()
LOG.debug("Opening channel file")
stdout = channel.makefile("rw")
LOG.debug("Extracting context.")
session.get_context()
try:
LOG.debug("Looking up client.")
client = self.resolve_client()
LOG.debug("Checking source address policy.")
self.check_connection_allowed(client, client_address)
LOG.debug("Looking up secret.")
secret = self.get_secret(client)
except errors.BaseSshecretServerError as e:
error_message = f"{e}\n"
LOG.critical(e, exc_info=True)
stdout.write(error_message)
session.close()
return
stdout.write(secret)
session.close()
except Exception as e:
LOG.critical(e, exc_info=True)
@classmethod
def start_server(
cls,
host_key: str | Path,
clients: BaseClientBackend,
bind_address: str = "127.0.0.1",
port: int = 22,
timeout: int = 10,
create_key: bool = False,
) -> None:
"""Start the server.
Args:
host_key: path to the private host key (str or Path)
clients: Client secret loader instance.
bind_address: address to bind to (default: 127.0.0.1)
port: Port to bind to (default: 22)
timeout: Socket timeout, default 1 second
create_key: Create the private key if it doesn't exist.
"""
if isinstance(host_key, str):
host_key = Path(host_key)
if not host_key.exists():
if create_key:
create_private_rsa_key(host_key)
else:
raise RuntimeError("Error: provided host key does not exist.")
server = cls(host_key, clients)
server.start(bind_address, port, timeout)

View File

@ -0,0 +1,137 @@
"""Base types and interfaces for the server."""
import abc
import logging
import socket
import sys
import threading
from typing import Any, TypeGuard
from cryptography.hazmat.primitives import serialization
import paramiko
from sshecret.types import ClientSpecification
from sshecret.crypto import load_client_key
LOG = logging.getLogger(__name__)
def validate_socket_tuple(data: Any) -> TypeGuard[tuple[str, int]]:
"""Validate socket accept return data.."""
if not isinstance(data, tuple):
return False
if not len(data) == 2:
return False
ip, port = data # pyright: ignore[reportUnknownVariableType]
if not isinstance(ip, str):
return False
if not isinstance(port, int):
return False
return True
class BaseClientBackend(abc.ABC):
"""Base client backend.
This class is responsible for managing the list of clients and facilitate
lookups.
"""
@abc.abstractmethod
def lookup_name(self, name: str) -> ClientSpecification | None:
"""Lookup a client specification by name."""
def _convert_to_pkey(self, client: ClientSpecification) -> paramiko.RSAKey:
"""Convert client key to paramiko key."""
client_key = load_client_key(client)
return paramiko.RSAKey(key=client_key)
def verify_key(self, name: str, key: paramiko.PKey) -> ClientSpecification | None:
"""Verify key."""
client = self.lookup_name(name)
if not client:
return None
LOG.debug("Verifying key: %r", key)
expected_key = self._convert_to_pkey(client)
if key == expected_key:
return client
return None
@abc.abstractmethod
def add_secret(self, client_name: str, secret_name: str, secret_value: str, encrypted: bool = False) -> None:
"""Add a secret to a client."""
@abc.abstractmethod
def add_client(self, spec: ClientSpecification) -> None:
"""Add a new client."""
@abc.abstractmethod
def update_client(self, name: str, spec: ClientSpecification) -> None:
"""Update client information."""
@abc.abstractmethod
def remove_client(self, name: str, persistent: bool = True) -> None:
"""Delete a client."""
class BaseServer(abc.ABC):
"""Base SSH server."""
def __init__(self) -> None:
"""Initialize the server."""
self._is_running: threading.Event = threading.Event()
self._socket: socket.socket | None = None
self._listen_thread: threading.Thread | None = None
def start(
self, address: str = "127.0.0.1", port: int = 22, timeout: int = 1
) -> None:
"""Start the server."""
if not self._is_running.is_set():
LOG.info("Starting SSH server on %s port %s", address, port)
self._is_running.set()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
if sys.platform == "linux" or sys.platform == "linux2":
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
LOG.debug("Setting socket timeout %s", timeout)
self._socket.settimeout(timeout)
LOG.debug("Binding to %s:%s", address, port)
self._socket.bind((address, port))
LOG.debug("Spawning thread.")
self._listen_thread = threading.Thread(target=self._listen)
self._listen_thread.start()
def stop(self) -> None:
"""Stop the server."""
if self._is_running.is_set():
self._is_running.clear()
if self._listen_thread:
self._listen_thread.join()
if self._socket:
self._socket.close()
@abc.abstractmethod
def connection_function(
self, client_socket: socket.socket, client_address: str
) -> None:
"""Run function on connect."""
def _listen(self) -> None:
"""Connect client to function."""
if self._socket is None:
raise RuntimeError("Received connection request without any socket")
while self._is_running.is_set():
try:
self._socket.listen()
client_socket, addr = self._socket.accept() # pyright: ignore[reportAny]
if not validate_socket_tuple(addr):
LOG.warning("Socket address tuple did not pass typeguard check!")
continue
LOG.debug("Received connection from %r", addr)
self.connection_function(client_socket, addr[0])
except socket.timeout:
pass

29
src/sshecret/testing.py Normal file
View File

@ -0,0 +1,29 @@
"""Testing utilities and classes."""
import tempfile
from dataclasses import dataclass, field
from contextlib import contextmanager
from pathlib import Path
from collections.abc import Iterator
from .utils import create_client_file
@dataclass
class TestClientSpec:
"""Specification of a test client."""
name: str
secrets: dict[str, str] = field(default_factory=dict)
@contextmanager
def test_context(clients: list[TestClientSpec]) -> Iterator[Path]:
"""Create a test context."""
with tempfile.TemporaryDirectory() as tmpdir:
dirpath = Path(tmpdir)
for client in clients:
filename = dirpath / f"{client.name}.json"
create_client_file(client.name, filename, client.secrets)
yield dirpath

85
src/sshecret/types.py Normal file
View File

@ -0,0 +1,85 @@
"""Interfaces and types."""
import abc
from typing import Self
from pydantic import BaseModel, field_serializer
from pydantic.networks import IPvAnyAddress, IPvAnyNetwork
class BasePasswordReader(abc.ABC):
"""Abstract strategy class to read a passwords."""
@classmethod
@abc.abstractmethod
def get_password(cls, identifier: str) -> str:
"""Resolve the password, e.g., via input."""
class PasswordContext:
"""Context class for resolving a password."""
def __init__(self, reader: type[BasePasswordReader]) -> None:
"""Initialize password context."""
self._reader: type[BasePasswordReader] = reader
@property
def reader(self) -> type[BasePasswordReader]:
"""Return reader."""
return self._reader
@reader.setter
def reader(self, reader: type[BasePasswordReader]) -> None:
"""Set the reader instance."""
self._reader = reader
def get_password(self, identifier: str) -> str:
"""Get the password."""
return self.reader.get_password(identifier)
class BasePasswordManager(abc.ABC):
"""Abstract base class for password managers."""
master_password_identifier: str
@classmethod
@abc.abstractmethod
def create_database(
cls, location: str, reader_context: PasswordContext, overwrite: bool = False
) -> Self:
"""Create database.
Location can be a file, a url or something else.
"""
@abc.abstractmethod
def open_database(self, reader: PasswordContext) -> None:
"""Open database."""
@abc.abstractmethod
def close_database(self) -> None:
"""Close database."""
@abc.abstractmethod
def get_password(self, identifier: str) -> str:
"""Get a password from the manager."""
@abc.abstractmethod
def generate_password(self, identifier: str) -> str:
"""Generate a password using unspecified default rules.
May be expanded later.
Returns the generated password.
"""
class ClientSpecification(BaseModel):
"""Specification of client."""
name: str
public_key: str
allowed_ips: list[IPvAnyAddress | IPvAnyNetwork] | str = "*"
secrets: dict[str, str] = {}
testing_private_key: str | None = None # Private key only for testing purposes!

60
src/sshecret/utils.py Normal file
View File

@ -0,0 +1,60 @@
"""Various utilities."""
import secrets
from pathlib import Path
from .crypto import load_client_key, encrypt_string, generate_private_key, generate_pem, generate_public_key_string
from .types import ClientSpecification
def generate_password() -> str:
"""Generate a password.
"""
return secrets.token_urlsafe(32)
def generate_client_object(name: str, secrets: dict[str, str] | None = None, keyfile: str | None = None) -> ClientSpecification:
"""Generate a client object."""
private_key = generate_private_key()
if keyfile:
with open(keyfile, "r") as f:
contents = f.read()
if not contents.startswith("ssh-rsa "):
raise RuntimeError("Error: Key must be an RSA key.")
client = ClientSpecification(name=name, public_key=contents.strip())
public_key = load_client_key(client)
else:
pem = generate_pem(private_key)
public_key = private_key.public_key()
pubkey_str = generate_public_key_string(public_key)
client = ClientSpecification(name=name, public_key=pubkey_str, testing_private_key=pem)
if secrets:
for secret_name, secret_value in secrets.items():
client.secrets[secret_name] = encrypt_string(secret_value, public_key)
return client
def create_client_file(name: str, filename: Path | str, secrets: dict[str, str] | None = None, keyfile: str | None = None) -> None:
"""Create client file."""
client = generate_client_object(name, secrets, keyfile)
with open(filename, "w") as f:
f.write(client.model_dump_json(exclude_none=True, indent=2))
f.flush()
def add_secret_to_client_file(filename: str | Path, secret_name: str, secret_value: str) -> None:
"""Add secret to client file."""
with open(filename, "r") as f:
client = ClientSpecification.model_validate_json(f.read())
public_key = load_client_key(client)
encrypted = encrypt_string(secret_value, public_key)
client.secrets[secret_name] = encrypted
with open(filename, "w") as f:
json_str = client.model_dump_json(exclude_none=True, indent=2)
f.write(json_str)
f.flush()

View File

@ -0,0 +1,38 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn
NhAAAAAwEAAQAAAYEAvLI+RsQWmzdg5uehSLgeqdHyuENtA2oKwNAxVatjNKEasJYIbe0j
rEDj8+VFrrqo65wl9nmz5q5/sQwuWSMtChl12HT9ouPorssYgbBSH/TkE3z6ozjlcaqBcP
VegdFyRKN5oUbG/IFrV/cTztX/WEKMOSv0jGyRNo625Mmvrpw2hkBdOGtu2dkDJZjUx8MR
2Z/oQXpkJk9036nZZCaxLJHyKh9Ctyv6ZeeDRmvAjsgo/AH/v+54yZn9Lr2I+XC0tAL4L4
PkhhUnf40inWFTD6pc9PJSStc9MRqwP+GmDMMLE9erwyV9SbsrkC2iqeYB/8mLzK6C0rIq
/P1AK3i82lIYS1kKvHrjU0HGUSwkb3lR/rsqAEQNAUpNUFN7SXSPP9STMTevIO9L56Yr9G
oHJM1FxORcQbEsVyfGo3QU/mLTCye8yZ0TbnRI5F2EsZUAG0wdeREWklMAAdGElPuY2SFK
VZ4VOM1jT8ahfCLfmqStrZHe7Yo7VptBSlmSvip/AAAFgEX7wZNF+8GTAAAAB3NzaC1yc2
EAAAGBALyyPkbEFps3YObnoUi4HqnR8rhDbQNqCsDQMVWrYzShGrCWCG3tI6xA4/PlRa66
qOucJfZ5s+auf7EMLlkjLQoZddh0/aLj6K7LGIGwUh/05BN8+qM45XGqgXD1XoHRckSjea
FGxvyBa1f3E87V/1hCjDkr9IxskTaOtuTJr66cNoZAXThrbtnZAyWY1MfDEdmf6EF6ZCZP
dN+p2WQmsSyR8iofQrcr+mXng0ZrwI7IKPwB/7/ueMmZ/S69iPlwtLQC+C+D5IYVJ3+NIp
1hUw+qXPTyUkrXPTEasD/hpgzDCxPXq8MlfUm7K5AtoqnmAf/Ji8yugtKyKvz9QCt4vNpS
GEtZCrx641NBxlEsJG95Uf67KgBEDQFKTVBTe0l0jz/UkzE3ryDvS+emK/RqByTNRcTkXE
GxLFcnxqN0FP5i0wsnvMmdE250SORdhLGVABtMHXkRFpJTAAHRhJT7mNkhSlWeFTjNY0/G
oXwi35qkra2R3u2KO1abQUpZkr4qfwAAAAMBAAEAAAGADRJb9hMHbeE8OUK6jYsTtLfylI
k3OBFUhV7mzAR/btnqO2lpVBQlcH1eTTsIxL3xjcDXcGel6skT13P8kfg52oVBAKm6GFqp
d9Jh9Dn+tnAEjMUPp9b9Lg6dwPF+hoe33sFkX6PDjSJ6CTH4kU+JzNdvV1aQLlonBRyF1v
uRzArOTCaRTqNCnpzF9wjLVLtStTy6ni6YWX8PnZ7qjGGRzICfwgNAX+gQBJcxJOO6Byoe
jLamvOkMPQsJ2v8OShlgjJc7A9TZ8EtdAPHgxr1UHqLypNz9M3OoPsBWEAacPmb7k/+Lal
R3wzbPlFuxCQKiUUc6vJrqUz1+w2s+iaJTKgLoARSwqPe+6GU1bZ7yMYJT0lCuXtFPAtmf
QXORpxe6iNGBuAIaxQfTZ0+NZpzxV4CVnA5Edf7wY4HxG+BPqp+nPF9sG6GvBvXALE+uMN
vgLcO8pjdJRdXvYIf8/SqP5xsrJcIAL2DWhJNPvk8WsfN4hJAUzM99f5OXH3kZbaEBAAAA
wAC+Wotra9EeXdbboSRrbG6Mm3whJugwNzdDY7CovTW/Mgydpowt8LSFKfIRiH6m/7HuF4
tf27tpS9ucFncqY0v1BNAd7ctl3/WZJ5cLcm8WOhX0R4VWh/mKcj1Sxl9gDylbxt7hZPw0
mlvNymZFZH8POBWMheSLPBYxlNqgPIphbgNr04AlTXE8CApCOKzMpPd0IEcGovLYDCrA+k
TQhJk38xy1iE7wI5dhqKprCOk0488capTJWEm7nx5HbmmfHQAAAMEA+RGAQR2Y8XPQ+Zyk
DGSsUjFjgeSyy/VVBnx0flq2Of6dsni6S0ffRKHHCz1TFYwQxsqedbRV6dV8shoTfI/L51
mtWXN+Wo+RSmF3pFvqjZaI5D7ai6mqwas0yZEAcHnzAY2Yz6TbH3x/PHIbU7MXcv6gzFXs
VWTMRVCTI/rYdPH3VOGkM+0DsLXbGivae0kCK14vsQJq/yHNwMHOdzWtPJtu51hKpP5jOq
jQ7CzuUKxm0oTv9px4rK/SayY8v4KfAAAAwQDB8p5T56lYVbSPecH4nxy0gMvmGtBqTxcr
towIFiVygDMFK4z+Ey4qK4E2CX50V3/Gjezl4hoRPuhu3L+xV6Q+MAPedc15IMhw2TZhrx
SkDj3nCBz13Jex8KBcMmdM1Yc8eHTFDSVArovCRLcwCFzWAyKRCRuNYBbacwWb0BlpObgS
00N+ISNT6q+IyuXz3lDY7ZEyEdtUFSjeFJZ5XzwJ96IWsbh0n/qW7jSdj0Knc4OG4ud4mq
CogUWmH6mFLCEAAAAEdGVzdAECAwQFBgc=
-----END OPENSSH PRIVATE KEY-----

View File

@ -0,0 +1 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC8sj5GxBabN2Dm56FIuB6p0fK4Q20DagrA0DFVq2M0oRqwlght7SOsQOPz5UWuuqjrnCX2ebPmrn+xDC5ZIy0KGXXYdP2i4+iuyxiBsFIf9OQTfPqjOOVxqoFw9V6B0XJEo3mhRsb8gWtX9xPO1f9YQow5K/SMbJE2jrbkya+unDaGQF04a27Z2QMlmNTHwxHZn+hBemQmT3TfqdlkJrEskfIqH0K3K/pl54NGa8COyCj8Af+/7njJmf0uvYj5cLS0Avgvg+SGFSd/jSKdYVMPqlz08lJK1z0xGrA/4aYMwwsT16vDJX1JuyuQLaKp5gH/yYvMroLSsir8/UAreLzaUhhLWQq8euNTQcZRLCRveVH+uyoARA0BSk1QU3tJdI8/1JMxN68g70vnpiv0agckzUXE5FxBsSxXJ8ajdBT+YtMLJ7zJnRNudEjkXYSxlQAbTB15ERaSUwAB0YSU+5jZIUpVnhU4zWNPxqF8It+apK2tkd7tijtWm0FKWZK+Kn8= test

10
tests/clients/test1.json Normal file
View File

@ -0,0 +1,10 @@
{
"name": "test1",
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCkLJeFw2U3PL35siEPrQIpDgP4KleX8A5wiKDvHFNUNgN7hzAA0IFYuzScuHGPx0ZEVSq3H4YVvE9/GJaMT1sfy64EeuGc7TEuoWataBXsTqPmrTI/jeSJ6Zpxadepazt/0ZJzb1aj0+P8Kqc9CnHPFaCwaa81GpYB7DvGM3BbK3u5n9/AGA5cm3QzGK/lxOu9T50MkR+bjutHa9nZDgTxc/4Ydm8rdAd5vVfC4yl0t56uRHvpZcdyJ+F5xYlJSbCG//bwaksnT2EEK/pPE0LHYRM8YBPWc8W/1HN4GFuRU9kzhPmNzIjXxOz4a/gEiHzFoDakZhdBKlo6egkhHcRF",
"allowed_ips": "*",
"secrets": {
"MY_FOO_VAL": "CUwIDDvZG8n9WLeJ8TKBRlBnL4MZQwXihyOD0eOWhCIL3+8Pigl0WhSzGuB+VcXffcEHmM9MIPrYY2wNEs/iDwGdD4HdABbJHOgW/ezUislghbcQ1FwxzgqeJ5388VZxa0d11bzhFklEl6mp2FCTFIpmM4SpowJ6ZmI9w5OwQVHEC74vP3rIls9BC3avZMVWUWhF1LTvoqacPxYJ7FMXeUyy4NW4Vi1A6blzLBRmDkDfaXLglUZX36BA1+zm1Gsjx5ziZQ9ObtmtR69Bwon6RuoqU1uEeAjWg/wL2h/7IovUcQeGKaSySi/mS7OZISJHpv2TLun8J+mVnl4NJNSzkQ==",
"MY_BAR_VAL": "V6M9uBELq5gjJjXZIHCIoXq0bTtSNXpfIk63BpT/DxJhNZvpmpaX6Bn1gQPYfzi00mR+CzY6nvAuH8fVd8nMcUhng1DdGl7fUhiDLEBBwuWaHZ/iprGA6z8ZoPDkO6bNRWI2733VYjqZ1IQHEWlt0NuzXGFwUOS60YKljd/gZ8AgKrIioV/Pvuz7JT8Ko1gLIOXzzwStPjaB3T07zTX8F3Tz1F8Vc8iYtC+Xd5lzk1AQJE3HvsDChFNXwE0fbbKxe6zQwg9XzIrkU3bwtzKaEjyOPg8uke+ZqGSgpVz5c98iWRf215pcalgF5iKrm5bFW8EhE4DgCkkSbWpzmWp27A=="
},
"testing_private_key": "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABFwAAAAdzc2gtcnNhAAAA\nAwEAAQAAAQEApCyXhcNlNzy9+bIhD60CKQ4D+CpXl/AOcIig7xxTVDYDe4cwANCBWLs0nLhxj8dG\nRFUqtx+GFbxPfxiWjE9bH8uuBHrhnO0xLqFmrWgV7E6j5q0yP43kiemacWnXqWs7f9GSc29Wo9Pj\n/CqnPQpxzxWgsGmvNRqWAew7xjNwWyt7uZ/fwBgOXJt0Mxiv5cTrvU+dDJEfm47rR2vZ2Q4E8XP+\nGHZvK3QHeb1XwuMpdLeerkR76WXHcifhecWJSUmwhv/28GpLJ09hBCv6TxNCx2ETPGAT1nPFv9Rz\neBhbkVPZM4T5jcyI18Ts+Gv4BIh8xaA2pGYXQSpaOnoJIR3ERQAAA7gH0327B9N9uwAAAAdzc2gt\ncnNhAAABAQCkLJeFw2U3PL35siEPrQIpDgP4KleX8A5wiKDvHFNUNgN7hzAA0IFYuzScuHGPx0ZE\nVSq3H4YVvE9/GJaMT1sfy64EeuGc7TEuoWataBXsTqPmrTI/jeSJ6Zpxadepazt/0ZJzb1aj0+P8\nKqc9CnHPFaCwaa81GpYB7DvGM3BbK3u5n9/AGA5cm3QzGK/lxOu9T50MkR+bjutHa9nZDgTxc/4Y\ndm8rdAd5vVfC4yl0t56uRHvpZcdyJ+F5xYlJSbCG//bwaksnT2EEK/pPE0LHYRM8YBPWc8W/1HN4\nGFuRU9kzhPmNzIjXxOz4a/gEiHzFoDakZhdBKlo6egkhHcRFAAAAAwEAAQAAAQAO+YFFom22RNxY\nLPL+jMuMZpqelXAha/RJN/Ej9ju0i+uz5gAPJvWRXBv/qoQzNtw2KeWISAARNfizUVEUEb3wT8H6\n5yFykKECjZbBvON5BzBEf2o8qUrd+HiNnTeeXKlT7o/y5wYqUc6zBsnz06LPXnvmc3FXgOoLWVqX\naMQ4EID4y1pVKSFAE4GYstS2JKrpBtCkK22MyIBhMD5+Oxgy1FN7WPUfkVBSJiKsTLuh2nlyZCt+\nZQI9H81kaiqztvl1R+VFfi2xG9YVADLatcNb+1tuQbscAY9A2zdmHurYTAmUg++VlLGUXoMtZy2u\nPYmTBKFNm7cD+okKjr9akSCBAAAAgDsH/+xzp8ia99ZyRvfObnWBhQXt7yDJOco3E34zURTmfYEv\nNVKBfnC1NHZMofBbR05ww8BqQE5z0zV050pNdEDVLs92CJpSvXAoKO99OCbPFyNNSBWYUbZajZ8G\nmVDZ9EkbK0PkeHZAYDRSWmF3Hx1RVjp0+dvWtQCjdCONtMX+AAAAgQDh1lo8a+dm5CV2vROPfYLl\nlGrLnUsZY7JmctiFBkgU7TuRDfpgN4gRM64fCmbFKetLuoe6OG7UWTyWIk1zyfV0rm52amhZJvQL\nLyKX+R90XcTUvssa6XRPMFx31FkJ5mc97HAX1qsZQpfWB6Y9u5vgUpXOgki3Ra7q/MdFEU2jgQAA\nAIEAuhnnRv8ebM/ugRkHEXvS2Mndeh/37u77HxRhUyd6bbZnzIuH9FXw9LkMOH+jnLNJqU4CKQQD\ni5faIZwSrc+u+TsQAZqvCbmhGbvM6Pjl9cI3ucp6lj7ntI3MvrzygfR168bO0KCU8KUzGsbqqTur\n0DvUcZDqCFepbaGOEXdP8sUAAAAAAQID\n-----END OPENSSH PRIVATE KEY-----\n"
}

10
tests/clients/test2.json Normal file
View File

@ -0,0 +1,10 @@
{
"name": "test2",
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCPzsiHOLEFlTp3OiBDoYssfLfYboziOTyBXKT7O5Nd8++5JdhE0qw+4mxj3ehEhcHsmB7TU5DtwEEtbrVi4eStLD8hGTgfeGIhTHUbd7sIJhAPPPbEnW/JdJURxNZNBLiEHN5VAMYgvqVIpeYoTOk15gHHonwCpZAAmFX3X8TgkBH5sJLUW5eOx915mWcTAGm174KNhXgYAWFV0e5Q4fi6ksXv1eYXfxvGrbOQS5pgoHN21pNy7hfukF1AhTf2eAS1EXnXlt3dVzfFRI+73+vAtquifv68ECBMfeoUSkUBT7plH315Ibg4/9RCW83zJVA/JlLUnJ6Az9d7SvTCJT/R",
"allowed_ips": "*",
"secrets": {
"MY_BAR_VAL": "awE7TWONHKELcggRR24kFSkmPdsb6pANll/DEfLnB7rs5LeiPm2i8jcpvr0PZuDwvRmcZShMMJDCjqwvlh+ioCcCNNz0htH7O4Uh7ycf5+RzKzOnRWGOG5sfOBajtETjsmZf7YKGC7dd4RypZ8u0hxwY0kyCgqep8JiMjacGw8Gdz/BTDvGJ8RzWcUcwf1IjhtCBSDyC/GWPkB7uIbHqi4AUbJ6Y86T317z/NeW15sZpC/1vRW+sJeNV09lzXYMBsaya6Z1ppyUfCSEywZgCV+olp64DP570t2H8fBZkRo5Z7F2PuUpv4exWkcW5tXXv0OgH4Kwi4Zr+AHauqR/CgQ==",
"MY_FOO_VAL": "c98Jq4WYJ9gXezNipzrzCcHDFIaDsIMW+HnDE8shZxX/uELkfPIC0j6MQHPWJvxMJs2/c+unJ7yX6MWe/oSQbSfKM+5YzNhSh37fTmeJDAa1giooNg0trDEmzS3EgXuTIk5ltPkg31wCp2FDrViBERnc4QsOWXdPztKAbuJYj4/pSSU5mk4lmjnXg0fLHmK3n79wXGpMST25/LlZFUbu7n5pJZRZzth88qMu3/rvQqsDTt6JWeqBYo3A+UJnT1Me/PpDTQqvzhPKHf9IGIIN+QVYyvq+ya5kgO3fTlomIwNqznuE6Tw0nxFXP67cbc8F93+rmVyxaj2US758oX61jw=="
},
"testing_private_key": "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABFwAAAAdzc2gtcnNhAAAA\nAwEAAQAAAQEAj87IhzixBZU6dzogQ6GLLHy32G6M4jk8gVyk+zuTXfPvuSXYRNKsPuJsY93oRIXB\n7Jge01OQ7cBBLW61YuHkrSw/IRk4H3hiIUx1G3e7CCYQDzz2xJ1vyXSVEcTWTQS4hBzeVQDGIL6l\nSKXmKEzpNeYBx6J8AqWQAJhV91/E4JAR+bCS1FuXjsfdeZlnEwBpte+CjYV4GAFhVdHuUOH4upLF\n79XmF38bxq2zkEuaYKBzdtaTcu4X7pBdQIU39ngEtRF515bd3Vc3xUSPu9/rwLaron7+vBAgTH3q\nFEpFAU+6ZR99eSG4OP/UQlvN8yVQPyZS1JyegM/Xe0r0wiU/0QAAA7gH0BRKB9AUSgAAAAdzc2gt\ncnNhAAABAQCPzsiHOLEFlTp3OiBDoYssfLfYboziOTyBXKT7O5Nd8++5JdhE0qw+4mxj3ehEhcHs\nmB7TU5DtwEEtbrVi4eStLD8hGTgfeGIhTHUbd7sIJhAPPPbEnW/JdJURxNZNBLiEHN5VAMYgvqVI\npeYoTOk15gHHonwCpZAAmFX3X8TgkBH5sJLUW5eOx915mWcTAGm174KNhXgYAWFV0e5Q4fi6ksXv\n1eYXfxvGrbOQS5pgoHN21pNy7hfukF1AhTf2eAS1EXnXlt3dVzfFRI+73+vAtquifv68ECBMfeoU\nSkUBT7plH315Ibg4/9RCW83zJVA/JlLUnJ6Az9d7SvTCJT/RAAAAAwEAAQAAAQAKu8Dc0t7nj0VP\nW8/HrHmCRwbDyTCLvADnmN4ZgE9V/lyAobH8JQtFIEo9w/TPlHouagY2+LBDBov206IHMNwMDtbh\nZgv50VblrFq7Q5r6lziwoni6oROUYja0HlBubDFHbw4rIwUmsYQNoZBFpsPrSXENkPOXkProCHa2\nIXhE2G9Sk1i2Gxjwpk2gZ4zqYZd0MVs1YkySj1XyDc87JDKLZN9c0KXk0O9sTNqUp8sOospk++zx\n44zN/u3AES3F86HUjqZD2gJDLXJYAwZSFKcabLRXFMaupGw9XSEhIvb9ET3rSxqdE9/79JuyG3ep\nXUUmDfPIB/SMzCYLo9NwEXQJAAAAgFthqmQkXj84I+DrKxk4PVebufruIFkgdtN4aYVSSUvLFAzP\ngYIaBCroviUruSY1EPR25wnhADkMgJxteca/irkYS/Rg84lmGzWZvnmD6ppZTvAGqjrGkflztBkG\nGv3MNi1dALckzGHNDmQX3yjMhh8qYZiCjMCVB+7Cc94sy5B+AAAAgQDA4qIzdSDKuctl5KSFNMhN\nWdNx8KQTahuXLzmB3cMKzNPT/0ZiBbX2bIn4R+HsxYmc3eBDUuRyMrG6cMyyFujECzQdNC1Gky8L\nQDH0GyMDVlD9c4lQenk9MIr9dy9P1XfFaMMQL6z3tAkpoXugPgaimgbRGoD2MzVpRM8u9GMEpQAA\nAIEAvt0V5NSaO1odjhNJIEF5MubPIvhWLsE223rKSN0uzbYXWlA6zLaZiLmp8+InzXSuyxhvZTMx\nXCx1KqzRDPHhzCDTI7kkw3o3YFWVvmZBEm/nClTloEK8PB3/MoUu1rXQhqkKmtyf5knYdf5dZ10z\nD/d+gS2nILbdt1zuR3Hc6r0AAAAAAQID\n-----END OPENSSH PRIVATE KEY-----\n"
}

8
tests/clients/test3.json Normal file
View File

@ -0,0 +1,8 @@
{
"name": "test3",
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC8sj5GxBabN2Dm56FIuB6p0fK4Q20DagrA0DFVq2M0oRqwlght7SOsQOPz5UWuuqjrnCX2ebPmrn+xDC5ZIy0KGXXYdP2i4+iuyxiBsFIf9OQTfPqjOOVxqoFw9V6B0XJEo3mhRsb8gWtX9xPO1f9YQow5K/SMbJE2jrbkya+unDaGQF04a27Z2QMlmNTHwxHZn+hBemQmT3TfqdlkJrEskfIqH0K3K/pl54NGa8COyCj8Af+/7njJmf0uvYj5cLS0Avgvg+SGFSd/jSKdYVMPqlz08lJK1z0xGrA/4aYMwwsT16vDJX1JuyuQLaKp5gH/yYvMroLSsir8/UAreLzaUhhLWQq8euNTQcZRLCRveVH+uyoARA0BSk1QU3tJdI8/1JMxN68g70vnpiv0agckzUXE5FxBsSxXJ8ajdBT+YtMLJ7zJnRNudEjkXYSxlQAbTB15ERaSUwAB0YSU+5jZIUpVnhU4zWNPxqF8It+apK2tkd7tijtWm0FKWZK+Kn8= test",
"allowed_ips": "*",
"secrets": {
"MY_FOO_VAL": "iEeVK30h9fecLimeTYdBxkur9YDY+YuImCY0vyWVnMCqcsoz/kvmFUwiq5Qx1hKp5kS0XILS7xsltR4UIGLMHngqM1nuY0d2K6KmiiYl7bCGImtxOs9rhm2GlBq/H91FOOI58lLXQrPE/N099X+E+qlM6PnH5JcRaMObZ9lnR2n1OnlxCeisCHZLjve1iCkAd/+0F09atQNIAymC2CkL/wCtuWZTep4LoKnwHNfy4zHDX06l0u1t6twilA9OFiqQNU62AGaTCNK/E48QyyNYweRONkAO4d2++yMBDjslGuQLS8lpruvGn8slOjHN/rv/gJD8MEhcKvaSCzacHLw5LdWDU78Hs7HZcrp9XZ81qFJnIsrkGDO+hLiKvHd5hisJOpu/V1jCUoh/XRlVlOqmei2ZDKyqrJEixT+VAdK2okB9Ap+rQ1NqYtjtigif+lYwYKRUjroY92wCTYStKWFORUpkjOO5adHWQ10Eeced3FOLrTukXB0kPDM5lnTJgk/+"
}
}

View File

@ -0,0 +1,99 @@
"""Tests of client loader."""
import unittest
from sshecret.server import client_loader
from sshecret.utils import generate_client_object
from sshecret.testing import TestClientSpec, test_context
class TestFileTableBackend(unittest.TestCase):
"""Test the file table backend."""
def setUp(self) -> None:
"""Set up tests."""
self.test_dataset = [
TestClientSpec("webserver", {"SECRET_TOKEN": "mysecrettoken"}),
TestClientSpec("dbserver", {"DB_ROOT_PASSWORD": "mysecretpassword"}),
]
def test_init(self) -> None:
"""Test instance creation."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
self.assertGreater(len(backend.table), 0)
def test_lookup_name(self) -> None:
"""Test lookup name."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
webserver = backend.lookup_name("webserver")
self.assertIsNotNone(webserver)
self.assertEqual(webserver.name, "webserver")
def test_add_client(self) -> None:
"""Test whether it is possible to add a client."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
new_client = generate_client_object(
"backupserver", {"BACKUP_KEY": "mysecretbackupkey"}
)
backend.add_client(new_client)
expected_file = testdir / "backupserver.json"
self.assertTrue(expected_file.exists())
result = backend.lookup_name("backupserver")
self.assertIsNotNone(result)
def test_add_secret(self) -> None:
"""Test whether it is possible to add a secret."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
backend.add_secret("webserver", "OTHER_SECRET_TOKEN", "myothersecrettoken")
webserver = backend.lookup_name("webserver")
assert webserver is not None
self.assertIsNotNone(webserver.secrets.get("OTHER_SECRET_TOKEN"))
self.assertNotEqual(
webserver.secrets["OTHER_SECRET_TOKEN"], "myothersecrettoken"
)
backend.add_secret(
"dbserver", "UNENCRYPTED_THING", "thisiscleartext", encrypted=True
)
dbserver = backend.lookup_name("dbserver")
assert dbserver is not None
self.assertEqual(dbserver.secrets["UNENCRYPTED_THING"], "thisiscleartext")
def test_update_client(self) -> None:
"""Test update_client method."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
webserver = backend.lookup_name("webserver")
assert webserver is not None
webserver.allowed_ips = "192.0.2.1"
backend.update_client("webserver", webserver)
new_obj = backend.lookup_name("webserver")
assert new_obj is not None
self.assertEqual(new_obj.allowed_ips, "192.0.2.1")
def test_remove_client(self) -> None:
"""Test removal of client."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
backend.remove_client("webserver", persistent=False)
webserver = backend.lookup_name("webserver")
self.assertIsNone(webserver)
webserver_file = testdir / "webserver.json"
self.assertTrue(webserver_file.exists())
def test_remove_client_persistent(self) -> None:
"""Test removal of client."""
with test_context(self.test_dataset) as testdir:
backend = client_loader.FileTableBackend(testdir)
backend.remove_client("webserver", persistent=True)
webserver = backend.lookup_name("webserver")
self.assertIsNone(webserver)
webserver_file = testdir / "webserver.json"
self.assertFalse(webserver_file.exists())
if __name__ == "__main__":
unittest.main()

53
tests/test_crypto.py Normal file
View File

@ -0,0 +1,53 @@
"""Tests of the encryption methods."""
import os
from pathlib import Path
from typing import override
import unittest
from sshecret.crypto import (
load_public_key,
load_private_key,
encrypt_string,
decode_string,
)
def read_public_key() -> bytes:
"""Load public key."""
keyname = "testkey"
public_key_file = Path(os.path.dirname(__file__)) / f"{keyname}.pub"
with open(public_key_file, "rb") as f:
public_key = f.read()
return public_key
class TestBasicCrypto(unittest.TestCase):
"""Test basic crypto functionality."""
@override
def setUp(self) -> None:
"""Set up keys."""
keyname = "testkey"
self.private_key: str = os.path.join(os.path.dirname(__file__), keyname)
self.public_key: bytes = read_public_key()
def test_key_loading(self) -> None:
"""Test basic flow."""
public_key = load_public_key(self.public_key)
private_key = load_private_key(self.private_key)
self.assertEqual(True, True)
def test_encrypt_decrypt(self) -> None:
"""Test encryption and decryption."""
password = "MySecretPassword"
public_key = load_public_key(self.public_key)
encoded = encrypt_string(password, public_key)
private_key = load_private_key(self.private_key)
decoded = decode_string(encoded, private_key)
self.assertEqual(password, decoded)
if __name__ == "__main__":
unittest.main()

64
tests/test_keepass.py Normal file
View File

@ -0,0 +1,64 @@
"""Tests for the keepass password manager."""
import tempfile
import unittest
from pathlib import Path
from typing import override
from sshecret.types import BasePasswordReader, PasswordContext
from sshecret.keepass import KeepassManager
TEST_PASSWORD = "test_password"
class Rot13PasswordReader(BasePasswordReader):
"""This password reader returns the identifier backwards."""
@override
@classmethod
def get_password(cls, identifier: str) -> str:
"""Get password."""
return identifier[::-1]
class TestKeepass(unittest.TestCase):
"""Test the keepass password manager."""
def __init__(self, methodName: str = "runTest") -> None:
super().__init__(methodName)
self.reader_context: PasswordContext
@override
def setUp(self) -> None:
"""Set up testing."""
self.reader_context = PasswordContext(Rot13PasswordReader)
def test_db_create(self) -> None:
"""Test db creation."""
with tempfile.TemporaryDirectory() as testdir:
testdbfile = Path(testdir) / "test.kdbx"
testdb = KeepassManager.create_database(str(testdbfile.absolute()), self.reader_context)
self.assertTrue(testdbfile.exists())
# Close the file and reopen
testdb.close_database()
with self.assertRaises(RuntimeError):
testdb.keepass.version
testdb.open_database(self.reader_context)
self.assertIsNotNone(testdb.keepass)
def test_password_creation(self) -> None:
"""Test password creation."""
with tempfile.TemporaryDirectory() as testdir:
testdbfile = Path(testdir) / "test.kdbx"
testdb = KeepassManager.create_database(str(testdbfile.absolute()), self.reader_context)
password = testdb.generate_password("foobar")
testdb.close_database()
testdb.open_database(self.reader_context)
saved_password = testdb.get_password("foobar")
self.assertEqual(saved_password, password)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,40 @@
"""Test passsword readers."""
from typing import override
import unittest
from io import StringIO
from unittest.mock import patch
from dotenv import load_dotenv
from sshecret.password_readers import InputPasswordReader, EnvironmentPasswordReader
class TestInputPasswordReader(unittest.TestCase):
"""Test input password reader."""
def test_reader(self) -> None:
"""Test reader."""
input_password = "testpassword"
with patch("getpass.getpass", return_value=input_password):
received_password = InputPasswordReader.get_password("test_password")
self.assertEqual(received_password, "testpassword")
class TestEnvPasswordReader(unittest.TestCase):
"""Test environment password reader."""
@override
def setUp(self) -> None:
"""Set up environment."""
env = StringIO("SSHECRET_test=secretthing")
load_dotenv(stream=env)
def test_env_loader(self) -> None:
"""Test environment loading."""
password = EnvironmentPasswordReader.get_password("test")
self.assertEqual(password, "secretthing")
if __name__ == "__main__":
unittest.main()

38
tests/testkey Normal file
View File

@ -0,0 +1,38 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn
NhAAAAAwEAAQAAAYEA0hOyTTItV3gRRTgvva92Nfa1kLFuZgsDwRMwSC6DaHbkYRFQzeyV
uusrLpq3bFSRnQum1OhE3aDkm+K8sjgsk6X8BGhHL0qtP1gSB16DXTnuzOtd+xmwA4MvX8
5fS9olyR+MMnCOm2rPmyq+Kkn4P8FpoLPFPiDPAsKdswRVerh1fRqVPxIv2GdriunwhD3l
zTBx5FNZ+ixzijAKYf1yQiophCFQBuWT6t81BbcfSIZl8KczP9fX4Oyy7I7y0smOmWYITR
onmWhZom7apEHEMoE6QK0YVNkzY6r1MG/ffKjlQue8JrNDwrI8UqkneHUUyI+1+f0LXphQ
tWaCNw6Tck+QOgXldbfwOpkmHGyNgXh2MaRnCKmf/QoaU+gFfT2nOmylQ9yxQJiseNWhnz
UzrxT5pEcWRZzCZslXy2Ql6QL5/XKHvS7qBWxsWhcyaCWr+0UY8E27z1akjUgygH3qlWe5
6d0epP8FIu1Sm6fcczZ6sHbuZIMJ16GHKCW++bfjAAAFmKQrG+WkKxvlAAAAB3NzaC1yc2
EAAAGBANITsk0yLVd4EUU4L72vdjX2tZCxbmYLA8ETMEgug2h25GERUM3slbrrKy6at2xU
kZ0LptToRN2g5JvivLI4LJOl/ARoRy9KrT9YEgdeg1057szrXfsZsAODL1/OX0vaJckfjD
Jwjptqz5sqvipJ+D/BaaCzxT4gzwLCnbMEVXq4dX0alT8SL9hna4rp8IQ95c0wceRTWfos
c4owCmH9ckIqKYQhUAblk+rfNQW3H0iGZfCnMz/X1+DssuyO8tLJjplmCE0aJ5loWaJu2q
RBxDKBOkCtGFTZM2Oq9TBv33yo5ULnvCazQ8KyPFKpJ3h1FMiPtfn9C16YULVmgjcOk3JP
kDoF5XW38DqZJhxsjYF4djGkZwipn/0KGlPoBX09pzpspUPcsUCYrHjVoZ81M68U+aRHFk
WcwmbJV8tkJekC+f1yh70u6gVsbFoXMmglq/tFGPBNu89WpI1IMoB96pVnuendHqT/BSLt
Upun3HM2erB27mSDCdehhyglvvm34wAAAAMBAAEAAAGAf04mV/eXWJFPTfYtoDKLXUpjXw
rXDwmPvdpGAQgG5DBgV55prFC5r+tBYN2rV/+rulLMR+t1iCUvRHRTy2CVSuhkX7tdoAAO
Gvvg+QxCaSVpXE8pxbgcXRSLifCC+XF6QnZWvF5PXUmOA8cUNIZc5S3tN9CZL/wr1s1fSZ
PPxS2xLR4F4ZHA4tBRcH4yHcFw2DaKXkZQmXWEkvJn6FfxfL0WKZcSawuG5udat1rwnz+q
2PpJ6V+A2DI4f3hlGG3BXlYtlGBpUo0Mt3D8LVHvMQbk8HDVtZbhgo1fIpGId4UwNsfmsR
HLfL7NKspQOBS1WRdp1qNZi1ky78p3fLyqpPc5QkfBXRg2C+ud61Z1UJKmYt9xdOcjYy+V
4e1pN9CvgR6+8EsQZAsGtvsFcZNyDXnRWuAN+0vO0lYzBppuQN0isatucbaqiIAMk41GJO
xxL+0528s+iCUwiHREXpVORyXhT+rtvwNTqkFJPrW8E6bkI9wHoUweSRTJ17o8ogjZAAAA
wGMd5CU6EntbtlDkhr0CvYuCfjEE6npumHCaaZDocR+2qUFQypw/E/wCZHuz/XMDQNXopH
g1LVXlYxHTo7FTtzru3mfDBZaUngt2iMb3pSap7jbmzCPU31eunklMrCXFbw7SqmTValIQ
8lIizCzjstlZRSbMCtWmxHno8NBeYfRhF6gOpHL++KfOCL+PLKjD9CZkeHnWu3Sgp5LU+Z
F2XxkY5aLKga6QGGqEQ71HJWJT1vMLbQ3k1FtMn3BShX0HigAAAMEA8RlZ2gR3omxLbWpP
oaiXK/YMfnEJa8iDPkl5FOxDKG+UXzBCCS+WtXaVqQ3HJb6k6NIOo+XSNX302Fbb9Ev39T
9KNF5hNbXTnso9339qo8Z4onG4I02UU5jPmDvQlG4xWy0CxumtzpSzAVqZbhiB+z+Iobqr
gj6gJD3FyN6cuII3MDUgSbJCKCSNjPW6+Yt0E27DdIrEwP+thTpWVxbePqNyn+msw/8zNa
qyu4Mz0u9oPO4bIzNxvoW33XIQ9jfdAAAAwQDfD4Q+ltIQYQ+7S3ar2SyYDAocT7rxalFr
mn6dhi6u9YAFt93+wFJ2rhlZ4W/ePdz4/BBGkVVw12JCCLLuoiZRHBFwfTON75DfetCMrE
KbBNImLPNcwpQ8cSzq2TleWJbK7EaOVyI2CjjeatSld94kJf9OR3CqVUUZaDs/uq1SzJtI
ck4a1DWLdxytplhHENXrL7ve1BBi0LyvauA5P6vBtcdTXEbxZbMx//u5Pf4YufczFJ8Pcm
RWBAZWMcdykr8AAAAfZWlzaW5nQEFsbGFucy1NYWNCb29rLUFpci5sb2NhbAECAwQ=
-----END OPENSSH PRIVATE KEY-----

1
tests/testkey.pub Normal file
View File

@ -0,0 +1 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDSE7JNMi1XeBFFOC+9r3Y19rWQsW5mCwPBEzBILoNoduRhEVDN7JW66ysumrdsVJGdC6bU6ETdoOSb4ryyOCyTpfwEaEcvSq0/WBIHXoNdOe7M6137GbADgy9fzl9L2iXJH4wycI6bas+bKr4qSfg/wWmgs8U+IM8Cwp2zBFV6uHV9GpU/Ei/YZ2uK6fCEPeXNMHHkU1n6LHOKMAph/XJCKimEIVAG5ZPq3zUFtx9IhmXwpzM/19fg7LLsjvLSyY6ZZghNGieZaFmibtqkQcQygTpArRhU2TNjqvUwb998qOVC57wms0PCsjxSqSd4dRTIj7X5/QtemFC1ZoI3DpNyT5A6BeV1t/A6mSYcbI2BeHYxpGcIqZ/9ChpT6AV9Pac6bKVD3LFAmKx41aGfNTOvFPmkRxZFnMJmyVfLZCXpAvn9coe9LuoFbGxaFzJoJav7RRjwTbvPVqSNSDKAfeqVZ7np3R6k/wUi7VKbp9xzNnqwdu5kgwnXoYcoJb75t+M= eising@Allans-MacBook-Air.local