diff --git a/packages/sshecret-admin/src/sshecret_admin/services/keepass.py b/packages/sshecret-admin/src/sshecret_admin/services/keepass.py index f8682a3..7eeb7e1 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/keepass.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/keepass.py @@ -43,7 +43,10 @@ class PasswordContext: ) if entry and overwrite: entry.password = secret - elif entry: + self.keepass.save() + return + + if entry: raise ValueError("Error: A secret with this name already exists.") LOG.debug("Add secret entry to keepass: %s", entry_name) entry = self.keepass.add_entry( diff --git a/packages/sshecret-admin/src/sshecret_admin/services/models.py b/packages/sshecret-admin/src/sshecret_admin/services/models.py index 2b8c073..8ea93e8 100644 --- a/packages/sshecret-admin/src/sshecret_admin/services/models.py +++ b/packages/sshecret-admin/src/sshecret_admin/services/models.py @@ -85,7 +85,7 @@ class SecretUpdate(BaseModel): """ if isinstance(self.value, str): return self.value - secret = secrets.token_urlsafe(self.value.length) + secret = secrets.token_urlsafe(32)[:self.value.length] return secret diff --git a/packages/sshecret-admin/src/sshecret_admin/testing.py b/packages/sshecret-admin/src/sshecret_admin/testing.py index b189700..254dc96 100644 --- a/packages/sshecret-admin/src/sshecret_admin/testing.py +++ b/packages/sshecret-admin/src/sshecret_admin/testing.py @@ -1,40 +1,17 @@ -"""Testing helper functions.""" +"""Testing helper functions. + +This allows creation of a user from within tests. +""" import os import bcrypt -from sqlmodel import Session from sshecret_admin.auth.models import User -def get_test_user_details() -> tuple[str, str]: - """Resolve testing user.""" - test_user = os.getenv("SSHECRET_TEST_USERNAME") or "test" - test_password = os.getenv("SSHECRET_TEST_PASSWORD") or "test" - if test_user and test_password: - return (test_user, test_password) - - raise RuntimeError( - "Error: No testing username and password registered in environment." - ) - - -def is_testing_mode() -> bool: - """Check if we're running in test mode. - - We will determine this by looking for the environment variable SSHECRET_TEST_MODE=1 - """ - if os.environ.get("PYTEST_VERSION") is not None: - return True - return False - - def create_test_user(session: Session, username: str, password: str) -> User: - """Create test user. - - We create a user with whatever username and password is supplied. - """ + """Create test user.""" salt = bcrypt.gensalt() hashed_password = bcrypt.hashpw(password.encode(), salt) user = User(username=username, hashed_password=hashed_password.decode()) diff --git a/packages/sshecret-sshd/src/sshecret_sshd/__init__.py b/packages/sshecret-sshd/src/sshecret_sshd/__init__.py index 66941f4..e69de29 100644 --- a/packages/sshecret-sshd/src/sshecret_sshd/__init__.py +++ b/packages/sshecret-sshd/src/sshecret_sshd/__init__.py @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from sshecret-sshd!" diff --git a/packages/sshecret-sshd/src/sshecret_sshd/constants.py b/packages/sshecret-sshd/src/sshecret_sshd/constants.py index df1776f..97c0730 100644 --- a/packages/sshecret-sshd/src/sshecret_sshd/constants.py +++ b/packages/sshecret-sshd/src/sshecret_sshd/constants.py @@ -6,7 +6,7 @@ ERROR_SOURCE_IP_NOT_ALLOWED = ( ) ERROR_NO_PUBLIC_KEY = "Error: No valid public key received." ERROR_INVALID_KEY_TYPE = "Error: Invalid key type: Only RSA keys are supported." -ERROR_UNKNOWN_COMMAND = "Error: The given command was not understood." +ERROR_UNKNOWN_COMMAND = "Error: Unsupported command." SERVER_KEY_TYPE = "ed25519" ERROR_BACKEND_ERROR = "Error: Unexpected response or error from backend" ERROR_INFO_BACKEND_GONE = "Unexpected error: Backend connection lost." diff --git a/packages/sshecret-sshd/src/sshecret_sshd/ssh_server.py b/packages/sshecret-sshd/src/sshecret_sshd/ssh_server.py index 6ba444b..517fb73 100644 --- a/packages/sshecret-sshd/src/sshecret_sshd/ssh_server.py +++ b/packages/sshecret-sshd/src/sshecret_sshd/ssh_server.py @@ -1,5 +1,6 @@ """SSH Server implementation.""" +from asyncio import _register_task import logging import asyncssh @@ -66,12 +67,13 @@ async def audit_event( client: Client | None = None, origin: str | None = None, secret: str | None = None, + **data: str, ) -> None: """Add an audit event.""" if not origin: origin = "UNKNOWN" await backend.audit(SubSystem.SSHD).write_async( - operation, message, origin, client, secret=None, secret_name=secret + operation, message, origin, client, secret=None, secret_name=secret, **data ) @@ -158,22 +160,14 @@ async def get_stdin_public_key(process: asyncssh.SSHServerProcess[str]) -> str | public_key = verify_key_input(line.rstrip("\n")) if public_key: break - process.stdout.write("Invalid key. Must be RSA Public Key.\n") + raise CommandError(constants.ERROR_INVALID_KEY_TYPE) except asyncssh.BreakReceived: pass - process.stdout.write("OK\n") + else: + process.stdout.write("OK\n") return public_key -def get_info_user_and_public_key( - process: asyncssh.SSHServerProcess[str], -) -> tuple[str | None, str | None]: - """Get username and public_key from process.""" - username = cast("str | None", process.get_extra_info("provided_username", None)) - public_key = cast("str | None", process.get_extra_info("provided_key", None)) - return (username, public_key) - - async def register_client( process: asyncssh.SSHServerProcess[str], backend: SshecretBackend, @@ -381,8 +375,14 @@ class AsshyncServer(asyncssh.SSHServer): """ LOG.debug("Started authentication flow for user %s", username) - if not self._conn: - return True + allowed_registration_sources: list[IPvAnyNetwork] = [] + if self.registration_enabled and not self.allow_registration_from: + allowed_registration_sources.append(ipaddress.IPv4Network("0.0.0.0/0")) + allowed_registration_sources.append(ipaddress.IPv6Network("::/0")) + elif self.registration_enabled and self.allow_registration_from: + allowed_registration_sources = self.allow_registration_from + + assert self._conn is not None, "Error: No connection found." if client := await self.backend.get_client(username): LOG.debug("Client lookup sucessful: %r", client) if key := self.resolve_client_key(client): @@ -397,33 +397,43 @@ class AsshyncServer(asyncssh.SSHServer): client, origin=self.client_ip, ) - LOG.warning("Client connection denied due to policy.") - elif self.registration_enabled: - self._conn.set_extra_info(provided_username=username) - self._conn.set_extra_info( - allow_registration_from=self.allow_registration_from - ) - LOG.warning( - "Registration enabled, and client is not recognized. Bypassing authentication." - ) - return False + LOG.warning( + "Client connection denied. Source: %s, policy: %r.", + self.client_ip, + client.policies, + ) + elif allowed_registration_sources and self.client_ip: + client_ip = ipaddress.ip_address(self.client_ip) + for network in allowed_registration_sources: + if client_ip.version != network.version: + continue + if client_ip in network: + self._conn.set_extra_info(provided_username=username) + self._conn.set_extra_info( + allow_registration_from=self.allow_registration_from + ) + LOG.info( + "Registration enabled, and client is not recognized. Bypassing authentication." + ) + return False + else: + await audit_event( + self.backend, + "Received registration command from unauthorized subnet.", + Operation.DENY, + origin=self.client_ip, + username=username, + ) + + LOG.warning( + "Registration not permitted for username=%s, origin: %s", + username, + self.client_ip, + ) LOG.debug("Continuing to regular authentication") return True - @override - def validate_public_key(self, username: str, key: asyncssh.SSHKey) -> bool: - """Intercept public key validation.""" - if not self._conn: - return False - - # get an export of the provided public key. - keystring = key.export_public_key().decode() - self._conn.set_extra_info(provided_username=username) - self._conn.set_extra_info(provided_key=keystring) - LOG.debug("Intercepting user public key") - return False - def resolve_client_key(self, client: Client) -> asyncssh.SSHAuthorizedKeys | None: """Resolve the client key. @@ -492,7 +502,9 @@ async def run_ssh_server( return server -async def start_sshecret_sshd(settings: ServerSettings | None = None) -> asyncssh.SSHAcceptor: +async def start_sshecret_sshd( + settings: ServerSettings | None = None, +) -> asyncssh.SSHAcceptor: """Start the server.""" server_key = get_server_key()