Compare commits

...

6 Commits

Author SHA1 Message Date
86ad1a13fb Update dependencies 2025-05-18 21:34:27 +02:00
dcf0b4274c Refactor command handling
This now supports usage/help texts
2025-05-18 17:56:53 +02:00
26ef9b45d4 Create command dispatching classes 2025-05-18 09:40:09 +02:00
64536b40f6 Create exception hierarchy 2025-05-18 09:39:57 +02:00
fd2922fde8 Update tests 2025-05-16 17:38:21 +02:00
6daceef913 Fix various issues 2025-05-16 17:36:45 +02:00
49 changed files with 2659 additions and 826 deletions

View File

@ -7,6 +7,7 @@ source =
packages/sshecret-sshd/src/sshecret_sshd
omit =
packages/sshecret-backend/src/sshecret_backend/frontend/*
*/__init__.py
*/types.py
*/testing.py
@ -17,7 +18,7 @@ omit =
*/test_*.py
*/conftest.py
*/site-packages/*
concurrency = multiprocessing
concurrency = thread
[report]
show_missing = True

View File

@ -5,33 +5,33 @@
<td
class="p-4 text-sm font-normal text-gray-500 whitespace-nowrap dark:text-gray-400"
>
{{ client.name }}
{{-client.name -}}
</td>
<td
class="p-4 text-base font-medium text-gray-900 whitespace-nowrap dark:text-white"
>
{{ client.id }}
{{- client.id -}}
</td>
<td
class="max-w-sm p-4 overflow-hidden text-base font-normal text-gray-500 truncate xl:max-w-xs dark:text-gray-400"
>
{{ client.description }}
{{- client.description -}}
</td>
<td
class="max-w-sm p-4 text-base font-normal text-gray-500 truncate xl:max-w-xs dark:text-gray-400"
>
{{ client.secrets|length }}
{{- client.secrets|length -}}
</td>
<td
class="max-w-sm p-4 text-base font-normal text-gray-500 truncate xl:max-w-xs dark:text-gray-400"
>
{{ client.policies|join(', ') }}
{{- client.policies|join(', ') -}}
</td>
<td class="p-4 space-x-2 whitespace-nowrap">
<button
type="button"
id="updateClientButton"
id="updateClientButton-{{ client.id }}"
data-drawer-target="drawer-update-client-{{ client.id }}"
data-drawer-show="drawer-update-client-{{ client.id }}"
aria-controls="drawer-update-client-{{ client.id }}"
@ -57,7 +57,7 @@
</button>
<button
type="button"
id="deleteClientButton"
id="deleteClientButton-{{ client.id }}"
data-drawer-target="drawer-delete-client-{{ client.id }}"
data-drawer-show="drawer-delete-client-{{ client.id }}"
aria-controls="drawer-delete-client-{{ client.id }}"

View File

@ -33,113 +33,6 @@
<span class="sr-only">Close menu</span>
</button>
<form hx-post="/clients/" hx-target="#clientContent">
<div class="space-y-4">
<div>
<label
for="name"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Name</label
>
<input
type="text"
name="name"
id="name"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Client name"
required=""
/>
</div>
<div>
<label
for="description"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Description</label
>
<input
type="text"
name="description"
id="description"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Client description"
/>
</div>
<div>
<label
for="sources"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Allowed subnets or IPs</label
>
<p
id="helper-text-explanation"
class="mt-2 text-sm text-gray-500 dark:text-gray-400"
>
Separate multiple entries with comma.
</p>
<input
type="text"
name="sources"
id="sources"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="0.0.0.0/0"
value="0.0.0.0/0"
hx-post="/clients/validate/source"
hx-target="#clientSourceValidation"
/>
<span id="clientSourceValidation"></span>
</div>
<div>
<label
for="public_key"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Public Key</label
>
<textarea
id="public_key"
name="public_key"
rows="4"
class="block p-2.5 w-full text-sm text-gray-900 bg-gray-50 rounded-lg border border-gray-300 focus:ring-primary-500 focus:border-primary-500 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Enter RSA SSH Public Key here"
hx-post="/clients/validate/public_key"
hx-target="#clientPublicKeyValidation"
></textarea>
<span id="clientPublicKeyValidation"></span>
</div>
<div
class="bottom-0 left-0 flex justify-center w-full pb-4 space-x-4 md:px-4 md:absolute"
>
<button
type="submit"
class="text-white w-full justify-center bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:outline-none focus:ring-primary-300 font-medium rounded-lg text-sm px-5 py-2.5 text-center dark:bg-primary-600 dark:hover:bg-primary-700 dark:focus:ring-primary-800"
>
Add Client
</button>
<button
type="button"
data-drawer-dismiss="drawer-create-client-default"
aria-controls="drawer-create-client-default"
class="inline-flex w-full justify-center text-gray-500 items-center bg-white hover:bg-gray-100 focus:ring-4 focus:outline-none focus:ring-primary-300 rounded-lg border border-gray-200 text-sm font-medium px-5 py-2.5 hover:text-gray-900 focus:z-10 dark:bg-gray-700 dark:text-gray-300 dark:border-gray-500 dark:hover:text-white dark:hover:bg-gray-600 dark:focus:ring-gray-600"
>
<svg
aria-hidden="true"
class="w-5 h-5 -ml-1 sm:mr-1"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
xmlns="http://www.w3.org/2000/svg"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M6 18L18 6M6 6l12 12"
></path>
</svg>
Cancel
</button>
</div>
</div>
{% include '/clients/drawer_client_create_inner.html.j2' %}
</form>
</div>

View File

@ -0,0 +1,108 @@
<div class="space-y-4">
<div>
<label
for="name"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Name</label
>
<input
type="text"
name="name"
id="name"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Client name"
required=""
/>
</div>
<div>
<label
for="description"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Description</label
>
<input
type="text"
name="description"
id="description"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Client description"
/>
</div>
<div>
<label
for="sources"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Allowed subnets or IPs</label
>
<p
id="helper-text-explanation"
class="mt-2 text-sm text-gray-500 dark:text-gray-400"
>
Separate multiple entries with comma.
</p>
<input
type="text"
name="sources"
id="sources"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="0.0.0.0/0"
value="0.0.0.0/0"
hx-post="/clients/validate/source"
hx-target="#clientSourceValidation"
/>
<span id="clientSourceValidation"></span>
</div>
<div>
<label
for="public_key"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Public Key</label
>
<textarea
id="public_key"
name="public_key"
rows="4"
class="block p-2.5 w-full text-sm text-gray-900 bg-gray-50 rounded-lg border border-gray-300 focus:ring-primary-500 focus:border-primary-500 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Enter RSA SSH Public Key here"
hx-post="/clients/validate/public_key"
hx-target="#clientPublicKeyValidation"
></textarea>
<span id="clientPublicKeyValidation"></span>
</div>
<div
class="bottom-0 left-0 flex justify-center w-full pb-4 space-x-4 md:px-4 md:absolute"
>
<button
type="submit"
class="text-white w-full justify-center bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:outline-none focus:ring-primary-300 font-medium rounded-lg text-sm px-5 py-2.5 text-center dark:bg-primary-600 dark:hover:bg-primary-700 dark:focus:ring-primary-800"
>
Add Client
</button>
<button
type="button"
data-drawer-dismiss="drawer-create-client-default"
aria-controls="drawer-create-client-default"
class="inline-flex w-full justify-center text-gray-500 items-center bg-white hover:bg-gray-100 focus:ring-4 focus:outline-none focus:ring-primary-300 rounded-lg border border-gray-200 text-sm font-medium px-5 py-2.5 hover:text-gray-900 focus:z-10 dark:bg-gray-700 dark:text-gray-300 dark:border-gray-500 dark:hover:text-white dark:hover:bg-gray-600 dark:focus:ring-gray-600"
>
<svg
aria-hidden="true"
class="w-5 h-5 -ml-1 sm:mr-1"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
xmlns="http://www.w3.org/2000/svg"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M6 18L18 6M6 6l12 12"
></path>
</svg>
Cancel
</button>
</div>
</div>

View File

@ -150,6 +150,7 @@
hx-delete="/clients/{{ client.id }}"
hx-confirm="Are you sure?"
hx-target="#clientContent"
id="delete-button-{{ client.id }}"
>
<svg

View File

@ -4,7 +4,7 @@
<div class="overflow-x-auto">
<div class="inline-block min-w-full align-middle">
<div class="overflow-hidden shadow">
<table class="min-w-full divide-y divide-gray-200 table-fixed dark:divide-gray-600">
<table class="min-w-full divide-y divide-gray-200 table-fixed dark:divide-gray-600" id="clientListTable">
<thead class="bg-gray-100 dark:bg-gray-700">
<tr>

View File

@ -1,38 +1,28 @@
{% extends "/dashboard/_base.html" %} {% block content %}
<div class="px-4 pt-6">
<div class="py-8 px-4 mt-4 mx-auto max-w-screen-xl text-center lg:py-16">
<div class="py-8 px-4 mt-4 mx-auto max-w-screen-xl text-center lg:py-16">
<h1 class="mb-4 text-4xl font-extrabold leading-none tracking-tight text-gray-900 md:text-5xl lg:text-6xl dark:text-white">Welcome to Sshecret</h1>
<div class="flex flex-col space-y-4 sm:flex-row sm:justify-center sm:space-y-0">
<button id="createClientButton" class="text-white bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:ring-primary-300 font-medium rounded-lg text-sm px-3 py-2.5 dark:bg-primary-600 dark:hover:bg-primary-700 focus:outline-none dark:focus:ring-primary-800" type="button" data-drawer-target="drawer-create-client-default" data-drawer-show="drawer-create-client-default" aria-controls="drawer-create-client-default" data-drawer-placement="right">
Add new client
</button>
<button id="createSecretButton" class="text-white bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:ring-primary-300 font-medium rounded-lg text-sm px-3 py-2.5 ms-2 dark:bg-primary-600 dark:hover:bg-primary-700 focus:outline-none dark:focus:ring-primary-800" type="button" data-drawer-target="drawer-create-secret-default" data-drawer-show="drawer-create-secret-default" aria-controls="drawer-create-secret-default" data-drawer-placement="right">
Add new secret
</button>
</div>
</div>
<div class="grid w-full grid-cols-1 gap-4 mt-4 xl:grid-cols-2 2xl:grid-cols-3">
<div class="items-center justify-between p-4 bg-white border border-gray-200 rounded-lg shadow-sm sm:flex dark:border-gray-700 sm:p-6 dark:bg-gray-800">
<div class="grid w-full grid-cols-1 gap-4 mt-4 xl:grid-cols-2 2xl:grid-cols-3">
<div class="items-center justify-between p-4 bg-white border border-gray-200 rounded-lg shadow-sm sm:flex dark:border-gray-700 sm:p-6 dark:bg-gray-800" id="dashboard-stats-panel">
<div class="w-full">
<h3 class="text-base text-gray-500 dark:text-gray-400">Stats</h3>
<dl class="max-w-md text-gray-900 divide-y divide-gray-200 dark:text-white dark:divide-gray-700">
<div class="flex flex-col pb-3">
<dt class="mb-1 text-gray-500 text-xs dark:text-gray-400">Clients</dt>
<dd class="text-lg font-semibold">{{ stats.clients }}</dd>
<dd class="text-lg font-semibold" id="stats-client-count">{{ stats.clients }}</dd>
</div>
<div class="flex flex-col py-3">
<dt class="mb-1 text-gray-500 md:text-xs dark:text-gray-400">Secrets</dt>
<dd class="text-lg font-semibold">{{ stats.secrets }}</dd>
<dd class="text-lg font-semibold" id="stats-secret-count">{{ stats.secrets }}</dd>
</div>
<div class="flex flex-col py-3">
<dt class="mb-1 text-gray-500 md:text-xs dark:text-gray-400">Audit Events</dt>
<dd class="text-lg font-semibold">{{ stats.audit_events }}</dd>
<dd class="text-lg font-semibold" id="stats-audit-count">{{ stats.audit_events }}</dd>
</div>
</dl>
</div>
@ -41,7 +31,7 @@
<div class="w-full">
<h3 class="text-base font-normal text-gray-500 dark:text-gray-400">Last Login Events</h3>
{% if last_login_events.total > 0 %}
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600">
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600" id="last-login-events">
<thead class="bg-gray-50 dark:bg-gray-700">
<tr>
<th scope="col" class="p-4 text-xs font-medium tracking-wider text-left text-gray-500 uppercase dark:text-white">Timestamp</th>
@ -52,15 +42,14 @@
</thead>
<tbody class="bg-white dark:bg-gray-800">
{% for entry in last_login_events.results | list %}
<tr
class="{{ loop.cycle('', 'bg-gray-50 dark:bg-gray-700 ') }}hover:bg-gray-100 dark:hover:bg-gray-700"
id="login-entry-{{ entry.id }}"
>
<tr class="{{ loop.cycle('', 'bg-gray-50 dark:bg-gray-700 ') }}hover:bg-gray-100 dark:hover:bg-gray-700" id="login-entry-{{ entry.id }}">
<td class="p-4 text-sm font-normal text-gray-900 whitespace-nowrap dark:text-white">
<p>{{ entry.timestamp }}<button data-popover-target="popover-login-entry-{{ entry.id }}" data-popover-placement="bottom-end" type="button"><svg class="w-4 h-4 ms-2 text-gray-400 hover:text-gray-500" aria-hidden="true" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg"><path fill-rule="evenodd" d="M18 10a8 8 0 11-16 0 8 8 0 0116 0zm-8-3a1 1 0 00-.867.5 1 1 0 11-1.731-1A3 3 0 0113 8a3.001 3.001 0 01-2 2.83V11a1 1 0 11-2 0v-1a1 1 0 011-1 1 1 0 100-2zm0 8a1 1 0 100-2 1 1 0 000 2z" clip-rule="evenodd"></path></svg><span class="sr-only">Show information</span></button>
<p>{{ entry.timestamp }}<button data-popover-target="popover-login-entry-{{ entry.id }}" data-popover-placement="bottom-end" type="button" id="btn-popover-login-entry-{{ entry.id }}"><svg class="w-4 h-4 ms-2 text-gray-400 hover:text-gray-500" aria-hidden="true" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" d="M18 10a8 8 0 11-16 0 8 8 0 0116 0zm-8-3a1 1 0 00-.867.5 1 1 0 11-1.731-1A3 3 0 0113 8a3.001 3.001 0 01-2 2.83V11a1 1 0 11-2 0v-1a1 1 0 011-1 1 1 0 100-2zm0 8a1 1 0 100-2 1 1 0 000 2z" clip-rule="evenodd"></path>
</svg><span class="sr-only">Show information</span></button>
</p>
<div data-popover id="popover-login-entry-{{entry.id}}" role="tooltip" class="absolute z-10 invisible inline-block text-sm text-gray-500 transition-opacity duration-300 bg-white border border-gray-200 rounded-lg shadow-xs opacity-0 w-80 dark:bg-gray-800 dark:border-gray-600 dark:text-gray-400">
<div data-popover id="popover-login-entry-{{entry.id}}" role="tooltip" class="absolute z-10 invisible inline-block text-sm text-gray-500 transition-opacity duration-300 bg-white border border-gray-200 rounded-lg shadow-xs opacity-0 w-80 dark:bg-gray-800 dark:border-gray-600 dark:text-gray-400 popover-login-entry">
<dl class="max-w-md text-gray-900 divide-y divide-gray-200 dark:text-white dark:divide-gray-700 px-2 py-2">
<div class="flex flex-col pb-3">
<dt class="mb-1 text-gray-500 md:text-xs dark:text-gray-400">ID</dt>
@ -148,7 +137,7 @@
<div class="w-full">
<h3 class="text-base font-normal text-gray-500 dark:text-gray-400">Last Audit Events</h3>
{% if last_audit_events.total > 0 %}
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600">
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600" id="last-audit-events">
<thead class="bg-gray-50 dark:bg-gray-700">
<tr>
<th scope="col" class="p-4 text-xs font-medium tracking-wider text-left text-gray-500 uppercase dark:text-white">Timestamp</th>
@ -159,12 +148,11 @@
</thead>
<tbody class="bg-white dark:bg-gray-800">
{% for entry in last_audit_events.results | list %}
<tr
class="{{ loop.cycle('', 'bg-gray-50 dark:bg-gray-700 ') }}hover:bg-gray-100 dark:hover:bg-gray-700"
id="login-entry-{{ entry.id }}"
>
<tr class="{{ loop.cycle('', 'bg-gray-50 dark:bg-gray-700 ') }}hover:bg-gray-100 dark:hover:bg-gray-700" id="login-entry-{{ entry.id }}">
<td class="p-4 text-sm font-normal text-gray-900 whitespace-nowrap dark:text-white">
<p>{{ entry.timestamp }}<button data-popover-target="popover-audit-entry-{{ entry.id }}" data-popover-placement="bottom-end" type="button"><svg class="w-4 h-4 ms-2 text-gray-400 hover:text-gray-500" aria-hidden="true" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg"><path fill-rule="evenodd" d="M18 10a8 8 0 11-16 0 8 8 0 0116 0zm-8-3a1 1 0 00-.867.5 1 1 0 11-1.731-1A3 3 0 0113 8a3.001 3.001 0 01-2 2.83V11a1 1 0 11-2 0v-1a1 1 0 011-1 1 1 0 100-2zm0 8a1 1 0 100-2 1 1 0 000 2z" clip-rule="evenodd"></path></svg><span class="sr-only">Show information</span></button></p>
<p>{{ entry.timestamp }}<button data-popover-target="popover-audit-entry-{{ entry.id }}" data-popover-placement="bottom-end" type="button"><svg class="w-4 h-4 ms-2 text-gray-400 hover:text-gray-500" aria-hidden="true" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" d="M18 10a8 8 0 11-16 0 8 8 0 0116 0zm-8-3a1 1 0 00-.867.5 1 1 0 11-1.731-1A3 3 0 0113 8a3.001 3.001 0 01-2 2.83V11a1 1 0 11-2 0v-1a1 1 0 011-1 1 1 0 100-2zm0 8a1 1 0 100-2 1 1 0 000 2z" clip-rule="evenodd"></path>
</svg><span class="sr-only">Show information</span></button></p>
<div data-popover id="popover-audit-entry-{{entry.id}}" role="tooltip" class="absolute z-10 invisible inline-block text-sm text-gray-500 transition-opacity duration-300 bg-white border border-gray-200 rounded-lg shadow-xs opacity-0 w-80 dark:bg-gray-800 dark:border-gray-600 dark:text-gray-400">
<dl class="max-w-md text-gray-900 divide-y divide-gray-200 dark:text-white dark:divide-gray-700 px-2 py-2">
@ -246,10 +234,9 @@
{% endif %}
</div>
</div>
</div>
{% include '/dashboard/drawer_client_create_dashboard.html.j2' %}
{% include '/dashboard/drawer_secret_create_dashboard.html.j2' %}
</div>
{% include '/clients/drawer_client_create.html.j2' %}
{% include '/secrets/drawer_secret_create.html.j2' %}
</div>
{% endblock %}

View File

@ -1,31 +1,90 @@
<nav class="fixed z-30 w-full bg-white border-b border-gray-200 dark:bg-gray-800 dark:border-gray-700">
<nav
class="fixed z-30 w-full bg-white border-b border-gray-200 dark:bg-gray-800 dark:border-gray-700"
>
<div class="px-3 py-3 lg:px-5 lg:pl-3">
<div class="flex items-center justify-between">
<div class="flex items-center justify-start">
<button id="toggleSidebarMobile" aria-expanded="true" aria-controls="sidebar" class="p-2 text-gray-600 rounded cursor-pointer lg:hidden hover:text-gray-900 hover:bg-gray-100 focus:bg-gray-100 dark:focus:bg-gray-700 focus:ring-2 focus:ring-gray-100 dark:focus:ring-gray-700 dark:text-gray-400 dark:hover:bg-gray-700 dark:hover:text-white">
<svg id="toggleSidebarMobileHamburger" class="w-6 h-6" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg"><path fill-rule="evenodd" d="M3 5a1 1 0 011-1h12a1 1 0 110 2H4a1 1 0 01-1-1zM3 10a1 1 0 011-1h6a1 1 0 110 2H4a1 1 0 01-1-1zM3 15a1 1 0 011-1h12a1 1 0 110 2H4a1 1 0 01-1-1z" clip-rule="evenodd"></path></svg>
<svg id="toggleSidebarMobileClose" class="hidden w-6 h-6" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg"><path fill-rule="evenodd" d="M4.293 4.293a1 1 0 011.414 0L10 8.586l4.293-4.293a1 1 0 111.414 1.414L11.414 10l4.293 4.293a1 1 0 01-1.414 1.414L10 11.414l-4.293 4.293a1 1 0 01-1.414-1.414L8.586 10 4.293 5.707a1 1 0 010-1.414z" clip-rule="evenodd"></path></svg>
<button
id="toggleSidebarMobile"
aria-expanded="true"
aria-controls="sidebar"
class="p-2 text-gray-600 rounded cursor-pointer lg:hidden hover:text-gray-900 hover:bg-gray-100 focus:bg-gray-100 dark:focus:bg-gray-700 focus:ring-2 focus:ring-gray-100 dark:focus:ring-gray-700 dark:text-gray-400 dark:hover:bg-gray-700 dark:hover:text-white"
>
<svg
id="toggleSidebarMobileHamburger"
class="w-6 h-6"
fill="currentColor"
viewBox="0 0 20 20"
xmlns="http://www.w3.org/2000/svg"
>
<path
fill-rule="evenodd"
d="M3 5a1 1 0 011-1h12a1 1 0 110 2H4a1 1 0 01-1-1zM3 10a1 1 0 011-1h6a1 1 0 110 2H4a1 1 0 01-1-1zM3 15a1 1 0 011-1h12a1 1 0 110 2H4a1 1 0 01-1-1z"
clip-rule="evenodd"
></path>
</svg>
<svg
id="toggleSidebarMobileClose"
class="hidden w-6 h-6"
fill="currentColor"
viewBox="0 0 20 20"
xmlns="http://www.w3.org/2000/svg"
>
<path
fill-rule="evenodd"
d="M4.293 4.293a1 1 0 011.414 0L10 8.586l4.293-4.293a1 1 0 111.414 1.414L11.414 10l4.293 4.293a1 1 0 01-1.414 1.414L10 11.414l-4.293 4.293a1 1 0 01-1.414-1.414L8.586 10 4.293 5.707a1 1 0 010-1.414z"
clip-rule="evenodd"
></path>
</svg>
</button>
<a href="/" class="flex ml-2 md:mr-24">
<img src="{{ url_for('static', path='logo.svg') }}" class="h-11 mr-3" alt="Sshecret Logo" />
<span class="self-center text-xl font-semibold sm:text-2xl whitespace-nowrap dark:text-white">Sshecret</span>
<img
src="{{ url_for('static', path='logo.svg') }}"
class="h-11 mr-3"
alt="Sshecret Logo"
/>
<span
class="self-center text-xl font-semibold sm:text-2xl whitespace-nowrap dark:text-white"
>Sshecret</span
>
</a>
</div>
<div class="flex items-center">
<div class="flex items-center ml-3">
<div>
<button type="button" class="flex text-sm bg-gray-800 rounded-full focus:ring-4 focus:ring-gray-300 dark:focus:ring-gray-600" id="user-menu-button-2" aria-expanded="false" data-dropdown-toggle="dropdown-2">
<button
type="button"
class="flex text-sm bg-gray-800 rounded-full focus:ring-4 focus:ring-gray-300 dark:focus:ring-gray-600"
id="user-menu-button-2"
aria-expanded="false"
data-dropdown-toggle="dropdown-2"
>
<span class="sr-only">Open user menu</span>
<svg class="w-6 h-6 text-gray-800 dark:text-white" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="none" viewBox="0 0 24 24">
<path stroke="currentColor" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 21a9 9 0 1 0 0-18 9 9 0 0 0 0 18Zm0 0a8.949 8.949 0 0 0 4.951-1.488A3.987 3.987 0 0 0 13 16h-2a3.987 3.987 0 0 0-3.951 3.512A8.948 8.948 0 0 0 12 21Zm3-11a3 3 0 1 1-6 0 3 3 0 0 1 6 0Z"/>
</svg>
<svg
class="w-6 h-6 text-gray-800 dark:text-white"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
width="24"
height="24"
fill="none"
viewBox="0 0 24 24"
>
<path
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M12 21a9 9 0 1 0 0-18 9 9 0 0 0 0 18Zm0 0a8.949 8.949 0 0 0 4.951-1.488A3.987 3.987 0 0 0 13 16h-2a3.987 3.987 0 0 0-3.951 3.512A8.948 8.948 0 0 0 12 21Zm3-11a3 3 0 1 1-6 0 3 3 0 0 1 6 0Z"
/>
</svg>
</button>
</div>
<!-- Dropdown menu -->
<div class="z-50 hidden my-4 text-base list-none bg-white divide-y divide-gray-100 rounded shadow dark:bg-gray-700 dark:divide-gray-600" id="dropdown-2">
<div
class="z-50 hidden my-4 text-base list-none bg-white divide-y divide-gray-100 rounded shadow dark:bg-gray-700 dark:divide-gray-600"
id="dropdown-2"
>
<div class="px-4 py-3" role="none">
<p class="text-sm text-gray-900 dark:text-white" role="none">
{{ user }}
@ -33,10 +92,20 @@
</div>
<ul class="py-1" role="none">
<li>
<a href="#" class="block px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 dark:text-gray-300 dark:hover:bg-gray-600 dark:hover:text-white" role="menuitem">Change Password</a>
<a
href="#"
class="block px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 dark:text-gray-300 dark:hover:bg-gray-600 dark:hover:text-white"
role="menuitem"
>Change Password</a
>
</li>
<li>
<a href="#" class="block px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 dark:text-gray-300 dark:hover:bg-gray-600 dark:hover:text-white" role="menuitem">Logout</a>
<a
href="/logout"
class="block px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 dark:text-gray-300 dark:hover:bg-gray-600 dark:hover:text-white"
role="menuitem"
>Logout</a
>
</li>
</ul>
</div>

View File

@ -1,19 +1,29 @@
{% extends "/shared/_base.html" %} {% block content %}
{% if login_error %}
{% extends "/shared/_base.html" %} {% block content %} {% if login_error %}
<div class="flex bg-gray-100">
<div class="flex w-full items-center p-4 mb-4 text-sm text-red-800 border border-red-300 rounded-lg bg-red-50 dark:bg-gray-800 dark:text-red-400 dark:border-red-800" role="alert">
<svg class="shrink-0 inline w-4 h-4 me-3" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="currentColor" viewBox="0 0 20 20">
<path d="M10 .5a9.5 9.5 0 1 0 9.5 9.5A9.51 9.51 0 0 0 10 .5ZM9.5 4a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM12 15H8a1 1 0 0 1 0-2h1v-3H8a1 1 0 0 1 0-2h2a1 1 0 0 1 1 1v4h1a1 1 0 0 1 0 2Z"/>
<div class="flex bg-gray-100">
<div
class="flex w-full items-center p-4 mb-4 text-sm text-red-800 border border-red-300 rounded-lg bg-red-50 dark:bg-gray-800 dark:text-red-400 dark:border-red-800"
role="alert"
>
<svg
class="shrink-0 inline w-4 h-4 me-3"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
viewBox="0 0 20 20"
>
<path
d="M10 .5a9.5 9.5 0 1 0 9.5 9.5A9.51 9.51 0 0 0 10 .5ZM9.5 4a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM12 15H8a1 1 0 0 1 0-2h1v-3H8a1 1 0 0 1 0-2h2a1 1 0 0 1 1 1v4h1a1 1 0 0 1 0 2Z"
/>
</svg>
<span class="sr-only">Info</span>
<div>
<span class="font-medium">{{ login_error.title }}</span> {{login_error.message}}
<span class="font-medium">{{ login_error.title }}</span>
{{login_error.message}}
</div>
</div>
</div>
</div>
{% endif %}
{% endif %}
<div class="min-h-screen bg-gray-100 flex items-center justify-center p-4">
<div class="max-w-md w-full bg-white rounded-xl shadow-lg p-8">
@ -28,6 +38,8 @@
name="username"
class="w-full px-4 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-indigo-500 focus:border-indigo-500 outline-none transition-all"
placeholder="Username"
autocomplete="username"
required=""
/>
</div>
@ -40,11 +52,14 @@
name="password"
class="w-full px-4 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-indigo-500 focus:border-indigo-500 outline-none transition-all"
placeholder="••••••••"
autocomplete="current-password"
required=""
/>
</div>
<button
class="w-full bg-indigo-600 hover:bg-indigo-700 text-white font-medium py-2.5 rounded-lg transition-colors"
type="submit"
>
Sign In
</button>

View File

@ -33,123 +33,6 @@
<span class="sr-only">Close menu</span>
</button>
<form hx-post="/secrets/" hx-target="#secretsContent">
<div class="space-y-4">
<div>
<label
for="name"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Name</label
>
<input
type="text"
name="name"
id="name"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Secret name"
required=""
/>
</div>
<div>
<label
for="value"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Secret Value</label
>
<p
id="helper-text-explanation"
class="mt-2 text-sm text-gray-500 dark:text-gray-400"
>
Enter the secret string here.
</p>
<input
type="text"
name="value"
id="secretValueInput"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Your secret string here"
/>
</div>
<div>
<label
for="auto_generate"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>
<label class="inline-flex items-center cursor-pointer">
<input
type="checkbox"
name="auto_generate"
id="autoGenerateCheckbox"
class="sr-only peer"
hx-on:change="document.getElementById('secretValueInput').disabled = this.checked;
if (this.checked) { document.getElementById('secretValueInput').value = '' }"
/>
<div
class="relative w-11 h-6 bg-gray-200 peer-focus:outline-none peer-focus:ring-4 peer-focus:ring-blue-300 dark:peer-focus:ring-blue-800 rounded-full peer dark:bg-gray-700 peer-checked:after:translate-x-full rtl:peer-checked:after:-translate-x-full peer-checked:after:border-white after:content-[''] after:absolute after:top-[2px] after:start-[2px] after:bg-white after:border-gray-300 after:border after:rounded-full after:h-5 after:w-5 after:transition-all dark:border-gray-600 peer-checked:bg-blue-600 dark:peer-checked:bg-blue-600"
></div>
<span
class="ms-3 text-sm font-medium text-gray-900 dark:text-gray-300"
>Auto-generate secret</span
>
</label>
</label>
</div>
<div>
<label
for="clients"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Clients</label
>
<select
multiple="multiple"
id="clients"
name="clients"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-500 focus:border-primary-500 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
>
<option selected="selected">Select clients to assign the secret to</option>
{% for client in clients %}
<option value="{{ client.id }}">{{ client.name }}</option>
{% endfor %}
</select>
</div>
<div
class="bottom-0 left-0 flex justify-center w-full pb-4 space-x-4 md:px-4 md:absolute"
>
<button
type="submit"
class="text-white w-full justify-center bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:outline-none focus:ring-primary-300 font-medium rounded-lg text-sm px-5 py-2.5 text-center dark:bg-primary-600 dark:hover:bg-primary-700 dark:focus:ring-primary-800"
>
Add Secret
</button>
<button
type="button"
data-drawer-dismiss="drawer-create-secret-default"
aria-controls="drawer-create-secret-default"
class="inline-flex w-full justify-center text-gray-500 items-center bg-white hover:bg-gray-100 focus:ring-4 focus:outline-none focus:ring-primary-300 rounded-lg border border-gray-200 text-sm font-medium px-5 py-2.5 hover:text-gray-900 focus:z-10 dark:bg-gray-700 dark:text-gray-300 dark:border-gray-500 dark:hover:text-white dark:hover:bg-gray-600 dark:focus:ring-gray-600"
>
<svg
aria-hidden="true"
class="w-5 h-5 -ml-1 sm:mr-1"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
xmlns="http://www.w3.org/2000/svg"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M6 18L18 6M6 6l12 12"
></path>
</svg>
Cancel
</button>
</div>
</div>
{% include '/secrets/drawer_secret_create_inner.html.j2' %}
</form>
</div>

View File

@ -0,0 +1,118 @@
<div class="space-y-4">
<div>
<label
for="name"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Name</label
>
<input
type="text"
name="name"
id="name"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Secret name"
required=""
/>
</div>
<div>
<label
for="value"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Secret Value</label
>
<p
id="helper-text-explanation"
class="mt-2 text-sm text-gray-500 dark:text-gray-400"
>
Enter the secret string here.
</p>
<input
type="text"
name="value"
id="secretValueInput"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
placeholder="Your secret string here"
/>
</div>
<div>
<label
for="auto_generate"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>
<label class="inline-flex items-center cursor-pointer" id="autoGenerateCheckboxLabel">
<input
type="checkbox"
name="auto_generate"
id="autoGenerateCheckbox"
class="sr-only peer"
hx-on:change="document.getElementById('secretValueInput').disabled = this.checked;
if (this.checked) { document.getElementById('secretValueInput').value = '' }"
/>
<div
class="relative w-11 h-6 bg-gray-200 peer-focus:outline-none peer-focus:ring-4 peer-focus:ring-blue-300 dark:peer-focus:ring-blue-800 rounded-full peer dark:bg-gray-700 peer-checked:after:translate-x-full rtl:peer-checked:after:-translate-x-full peer-checked:after:border-white after:content-[''] after:absolute after:top-[2px] after:start-[2px] after:bg-white after:border-gray-300 after:border after:rounded-full after:h-5 after:w-5 after:transition-all dark:border-gray-600 peer-checked:bg-blue-600 dark:peer-checked:bg-blue-600"
></div>
<span
class="ms-3 text-sm font-medium text-gray-900 dark:text-gray-300"
>Auto-generate secret</span
>
</label>
</label>
</div>
<div>
<label
for="clients"
class="block mb-2 text-sm font-medium text-gray-900 dark:text-white"
>Clients</label
>
<select
multiple="multiple"
id="clients"
name="clients"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-500 focus:border-primary-500 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
>
<option selected="selected">Select clients to assign the secret to</option>
{% for client in clients %}
<option value="{{ client.id }}">{{ client.name }}</option>
{% endfor %}
</select>
</div>
<div
class="bottom-0 left-0 flex justify-center w-full pb-4 space-x-4 md:px-4 md:absolute"
>
<button
type="submit"
class="text-white w-full justify-center bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:outline-none focus:ring-primary-300 font-medium rounded-lg text-sm px-5 py-2.5 text-center dark:bg-primary-600 dark:hover:bg-primary-700 dark:focus:ring-primary-800"
>
Add Secret
</button>
<button
type="button"
data-drawer-dismiss="drawer-create-secret-default"
aria-controls="drawer-create-secret-default"
class="inline-flex w-full justify-center text-gray-500 items-center bg-white hover:bg-gray-100 focus:ring-4 focus:outline-none focus:ring-primary-300 rounded-lg border border-gray-200 text-sm font-medium px-5 py-2.5 hover:text-gray-900 focus:z-10 dark:bg-gray-700 dark:text-gray-300 dark:border-gray-500 dark:hover:text-white dark:hover:bg-gray-600 dark:focus:ring-gray-600"
>
<svg
aria-hidden="true"
class="w-5 h-5 -ml-1 sm:mr-1"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
xmlns="http://www.w3.org/2000/svg"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M6 18L18 6M6 6l12 12"
></path>
</svg>
Cancel
</button>
</div>
</div>

View File

@ -45,7 +45,8 @@
{% for client in secret.clients %}
<span
class="inline-flex items-center px-2 py-1 me-2 text-sm font-medium text-red-800 bg-red-100 rounded-sm dark:bg-red-900 dark:text-red-300"
class="inline-flex items-center px-2 py-1 me-2 text-sm font-medium text-red-800 bg-red-100 rounded-sm dark:bg-red-900 dark:text-red-300 pill-client-secret"
id="client-secret-{{ secret.name }}-pill-{{ client.name }}"
>{{ client.name }}
<button
type="button"
@ -54,6 +55,7 @@
hx-delete="/secrets/{{ secret.name }}/clients/{{ client.id }}"
hx-target="#secretsContent"
hx-confirm="Remove client {{ client.name }} from secret {{secret.name}}?"
id="btn-remove-client-{{ client.name }}-secret-{{ secret.name }}"
>
<svg
class="w-2 h-2"
@ -93,6 +95,7 @@
<select
name="client"
class="bg-gray-50 border border-gray-300 text-gray-900 text-sm rounded-lg focus:ring-primary-500 focus:border-primary-500 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-primary-500 dark:focus:border-primary-500"
id="sel-add-client-secret-{{ secret.name }}"
>
<option selected="selected">
Select clients to assign the secret to

View File

@ -4,11 +4,10 @@
>
<td
class="p-4 text-sm font-normal text-gray-500 whitespace-nowrap dark:text-gray-400"
>
{{ secret.name }}
</td>
>{{- secret.name -}}</td>
<td
class="max-w-sm p-4 overflow-hidden text-base font-normal text-gray-500 truncate xl:max-w-xs dark:text-gray-400"
class="max-w-sm p-4 overflow-hidden text-base font-normal text-gray-500 truncate xl:max-w-xs dark:text-gray-400 secret-client-list"
id="secret-client-list-{{ secret.name }}"
>
{% if secret.clients %}
{% for client in secret.clients %}
@ -16,7 +15,7 @@
<svg class="w-6 h-6 text-gray-800 dark:text-white" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path fill-rule="evenodd" d="M12 4a4 4 0 1 0 0 8 4 4 0 0 0 0-8Zm-2 9a4 4 0 0 0-4 4v1a2 2 0 0 0 2 2h8a2 2 0 0 0 2-2v-1a4 4 0 0 0-4-4h-4Z" clip-rule="evenodd"/>
</svg>
{{ client.name }}
{{- client.name -}}
</span>
{% endfor %}
{% else %}
@ -28,6 +27,7 @@
type="button"
data-modal-target="client-secret-modal-{{secret.name}}" data-modal-toggle="client-secret-modal-{{ secret.name }}"
class="inline-flex items-center px-3 py-2 text-sm font-medium text-center text-white rounded-lg bg-primary-700 hover:bg-primary-800 focus:ring-4 focus:ring-primary-300 dark:bg-primary-600 dark:hover:bg-primary-700 dark:focus:ring-primary-800"
id="manage-client-access-btn-{{ secret.name }}"
>
<svg
class="w-4 h-4 mr-2"
@ -53,6 +53,7 @@
hx-delete="/secrets/{{ secret.name }}"
hx-confirm="Are you sure you want to delete the secret {{ secret.name }}?"
hx-target="#secretsContent"
id="delete-secret-btn-{{ secret.name }}"
>
<svg
class="w-4 h-4 mr-2"
@ -69,3 +70,4 @@
Delete item
</button>
</td>
</tr>

View File

@ -169,4 +169,14 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
)
return response
@app.get("/logout")
async def logout(
response: Response,
):
"""Log out user."""
response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
response.delete_cookie("refresh_token", httponly=True, secure=False, samesite="strict")
response.delete_cookie("access_token", httponly=True, secure=False, samesite="strict")
return response
return app

View File

@ -13,6 +13,7 @@ dependencies = [
"httpx>=0.28.1",
"pydantic>=2.10.6",
"python-dotenv>=1.0.1",
"rich>=14.0.0",
"sshecret",
]

View File

@ -0,0 +1,5 @@
"""Commands module."""
from .dispatcher import dispatch_command
__all__ = ["dispatch_command"]

View File

@ -0,0 +1,342 @@
"""Base command class."""
import abc
import json
import textwrap
from collections import defaultdict
from dataclasses import dataclass, field
import ipaddress
import logging
from typing import Any, cast, override
from rich.console import Console
import asyncssh
from pydantic import IPvAnyNetwork, IPvAnyAddress
from sshecret.backend.api import SshecretBackend
from sshecret.backend.models import Client, Operation, SubSystem
from sshecret_sshd import exceptions
PeernameV4 = tuple[str, int]
PeernameV6 = tuple[str, int, int, int]
Peername = PeernameV4 | PeernameV6
LOG = logging.getLogger(__name__)
@dataclass
class CmdArgs:
"""Command and arguments."""
command: str
arguments: list[str] = field(default_factory=list)
@dataclass
class CommandFlag:
"""Command flag."""
name: str
description: str
supports_short: bool = False
enabled: bool = False
@override
def __str__(self) -> str:
"""Format an output for help texts."""
if self.supports_short:
return f"[-{self.name[0]} --{self.name}]"
return f"--{self.name}"
def get_console(process: asyncssh.SSHServerProcess[str]) -> Console:
"""Initiate console from process."""
width, _height, pixwidth, pixheight = process.term_size
LOG.debug("Terminal is %sx%s", pixwidth, pixheight)
if width > 0:
console = Console(
force_terminal=True,
width=pixwidth,
height=pixheight,
color_system="standard",
)
return console
return Console(markup=False, color_system=None)
class CommandDispatcher(abc.ABC):
"""Command dispatcher."""
name: str
flags: dict[str, str] | None = None
mandatory_argument: str | None = None
def __init__(
self,
process: asyncssh.SSHServerProcess[str],
) -> None:
"""Create command dispatcher class."""
self.process: asyncssh.SSHServerProcess[str] = process
def print(
self,
*data: str,
stderr: bool = False,
newline: bool = True,
) -> None:
"""Write to stdout."""
if stderr:
for line in data:
self.process.stderr.write(line + "\n")
return
for line in data:
self.process.stdout.write(line)
if newline:
self.process.stdout.write("\n")
def rich_print_line(
self, data: str, tags: list[str] | None = None, rule: bool = False
) -> None:
"""Write formatted text to the process.
IF the client terminal does not support this, no formatting will be added.
Otherwise, the tags will be added to the string.
"""
if not tags:
tags = []
if not self.formatting_supported:
return self.print(data, newline=True)
for tag in tags:
data = f"[{tag}]{data}[/{tag}]"
console = self.get_console()
with console.capture() as capture:
if rule:
console.rule(data)
else:
console.print(data)
self.process.stdout.write(capture.get())
def get_console(self) -> Console:
"""Initiate console from process."""
return get_console(self.process)
@property
def formatting_supported(self) -> bool:
"""Check if the terminal supports formatting."""
term_width = self.process.term_size[0]
return term_width > 0
def print_json(self, obj: dict[str, Any]) -> None:
"""Print a json object."""
console = self.get_console()
data = json.dumps(obj)
with console.capture() as capture:
console.print_json(data)
self.process.stdout.write(capture.get() + "\n")
async def audit(
self, operation: Operation, message: str, secret: str | None = None, **data: str
) -> None:
"""Log audit message."""
client = self.get_client()
try:
origin = str(self.remote_ip)
except Exception:
origin = "UNKNOWN"
username = self.get_username()
LOG.warning(
"Audit: %s (origin=%s, username=%s, client=%r, secret=%r, data=%r)",
message,
origin,
username,
client,
secret,
data,
)
await self.backend.audit(SubSystem.SSHD).write_async(
operation=operation,
message=message,
origin=origin,
secret=None,
secret_name=secret,
client=client,
username=username or "No username",
**data,
)
@abc.abstractmethod
async def exec(self) -> None:
"""Execute main command."""
def parse_command(self) -> CmdArgs:
"""Get command."""
if not self.process.command:
raise exceptions.NoCommandReceivedError()
argv = self.process.command.split(" ")
return CmdArgs(argv[0], argv[1:])
@property
def options(self) -> dict[str, bool]:
"""Get arguments."""
if not self.flags:
return {}
parsed = self.parse_command()
args: dict[str, bool] = {}
shortflags: defaultdict[str, list[str]] = defaultdict(list)
for flag in self.flags.keys():
shortflags[flag[0]].append(flag)
for flag in self.flags.keys():
allowshort = len(shortflags[flag[0]]) == 1
if f"--{flag}" in parsed.arguments:
args[flag] = True
continue
if allowshort and f"-{flag[0]}" in parsed.arguments:
args[flag] = True
continue
args[flag] = False
return args
@property
def arguments(self) -> list[str]:
"""Get non-flag arguments."""
parsed = self.parse_command()
LOG.debug("Parsed command: %r", parsed)
if not self.flags:
return parsed.arguments
return [
argument for argument in parsed.arguments if not argument.startswith("-")
]
@property
def backend(self) -> SshecretBackend:
"""Get backend from process info."""
backend = cast(
SshecretBackend | None, self.process.get_extra_info("backend", None)
)
if not backend:
raise exceptions.NoBackendError()
return backend
def get_client(self) -> Client | None:
"""Get client."""
return cast(Client | None, self.process.get_extra_info("client", None))
@property
def client(self) -> Client:
"""Get client from process info."""
client = self.get_client()
if not client:
raise exceptions.NoClientError()
return client
def get_username(self) -> str | None:
"""Get username."""
return cast(str | None, self.process.get_extra_info("provided_username", None))
@property
def username(self) -> str:
"""Get username from process info."""
username = self.get_username()
if not username:
raise exceptions.NoUsernameError()
return username
@property
def remote_ip(self) -> IPvAnyAddress:
"""Get remote IP."""
peername = cast(
"Peername | None", self.process.get_extra_info("peername", None)
)
remote_ip: str | None = None
if peername:
remote_ip = peername[0]
return ipaddress.ip_address(remote_ip)
raise exceptions.NoRemoteIpError()
@property
def allowed_registration_networks(self) -> list[IPvAnyNetwork]:
"""Get networks that allow registration."""
allowed_registration = cast(
list[IPvAnyNetwork],
self.process.get_extra_info("allow_registration_from", []),
)
return allowed_registration
@classmethod
def _format_command_name(cls) -> str:
"""Format command name with arguments."""
name = cls.name
if cls.mandatory_argument:
name = f"{name} {cls.mandatory_argument}"
if not cls.flags:
return name
flags = cls.command_flags()
args: list[str] = [str(flag) for flag in flags.values()]
flagstr = " ".join(args)
return f"{name} {flagstr}"
@classmethod
def usage(cls, indent: int = 0) -> str:
"""Print command usage."""
indent_prefix = " " * indent
inner_indent = 2 + indent
inner_prefix = " " * inner_indent
usage_str: list[str] = []
default_usage = "No help available."
if not cls.__doc__:
usage_str.append(default_usage)
else:
usage_str = [
textwrap.indent(line, prefix=inner_prefix)
for line in cls.__doc__.splitlines()
]
flags = cls.command_flags()
if flags:
usage_str.append("")
usage_str.append(textwrap.indent("Arguments:", prefix=(" " * 4)))
for info in flags.values():
usage_str.append(
textwrap.indent(f"{info!s}: {info.description}", prefix=(" " * 6))
)
usage = "\n".join(usage_str)
if indent:
usage = textwrap.indent(usage, prefix=indent_prefix)
return usage
async def print_help(self) -> None:
"""Print help."""
usage = type(self).usage()
command_name = type(self)._format_command_name()
self.rich_print_line(command_name)
self.print(usage)
@classmethod
def command_flags(cls) -> dict[str, CommandFlag]:
"""Parse the command flags."""
shortflags: defaultdict[str, list[str]] = defaultdict(list)
if not cls.flags:
return {}
for flag in cls.flags.keys():
shortflags[flag[0]].append(flag)
command_flags: dict[str, CommandFlag] = {}
for flag, description in cls.flags.items():
supports_shorts = True
if len(shortflags[flag[0]]) > 1:
supports_shorts = False
command_flags[flag] = CommandFlag(flag, description, supports_shorts)
return command_flags

View File

@ -0,0 +1,150 @@
"""Command dispatcher.
Register arguments here.
"""
import logging
from typing import cast, final, override
import asyncssh
from sshecret_sshd import exceptions, constants
from .base import CommandDispatcher
from .get_secret import GetSecret
from .register import Register
from .list_secrets import ListSecrets
from .ping import PingCommand
SYNOPSIS = """[bold]Sshecret SSH Server[/bold]
[bold]SYNOPSIS[/bold]
An interface to request encrypted client secrets and perform
simple commands.
Secrets will be returned encrypted with the client public key,
encoded as base64.
"""
COMMANDS = [
GetSecret,
Register,
ListSecrets,
PingCommand,
]
LOG = logging.getLogger(__name__)
@final
class HelpCommand(CommandDispatcher):
"""Help.
Returns usage instructions.
"""
name = "help"
@override
def __init__(
self,
process: asyncssh.SSHServerProcess[str],
disabled_commands: list[str] | None = None,
) -> None:
"""Init help command."""
super().__init__(process)
self.disabled_commands: list[str] = disabled_commands or []
@override
async def exec(self) -> None:
"""Execute command."""
output_lines: list[tuple[str, str] | str] = []
output_lines.append(("Available commands:", "bold"))
output_lines.append(
("Some commands may be disabled or restricted by the administrator", "i")
)
output_lines.append("")
for command in COMMANDS:
if command.name in self.disabled_commands:
continue
command_usage = command.usage(indent=2)
output_lines.append((f" {command._format_command_name()}", "bold"))
output_lines.extend(command_usage.splitlines())
output_lines.append("")
for line in output_lines:
tags: list[str] = []
if isinstance(line, tuple):
text, tag = line
tags.append(tag)
else:
text = line
self.rich_print_line(text, tags)
def get_disabled_commands(process: asyncssh.SSHServerProcess[str]) -> list[str]:
"""Get optional command state."""
with_registration = cast(
bool, process.get_extra_info("registration_enabled", False)
)
with_ping = cast(bool, process.get_extra_info("ping_enabled", False))
optional_commands = {
"register": with_registration,
"ping": with_ping,
}
disabled = [key for key, value in optional_commands.items() if not value]
return disabled
async def do_dispatch_command(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch command."""
disabled_commands = get_disabled_commands(process)
command_args = process.command
if not command_args:
raise exceptions.NoCommandReceivedError()
show_command_help = False
if "--help" in command_args:
show_command_help = True
command = command_args.split(" ")[0]
command_map: dict[str, type[CommandDispatcher]] = {
cmd_disp.name: cmd_disp
for cmd_disp in COMMANDS
if cmd_disp.name not in disabled_commands
}
command_map["help"] = HelpCommand
LOG.debug("disabled_commands: %r, command_map: %r", disabled_commands, command_map)
LOG.debug("Looking for command %s", command)
if command not in command_map:
raise exceptions.UnknownCommandError()
dispatcher = command_map[command]
LOG.debug("Received command: %s. Dispatching to %r", command, dispatcher)
if command != "help" and show_command_help:
return await dispatcher(process).print_help()
if command == "help":
return await HelpCommand(process, disabled_commands).exec()
return await dispatcher(process).exec()
async def dispatch_command(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch command."""
status_code = 0
try:
await do_dispatch_command(process)
except exceptions.BaseSshecretSshError as e:
LOG.error("Command Error: %s", e, exc_info=True)
process.stderr.write(f"{e}\n")
status_code = 1
except Exception as e:
LOG.error("Unexpected error: %e", e, exc_info=True)
process.stderr.write(f"{constants.ERROR_GENERIC_ERROR}: {e}")
status_code = 1
finally:
process.exit(status_code)

View File

@ -0,0 +1,63 @@
"""Get secret."""
import logging
from typing import final, override
from sshecret.backend.models import Operation
from sshecret_sshd import exceptions
from .base import CommandDispatcher
LOG = logging.getLogger(__name__)
@final
class GetSecret(CommandDispatcher):
"""Retrieve an encrypted secret.
Returns the value of the secret provided as a mandatory argument.
The secret will be encrypted using the stored RSA public key, and returned
as a base64 encoded string.
"""
name = "get_secret"
mandatory_argument = "SECRET"
@override
async def exec(self) -> None:
"""Execute command."""
if len(self.arguments) != 1:
raise exceptions.UnknownClientOrSecretError()
secret_name = self.arguments[0]
LOG.debug("get_secret called: Argument: %r", secret_name)
if secret_name not in self.client.secrets:
await self.audit(
Operation.DENY,
message="Client requested invalid secret",
secret=secret_name,
)
raise exceptions.SecretNotFoundError()
try:
secret = await self.backend.get_client_secret(self.client.name, secret_name)
except Exception as exc:
LOG.error(
"Got exception while getting client %s secret %s: %s",
self.client.name,
secret_name,
exc,
exc_info=True,
)
raise exceptions.BackendError(backend_error=str(exc)) from exc
if not secret:
await self.audit(
Operation.DENY,
message="Client requested invalid secret",
secret=secret_name,
)
raise exceptions.SecretNotFoundError()
await self.audit(
Operation.READ, message="Client requested secret", secret=secret_name
)
self.print(secret, newline=False)

View File

@ -0,0 +1,40 @@
"""List secrets command."""
from typing import final, override
from sshecret.backend.models import Operation
from .base import CommandDispatcher
@final
class ListSecrets(CommandDispatcher):
"""List secrets.
This command returns a list of secrets available for the connecting client
host.
"""
name = "ls"
flags = {"json": "Output in JSON format"}
@override
async def exec(self) -> None:
"""Execute command."""
json_mode = self.options.get("json")
if json_mode:
return self.list_as_json()
return await self.list_secrets()
def list_as_json(self) -> None:
"""List as json."""
json_obj = {"secrets": self.client.secrets}
self.print_json(json_obj)
async def list_secrets(self) -> None:
"""List secrets."""
self.rich_print_line(
f"Available secrets for client {self.client.name}", ["bold"], rule=True
)
await self.audit(Operation.READ, "Listed available secret names")
for secret_name in self.client.secrets:
self.print(f" - {secret_name}")

View File

@ -0,0 +1,22 @@
"""Ping as a healthcheck command."""
from typing import final, override
from .base import CommandDispatcher
@final
class PingCommand(CommandDispatcher):
"""Ping.
This command responds with the string 'PONG'.
It may be used to ensure that the system works.
"""
name = "ping"
@override
async def exec(self) -> None:
"""Execute command."""
self.print("PONG")

View File

@ -0,0 +1,73 @@
"""Registration command."""
from typing import final, override
import asyncssh
from sshecret_sshd import constants, exceptions
from sshecret.backend.models import Operation
from .base import CommandDispatcher
def verify_key_input(public_key: str) -> str | None:
"""Verify key input."""
try:
key = asyncssh.import_public_key(public_key)
if key.algorithm.decode() == "ssh-rsa":
return public_key
except Exception:
return None
@final
class Register(CommandDispatcher):
"""Register a new client.
After connection, you must input a ssh public key.
This must be an RSA key. No other types of keys are supported.
"""
name = "register"
def verify_registration_host(self) -> bool:
"""Check if registration command is allowed."""
if not self.allowed_registration_networks:
return False
for network in self.allowed_registration_networks:
if self.remote_ip in network:
return True
return False
@override
async def exec(self) -> None:
"""Register client."""
if not self.verify_registration_host():
self.print(f"Registration not permitted from {self.remote_ip}", stderr=True)
await self.audit(
Operation.DENY,
constants.ERROR_REGISTRATION_NOT_ALLOWED,
)
return
self.print("Enter public key:")
public_key: str | None = None
try:
async for line in self.process.stdin:
public_key = verify_key_input(line.rstrip("\n"))
if public_key:
break
raise exceptions.InvalidPublicKeyType()
except asyncssh.BreakReceived:
pass
else:
self.print("Key received. Validating.")
if not public_key:
raise exceptions.InvalidPublicKeyType()
key = asyncssh.import_public_key(public_key)
if key.algorithm.decode() != "ssh-rsa":
raise exceptions.InvalidPublicKeyType()
self.print("Key is valid. Registering client.")
await self.audit(Operation.CREATE, "Registering new client.")
await self.backend.create_client(self.username, public_key)

View File

@ -14,3 +14,9 @@ ERROR_INFO_USERNAME_GONE = "Unexpected error: Username lost in transit."
ERROR_INFO_REMOTE_IP_GONE = (
"Unexpected error: Client connection details lost in transit."
)
ERROR_GENERIC_ERROR = "An unexpected error occured"
ERROR_UNAUTHORIZED = "Denied due to policy or configuration"
ERROR_REGISTRATION_NOT_ALLOWED = (
"Received registration command from unauthorized source."
)

View File

@ -0,0 +1,117 @@
"""Exceptions.
We define a hierarchy like this:
- BaseSshecretSshError
- PolicyError
- CommandError
"""
from sshecret_sshd import constants
class BaseSshecretSshError(Exception):
"""Base exception class."""
default_message: str = constants.ERROR_GENERIC_ERROR
def __init__(self, message: str | None = None) -> None:
"""Init error class."""
super().__init__(message or self.default_message)
class BackendError(BaseSshecretSshError):
"""Error communicating with the backend."""
default_message: str = constants.ERROR_BACKEND_ERROR
def __init__(
self, message: str | None = None, backend_error: str | None = None
) -> None:
"""Init error class."""
if not message:
message = self.default_message
if backend_error:
message += f" Upstream error: {backend_error}"
super().__init__(message)
class PolicyError(BaseSshecretSshError):
"""Error related to policy"""
default_message: str = constants.ERROR_UNAUTHORIZED
class CommandError(BaseSshecretSshError):
"""Errors related to commands."""
default_message: str = constants.ERROR_GENERIC_ERROR
class SecretNotFoundError(CommandError):
"""Secret was not found."""
default_message: str = constants.ERROR_NO_SECRET_FOUND
class UnknownClientOrSecretError(CommandError):
"""Client or secret was not found."""
default_message: str = constants.ERROR_UNKNOWN_CLIENT_OR_SECRET
class NoCommandReceivedError(CommandError):
"""Client or secret was not found."""
default_message: str = constants.ERROR_NO_COMMAND_RECEIVED
class NoPublicKeyError(PolicyError):
"""No public key received."""
default_message: str = constants.ERROR_NO_PUBLIC_KEY
class InvalidPublicKeyType(PolicyError):
"""Type of public key is not correct."""
default_message: str = constants.ERROR_INVALID_KEY_TYPE
class UnknownCommandError(CommandError):
"""Unknown command received."""
default_message: str = constants.ERROR_UNKNOWN_COMMAND
class ProcessError(BaseSshecretSshError):
"""Process related errors."""
default_message: str = constants.ERROR_GENERIC_ERROR
class NoBackendError(ProcessError):
"""Error related to backend object resolution."""
default_message: str = constants.ERROR_INFO_BACKEND_GONE
class NoUsernameError(ProcessError):
"""Error related to username resolution."""
default_message: str = constants.ERROR_INFO_USERNAME_GONE
class NoRemoteIpError(ProcessError):
"""Error related to remote_ip resolution."""
default_message: str = constants.ERROR_INFO_REMOTE_IP_GONE
class NoClientError(ProcessError):
"""Error related to client resolution."""
default_message: str = constants.ERROR_UNKNOWN_CLIENT_OR_SECRET

View File

@ -1,19 +1,18 @@
"""SSH Server implementation."""
from asyncio import _register_task
import logging
import asyncssh
import ipaddress
from collections.abc import Awaitable
from functools import partial
from pathlib import Path
from typing import Callable, cast, override
from typing import Callable, override
from pydantic import IPvAnyNetwork
from . import constants
from sshecret_sshd import constants
from sshecret_sshd.commands import dispatch_command
from sshecret.backend import SshecretBackend, Client, Operation, SubSystem
from .settings import ServerSettings, ClientRegistrationSettings
@ -29,37 +28,6 @@ PeernameV6 = tuple[str, int, int, int]
Peername = PeernameV4 | PeernameV6
class CommandError(Exception):
"""Error class for errors during command processing."""
async def audit_process(
backend: SshecretBackend,
process: asyncssh.SSHServerProcess[str],
operation: Operation,
message: str,
secret: str | None = None,
**data: str,
) -> None:
"""Add an audit event from process."""
command = get_process_command(process)
client = get_info_client(process)
username = get_info_username(process)
remote_ip = get_info_remote_ip(process) or "UNKNOWN"
if username:
data["username"] = username
if command and not secret:
cmd, cmd_args = command
if cmd:
data["command"] = cmd
data["args"] = " ".join(cmd_args)
await backend.audit(SubSystem.SSHD).write_async(
operation, message, remote_ip, client, secret=None, secret_name=secret, **data
)
async def audit_event(
backend: SshecretBackend,
message: str,
@ -87,250 +55,6 @@ def verify_key_input(public_key: str) -> str | None:
return None
def get_process_command(
process: asyncssh.SSHServerProcess[str],
) -> tuple[str | None, list[str]]:
"""Extract the process command."""
if not process.command:
return (None, [])
argv = process.command.split(" ")
LOG.debug("Args: %r", argv)
return (argv[0], argv[1:])
def get_info_backend(process: asyncssh.SSHServerProcess[str]) -> SshecretBackend | None:
"""Get backend from process."""
backend = cast("SshecretBackend | None", process.get_extra_info("backend", None))
return backend
def get_info_client(process: asyncssh.SSHServerProcess[str]) -> Client | None:
"""Get info from process."""
client = cast("Client | None", process.get_extra_info("client", None))
return client
def get_info_username(process: asyncssh.SSHServerProcess[str]) -> str | None:
"""Get username from process."""
username = cast("str | None", process.get_extra_info("provided_username", None))
return username
def get_info_remote_ip(process: asyncssh.SSHServerProcess[str]) -> str | None:
"""Get remote IP."""
peername = cast("Peername | None", process.get_extra_info("peername", None))
remote_ip: str | None = None
if peername:
remote_ip = peername[0]
return remote_ip
def get_info_allowed_registration(
process: asyncssh.SSHServerProcess[str],
) -> list[IPvAnyNetwork] | None:
"""Get allowed networks to allow registration from."""
allowed_registration = cast(
list[IPvAnyNetwork] | None,
process.get_extra_info("allow_registration_from", None),
)
return allowed_registration
def get_optional_commands(process: asyncssh.SSHServerProcess[str]) -> dict[str, bool]:
"""Get optional command state."""
with_registration = cast(
bool, process.get_extra_info("registration_enabled", False)
)
with_ping = cast(bool, process.get_extra_info("ping_enabled", False))
return {
"registration": with_registration,
"ping": with_ping,
}
async def get_stdin_public_key(process: asyncssh.SSHServerProcess[str]) -> str | None:
"""Get public key from stdin."""
process.stdout.write("Enter public key:\n")
public_key: str | None = None
try:
async for line in process.stdin:
public_key = verify_key_input(line.rstrip("\n"))
if public_key:
break
raise CommandError(constants.ERROR_INVALID_KEY_TYPE)
except asyncssh.BreakReceived:
pass
else:
process.stdout.write("OK\n")
return public_key
async def register_client(
process: asyncssh.SSHServerProcess[str],
backend: SshecretBackend,
username: str,
) -> None:
"""Register a new client."""
public_key = await get_stdin_public_key(process)
if not public_key:
raise CommandError(constants.ERROR_NO_PUBLIC_KEY)
key = asyncssh.import_public_key(public_key)
if key.algorithm.decode() != "ssh-rsa":
raise CommandError(constants.ERROR_INVALID_KEY_TYPE)
await audit_process(backend, process, Operation.CREATE, "Registering new client")
LOG.debug("Registering client %s with public key %s", username, public_key)
await backend.create_client(username, public_key)
async def get_secret(
backend: SshecretBackend,
client: Client,
secret_name: str,
origin: str,
) -> str:
"""Handle get secret requests from client."""
LOG.debug("Recieved command: get_secret %r", secret_name)
if not secret_name:
raise CommandError(constants.ERROR_UNKNOWN_CLIENT_OR_SECRET)
if secret_name not in client.secrets:
raise CommandError(constants.ERROR_NO_SECRET_FOUND)
await audit_event(
backend,
"Client requested secret",
operation=Operation.READ,
client=client,
origin=origin,
secret=secret_name,
)
# Look up secret
try:
secret = await backend.get_client_secret(client.name, secret_name)
if not secret:
raise CommandError(constants.ERROR_NO_SECRET_FOUND)
return secret
except Exception as exc:
LOG.debug(exc, exc_info=True)
raise CommandError(constants.ERROR_BACKEND_ERROR) from exc
async def dispatch_no_cmd(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch for no command."""
raise CommandError(constants.ERROR_NO_COMMAND_RECEIVED)
async def dispatch_cmd_ping(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch the ping command."""
process.stdout.write("PONG\n")
async def dispatch_cmd_register(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch the register command."""
backend = get_info_backend(process)
if not backend:
raise CommandError(constants.ERROR_INFO_BACKEND_GONE)
username = get_info_username(process)
if not username:
raise CommandError(constants.ERROR_INFO_USERNAME_GONE)
allowed_networks = get_info_allowed_registration(process)
if not allowed_networks:
process.stdout.write("Unauthorized.\n")
await audit_process(
backend,
process,
Operation.DENY,
"Received registration command, but no subnets are allowed.",
)
return
remote_ip = get_info_remote_ip(process)
if not remote_ip:
raise CommandError(constants.ERROR_INFO_REMOTE_IP_GONE)
client_address = ipaddress.ip_address(remote_ip)
for network in allowed_networks:
if client_address in network:
break
else:
await audit_process(
backend,
process,
Operation.DENY,
"Received registration command from unauthorized subnet.",
)
process.stdout.write("Unauthorized.\n")
return
await register_client(process, backend, username)
process.stdout.write("Client registered\n.")
async def dispatch_cmd_get_secret(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch the get_secret command."""
backend = get_info_backend(process)
if not backend:
raise CommandError(constants.ERROR_INFO_BACKEND_GONE)
client = get_info_client(process)
if not client:
raise CommandError(constants.ERROR_UNKNOWN_CLIENT_OR_SECRET)
_cmd, args = get_process_command(process)
if not args:
raise CommandError(constants.ERROR_UNKNOWN_CLIENT_OR_SECRET)
secret_name = args[0]
origin = get_info_remote_ip(process) or "Unknown"
secret = await get_secret(backend, client, secret_name, origin)
process.stdout.write(secret)
async def dispatch_command(process: asyncssh.SSHServerProcess[str]) -> None:
"""Dispatch command."""
command, _args = get_process_command(process)
if not command:
process.stderr.write(constants.ERROR_NO_COMMAND_RECEIVED)
process.exit(1)
return
cmdmap: dict[str, CommandDispatch] = {
"get_secret": dispatch_cmd_get_secret,
}
extra_commands = get_optional_commands(process)
if "registration" in extra_commands:
cmdmap["register"] = dispatch_cmd_register
if "ping" in extra_commands:
cmdmap["ping"] = dispatch_cmd_ping
if command not in cmdmap:
process.stderr.write(constants.ERROR_UNKNOWN_COMMAND)
process.exit(1)
return
exit_code = 0
try:
dispatcher = cmdmap[command]
await dispatcher(process)
except CommandError as e:
process.stderr.write(str(e))
exit_code = 1
except Exception as e:
LOG.debug(e, exc_info=True)
process.stderr.write("Unexpected exception:\n")
process.stderr.write(str(e))
exit_code = 1
LOG.debug("Command processing finished.")
process.exit(exit_code)
class AsshyncServer(asyncssh.SSHServer):
"""Asynchronous SSH server implementation."""

View File

@ -2,12 +2,12 @@
fmt = "ruff format ${PWD}"
lint = "ruff check --fix ${PWD}"
check = "basedpyright ${PWD}"
test = "pytest ${PWD}"
test = "pytest ${PWD} --alluredir allure-results --driver chrome"
all = [ {ref="fmt"}, {ref="lint"}, {ref="check"}, {ref="test"} ]
"ci:fmt" = "ruff format --check ${PWD}" # fail if not formatted
"ci:lint" = "ruff check ${PWD}"
[tool.poe.tasks.coverage]
cmd = "pytest --cov-config=${PWD}/.coveragerc --cov --cov-report=html --cov-report=term-missing"
cmd = "pytest --ignore ${POE_PWD}/tests/integration/frontend --cov-config=${PWD}/.coveragerc --cov --cov-report=html --cov-report=term-missing --alluredir allure-results"
cwd = "${POE_PWD}"
@ -32,6 +32,7 @@ dependencies = [
"pykeepass>=4.1.1.post1",
"pytest-asyncio>=0.26.0",
"pytest-cov>=6.1.1",
"pytest-selenium>=4.1.0",
"python-dotenv>=1.0.1",
"python-json-logger>=3.3.0",
]
@ -65,9 +66,13 @@ dev = [
"python-dotenv>=1.0.1",
]
test = [
"allure-pytest>=2.14.2",
"coverage>=7.8.0",
"pytest>=8.3.5",
"pytest-asyncio>=0.26.0",
"pytest-cov>=6.1.1",
"pytest-selenium>=4.1.0",
"requests>=2.32.3",
"robotframework>=7.2.2",
"selenium>=4.32.0",
]

15
tests/conftest.py Normal file
View File

@ -0,0 +1,15 @@
"""Capture screenshots on failure."""
import allure
import base64
from allure_commons.types import AttachmentType
def pytest_selenium_capture_debug(item, report, extra):
for log_type in extra:
if log_type["name"] == "Screenshot":
content = base64.b64decode(log_type["content"].encode("utf-8"))
allure.attach(
content,
name="Screenshot on failure",
attachment_type=AttachmentType.PNG,
)

View File

@ -0,0 +1 @@

129
tests/frontend/conftest.py Normal file
View File

@ -0,0 +1,129 @@
"""Test fixtures for the frontend."""
import asyncio
import secrets
import tempfile
import threading
import time
from pathlib import Path
import pytest
import requests
import uvicorn
from sshecret_admin.core.app import create_admin_app
from sshecret_admin.core.settings import AdminServerSettings
from sshecret_backend.app import create_backend_app
from sshecret_backend.settings import BackendSettings
from sshecret_backend.testing import create_test_token
from tests.helpers import create_test_admin_user, in_tempdir
from tests.types import PortFactory, TestPorts
@pytest.fixture(name="ui_test_ports", scope="session")
def generate_test_ports(unused_tcp_port_factory: PortFactory) -> TestPorts:
"""Generate the test ports."""
test_ports = TestPorts(
backend=unused_tcp_port_factory(),
admin=unused_tcp_port_factory(),
sshd=unused_tcp_port_factory(),
)
print(f"{test_ports=!r}")
return test_ports
@pytest.fixture(scope="function", name="ui_backend_server")
def run_backend_server(ui_test_ports: TestPorts):
"""Run the backend server in a thread."""
port = ui_test_ports.backend
with tempfile.TemporaryDirectory() as tmp_dir:
backend_work_path = Path(tmp_dir)
db_file = backend_work_path / "backend.db"
backend_settings = BackendSettings(database=str(db_file.absolute()))
backend_app = create_backend_app(backend_settings)
token = create_test_token(backend_settings)
config = uvicorn.Config(
app=backend_app, port=port, host="127.0.0.1", log_level="warning"
)
server = uvicorn.Server(config)
def run():
asyncio.run(server.serve())
thread = threading.Thread(target=run)
thread.start()
backend_url = f"http://127.0.0.1:{port}"
for _ in range(30):
try:
r = requests.get(backend_url)
if r.status_code < 500:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError("Backend server did not start in time")
yield backend_url, token
server.should_exit = True
thread.join()
@pytest.fixture(scope="function", name="ui_admin_server")
def run_admin_server(ui_test_ports: TestPorts, ui_backend_server: tuple[str, str]):
"""Run the admin server in a thread."""
backend_url, backend_token = ui_backend_server
port = ui_test_ports.admin
secret_key = secrets.token_urlsafe(32)
with in_tempdir() as admin_work_path:
admin_db = admin_work_path / "ssh_admin.db"
admin_settings = AdminServerSettings.model_validate(
{
"sshecret_backend_url": backend_url,
"backend_token": backend_token,
"secret_key": secret_key,
"listen_address": "127.0.0.1",
"port": port,
"database": str(admin_db.absolute()),
"password_manager_directory": str(admin_work_path.absolute()),
}
)
admin_app = create_admin_app(admin_settings)
config = uvicorn.Config(
app=admin_app, port=port, host="127.0.0.1", log_level="warning"
)
server = uvicorn.Server(config)
def run():
asyncio.run(server.serve())
thread = threading.Thread(target=run)
thread.start()
admin_url = f"http://127.0.0.1:{port}"
admin_password = secrets.token_urlsafe(10)
create_test_admin_user(admin_settings, "test", admin_password)
for _ in range(30):
try:
r = requests.get(admin_url)
if r.status_code < 500:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError("Admin server did not start in time")
yield admin_url, ("test", admin_password)
server.should_exit = True
thread.join()

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,30 @@
"""Auth helpers."""
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from tests.integration.types import AdminServer
from .wait_helpers import wait_until_url_contains
def login(ui_admin_server: AdminServer, driver: WebDriver) -> WebDriver:
"""Log in."""
admin_url, credentials = ui_admin_server
username, password = credentials
driver.get(admin_url + "/login")
username_input = driver.find_element(By.NAME, "username")
password_input = driver.find_element(By.NAME, "password")
submit_button = driver.find_element(By.XPATH, "//button[@type='submit']")
assert username_input is not None
assert password_input is not None
assert submit_button.text.lower() == "sign in"
username_input.clear()
username_input.send_keys(username)
password_input.send_keys(password)
submit_button.click()
wait_until_url_contains(driver, "/dashboard")
return driver

View File

@ -0,0 +1,66 @@
"""Database helpers.
Allows pre-loading database for tests.
"""
from collections.abc import Iterator
from contextlib import contextmanager
import httpx
from sshecret.crypto import generate_private_key, generate_public_key_string
class DatabasePreloader:
"""Database preloader class."""
def __init__(self, admin_url: str, username: str, password: str) -> None:
"""Instantiate class to populate database."""
self.admin_url: str = admin_url
self.username: str = username
self.password: str = password
@contextmanager
def login(self) -> Iterator[httpx.Client]:
"""Login and yield client."""
login_client = httpx.Client(base_url=self.admin_url)
resp = login_client.post(
"api/v1/token",
data={"username": self.username, "password": self.password}
)
assert resp.status_code == 200
data = resp.json()
token = data["access_token"]
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(base_url=self.admin_url, headers=headers) as client:
yield client
def create_client(self, *names: str) -> None:
"""Create one or more clients."""
with self.login() as http_client:
for name in names:
private_key = generate_private_key()
public_key = generate_public_key_string(private_key.public_key())
data = {
"name": name,
"description": "Test client",
"public_key": public_key,
"sources": ["0.0.0.0/0", "::/0"],
}
resp = http_client.post("api/v1/clients/", json=data)
resp.raise_for_status()
def create_secret(self, *secrets: tuple[str, list[str]]) -> None:
"""Create secret.
Argument format is (secret_name, [client1, client2, ...])
Clients must exist.
"""
with self.login() as http_client:
for name, clients in secrets:
data = {
"name": name,
"clients": clients,
"value": {"auto_generate": True, "length": 32}
}
http_client.post("api/v1/secrets/", json=data)

View File

@ -0,0 +1,63 @@
"""
Collection of waiting statements.
"""
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
def wait_for_url_change(driver: WebDriver, old_url: str, timeout: int = 10) -> None:
WebDriverWait(driver, timeout).until(lambda d: d.current_url != old_url)
def wait_until_url_contains(driver: WebDriver, text: str, timeout: int = 10) -> None:
WebDriverWait(driver, timeout).until(lambda d: text in d.current_url)
def wait_for_element(driver: WebDriver, by: str, value: str, timeout: int = 10):
return WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((by, value))
)
def wait_for_element_with_text(
driver: WebDriver, tag: str, text: str, timeout: int = 10
):
return WebDriverWait(driver, timeout).until(
EC.text_to_be_present_in_element((By.TAG_NAME, tag), text)
)
def wait_for_clickable(driver: WebDriver, by: str, value: str, timeout: int = 10):
return WebDriverWait(driver, timeout).until(EC.element_to_be_clickable((by, value)))
def wait_for_element_to_be_visisble(driver: WebDriver, id: str, timeout: int = 10):
return WebDriverWait(driver, timeout).until(
EC.visibility_of_element_located((By.ID, id))
)
def wait_for_element_to_be_disabled(
driver: WebDriver, by: str, value: str, timeout: int = 10
):
"""Wait for an element to be disabled."""
return WebDriverWait(driver, timeout).until(
EC.none_of(EC.element_to_be_clickable((by, value)))
)
def wait_for_alert(driver: WebDriver, timeout: int = 10):
"""Wait for an alert."""
return WebDriverWait(driver, timeout).until(lambda d: d.switch_to.alert)
def wait_for_element_to_disappear(
driver: WebDriver, by: str, value: str, timeout: int = 10
):
"""Wait for an element to disappear."""
return WebDriverWait(driver, timeout).until(
EC.none_of(EC.presence_of_element_located((by, value)))
)

View File

@ -0,0 +1,217 @@
"""Tests for the client page."""
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
import pytest
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from sshecret.crypto import generate_private_key, generate_public_key_string
from tests.integration.types import AdminServer
from .helpers.auth import login
from .helpers import wait_helpers
@pytest.fixture
def client_page(ui_admin_server: AdminServer, driver: WebDriver) -> WebDriver:
"""Log in and show the client page."""
admin_url = ui_admin_server[0]
driver = login(ui_admin_server, driver)
driver.get(admin_url + "/clients")
return driver
class TestClientPage:
"""Test client page."""
def test_client_page_loaded(self, client_page: WebDriver) -> None:
"""Test that the client page loads."""
# Ensure that the create client button is present
client_page.refresh()
create_client_button = client_page.find_element(By.ID, "createClientButton")
assert create_client_button is not None
# Ensure that the table is loaded
client_table = client_page.find_element(By.ID, "clientListTable")
assert client_table is not None
def test_create_client_button(self, client_page: WebDriver) -> None:
"""Test that the Create Client button works."""
client_page.refresh()
create_client_button = client_page.find_element(By.ID, "createClientButton")
assert create_client_button is not None
create_client_button.click()
wait_helpers.wait_for_element_to_be_visisble(client_page, "drawer-create-client-default")
add_client_button = client_page.find_element(By.XPATH, "//button[@type='submit']")
assert add_client_button.text.lower() == "add client"
def test_create_client(self, client_page: WebDriver) -> None:
"""Test create clients."""
client_page.refresh()
private_key = generate_private_key()
public_key = generate_public_key_string(private_key.public_key())
create_client_button = client_page.find_element(By.ID, "createClientButton")
assert create_client_button is not None
create_client_button.click()
wait_helpers.wait_for_element_to_be_visisble(client_page, "drawer-create-client-default")
drawer = client_page.find_element(By.ID, "drawer-create-client-default")
assert drawer is not None
name_input = drawer.find_element(By.NAME, "name")
assert name_input is not None
description_input = drawer.find_element(By.NAME, "description")
assert description_input is not None
sources_input = drawer.find_element(By.NAME, "sources")
assert sources_input is not None
public_key_input = drawer.find_element(By.NAME, "public_key")
assert public_key_input is not None
name_input.send_keys("testuser")
description_input.send_keys("Test")
sources_input.clear()
sources_input.send_keys("0.0.0.0/0, ::/0")
public_key_input.send_keys(public_key)
validation_field = drawer.find_element(By.ID, "clientPublicKeyValidation")
assert validation_field is not None
error_message = validation_field.find_elements(By.TAG_NAME, "p")
assert len(error_message) == 0
# Submit the request
add_client_button = drawer.find_element(By.XPATH, "//button[@type='submit']")
assert add_client_button.text.lower() == "add client"
add_client_button.click()
client_appeared = wait_helpers.wait_for_element_with_text(client_page, "td", "testuser")
assert client_appeared is not False
def test_delete_client(self, client_page: WebDriver) -> None:
"""Test deletion of a test client."""
self.test_create_client(client_page)
client_page.refresh()
client_field = client_page.find_element(By.XPATH, "//tr/td[contains(text(), 'testuser')]")
assert client_field is not None
row = client_field.find_element(By.XPATH, "./..")
assert row is not None
assert row.tag_name == "tr"
row_id = row.get_attribute("id")
assert row_id is not None
print(row_id)
client_id = row_id[7:]
wait_helpers.wait_for_element_to_be_visisble(client_page, row_id)
delete_button = client_page.find_element(By.ID, f"deleteClientButton-{client_id}")
assert delete_button is not None
delete_button.click()
drawer_name = f"drawer-delete-{row_id}"
wait_helpers.wait_for_element_to_be_visisble(client_page, drawer_name)
drawer = client_page.find_element(By.ID, drawer_name)
assert drawer is not None
confirm_button = drawer.find_element(By.XPATH, "//button[contains(text(), 'Yes, delete the client')]")
assert confirm_button is not None
confirm_button.click()
WebDriverWait(client_page, 10).until(
EC.none_of(
EC.presence_of_element_located((By.ID, row_id))
)
)
def test_update_client(self, client_page: WebDriver) -> None:
"""Test updating a client."""
self.test_create_client(client_page)
client_page.refresh()
client_field = client_page.find_element(By.XPATH, "//tr/td[contains(text(), 'testuser')]")
assert client_field is not None
row = client_field.find_element(By.XPATH, "./..")
assert row is not None
assert row.tag_name == "tr"
row_id = row.get_attribute("id")
assert row_id is not None
print(row_id)
client_id = row_id[7:]
wait_helpers.wait_for_element_to_be_visisble(client_page, row_id)
update_button = client_page.find_element(By.ID, f"updateClientButton-{client_id}")
assert update_button is not None
update_button.click()
drawer_name = f"drawer-update-{row_id}"
wait_helpers.wait_for_element_to_be_visisble(client_page, drawer_name)
drawer = client_page.find_element(By.ID, drawer_name)
description_input = drawer.find_element(By.NAME, "description")
description_input.clear()
description_input.send_keys("New Description")
confirm_button = drawer.find_element(By.XPATH, "//button[contains(text(), 'Update')]")
assert confirm_button is not None
confirm_button.click()
WebDriverWait(client_page, 10).until(
EC.text_to_be_present_in_element((By.XPATH, f"//tr[@id='{row_id}']/td[3]"), "New Description")
)
def test_delete_from_update_client(self, client_page: WebDriver) -> None:
"""Test updating a client."""
self.test_create_client(client_page)
client_page.refresh()
client_field = client_page.find_element(By.XPATH, "//tr/td[contains(text(), 'testuser')]")
assert client_field is not None
row = client_field.find_element(By.XPATH, "./..")
assert row is not None
assert row.tag_name == "tr"
row_id = row.get_attribute("id")
assert row_id is not None
print(row_id)
client_id = row_id[7:]
wait_helpers.wait_for_element_to_be_visisble(client_page, row_id)
update_button = client_page.find_element(By.ID, f"updateClientButton-{client_id}")
assert update_button is not None
update_button.click()
drawer_name = f"drawer-update-{row_id}"
wait_helpers.wait_for_element_to_be_visisble(client_page, drawer_name)
drawer = client_page.find_element(By.ID, drawer_name)
description_input = drawer.find_element(By.NAME, "description")
description_input.clear()
description_input.send_keys("New Description")
delete_button = drawer.find_element(By.ID, f"delete-button-{client_id}")
assert delete_button is not None
delete_button.click()
alert = WebDriverWait(client_page, 10).until(lambda d: d.switch_to.alert)
assert alert.text == "Are you sure?"
alert.accept()
WebDriverWait(client_page, 10).until(
EC.none_of(
EC.presence_of_element_located((By.ID, row_id))
)
)

View File

@ -0,0 +1,79 @@
"""Test of the dashboard landing page."""
import time
import allure
import pytest
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.select import Select
from tests.integration.types import AdminServer
from .helpers.auth import login
from .helpers.db import DatabasePreloader
from .helpers.wait_helpers import (
wait_for_element_to_be_visisble,
)
class TestDashboardPage:
"""Test dashboard page."""
@pytest.fixture(autouse=True)
def login(self, ui_admin_server: AdminServer, driver: WebDriver) -> None:
"""Login."""
login(ui_admin_server, driver)
@pytest.fixture(autouse=True)
def create_testdata(self, ui_admin_server: AdminServer) -> None:
"""Preload some test data."""
test_clients = ["client1", "client2", "client3"]
admin_url, (username, password) = ui_admin_server
db = DatabasePreloader(admin_url, username, password)
db.create_client(*test_clients)
secrets = [
("secret1", ["client1", "client2"]),
("secret2", ["client1"]),
("secret3", ["client3"]),
("secret4", ["client2"]),
]
db.create_secret(*secrets)
@allure.title("Test Dashboard view")
def test_dashboard_elements(self, driver: WebDriver) -> None:
"""Test elements on the dashboard."""
wait_for_element_to_be_visisble(driver, "dashboard-stats-panel")
stats_clients = driver.find_element(By.ID, "stats-client-count")
assert stats_clients.text == "3"
stats_secrets = driver.find_element(By.ID, "stats-secret-count")
assert stats_secrets.text == "4"
stats_audit = driver.find_element(By.ID, "stats-audit-count")
assert stats_audit.text.isdecimal()
assert int(stats_audit.text) > 0
# Check that there is at least one row in each audit table
login_table = driver.find_element(By.ID, "last-login-events")
login_table_rows = login_table.find_elements(By.XPATH, ".//tr")
assert len(login_table_rows) > 1
audit_table = driver.find_element(By.ID, "last-audit-events")
audit_table_rows = audit_table.find_elements(By.XPATH, ".//tr")
assert len(audit_table_rows) > 1
# Find a questionmark hover
login_info_btn = login_table_rows[-1].find_element(By.XPATH, "./td[1]//button")
login_info_target = login_info_btn.get_attribute("data-popover-target")
assert login_info_target is not None
ActionChains(driver).move_to_element(login_info_btn).perform()
wait_for_element_to_be_visisble(driver, login_info_target)
audit_info_btn = audit_table_rows[-1].find_element(By.XPATH, "./td[1]//button")
audit_info_target = audit_info_btn.get_attribute("data-popover-target")
assert audit_info_target is not None
ActionChains(driver).move_to_element(audit_info_btn).perform()
wait_for_element_to_be_visisble(driver, audit_info_target)

View File

@ -0,0 +1,23 @@
"""Tests for the login."""
from selenium.webdriver.remote.webdriver import WebDriver
from tests.integration.types import AdminServer
from .helpers.auth import login
from .helpers import wait_helpers
def test_login(ui_admin_server: AdminServer, driver: WebDriver) -> None:
"""Test login."""
driver = login(ui_admin_server, driver)
print(driver.current_url)
assert driver.current_url.endswith("/dashboard")
def test_logout(ui_admin_server: AdminServer, driver: WebDriver) -> None:
"""Test logout function."""
admin_url = ui_admin_server[0]
driver = login(ui_admin_server, driver)
assert driver.current_url.endswith("/dashboard")
driver.get(admin_url + "/logout")
wait_helpers.wait_until_url_contains(driver, "/login")
assert driver.current_url.endswith("/login")

View File

@ -0,0 +1,189 @@
"""Tests for the secrets page."""
import allure
import pytest
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.select import Select
from tests.integration.types import AdminServer
from .helpers.auth import login
from .helpers.db import DatabasePreloader
from .helpers.wait_helpers import (
wait_for_alert,
wait_for_element,
wait_for_element_to_be_disabled,
wait_for_element_to_be_visisble,
wait_for_element_to_disappear,
wait_for_element_with_text,
)
class TestSecretsPage:
"""Test secrets page."""
@pytest.fixture(autouse=True)
def login(self, ui_admin_server: AdminServer, driver: WebDriver) -> None:
"""Log in and navigate to secrets page.."""
admin_url = ui_admin_server[0]
driver = login(ui_admin_server, driver)
driver.get(admin_url + "/secrets")
@pytest.fixture(autouse=True)
def create_testdata(self, ui_admin_server: AdminServer) -> None:
"""Preload some test data."""
test_clients = ["client1", "client2", "client3"]
admin_url, (username, password) = ui_admin_server
db = DatabasePreloader(admin_url, username, password)
db.create_client(*test_clients)
@allure.title("Test secret creation")
@allure.description("Verify that secrets can be created")
def test_create_secret(self, driver: WebDriver) -> None:
"""Test creation of secrets."""
driver.refresh()
create_secret_button = driver.find_element(By.ID, "createSecretButton")
assert create_secret_button is not None
create_secret_button.click()
wait_for_element_to_be_visisble(driver, "drawer-create-secret-default")
drawer = driver.find_element(By.ID, "drawer-create-secret-default")
assert drawer is not None
name_input = drawer.find_element(By.NAME, "name")
value_input = drawer.find_element(By.NAME, "value")
client_select = drawer.find_element(By.NAME, "clients")
client_select_node = Select(client_select)
name_input.send_keys("testsecret")
value_input.send_keys("secret")
client_select_node.select_by_visible_text("client1")
add_secret_button = drawer.find_element(By.XPATH, "//button[@type='submit']")
add_secret_button.click()
client_appeared = wait_for_element_with_text(driver, "td", "testsecret")
assert client_appeared is not False
secret_name_field = driver.find_element(
By.XPATH, "//tr/td[contains(text(), 'testsecret')]"
)
secret_row = secret_name_field.find_element(By.XPATH, "./..")
client_field = secret_row.find_element(By.CLASS_NAME, "secret-client-list")
secret_client = client_field.find_element(
By.XPATH, "//span[contains(text(), 'client1')]"
)
assert secret_client is not None
@allure.title("Test auto-generating secrets")
@allure.description("Test creation of a secret with automatic value")
def test_auto_secret_creation(self, driver: WebDriver) -> None:
"""Test creation of secret with automatic value."""
driver.refresh()
create_secret_button = driver.find_element(By.ID, "createSecretButton")
assert create_secret_button is not None
create_secret_button.click()
wait_for_element_to_be_visisble(driver, "drawer-create-secret-default")
drawer = driver.find_element(By.ID, "drawer-create-secret-default")
assert drawer is not None
name_input = drawer.find_element(By.NAME, "name")
value_input = drawer.find_element(By.NAME, "value")
# The auto generate checkbox is obscured by a dynamic div.
# We find the label and its nested div
auto_generate_label = drawer.find_element(By.ID, "autoGenerateCheckboxLabel")
# find the first div
checkbox_div = auto_generate_label.find_element(By.TAG_NAME, "div")
checkbox_div.click()
wait_for_element_to_be_disabled(driver, By.NAME, "value")
client_select = drawer.find_element(By.NAME, "clients")
client_select_node = Select(client_select)
name_input.send_keys("autosecret")
client_select_node.deselect_all()
client_select_node.select_by_visible_text("client1")
client_select_node.select_by_visible_text("client2")
add_secret_button = drawer.find_element(By.XPATH, "//button[@type='submit']")
add_secret_button.click()
client_appeared = wait_for_element_with_text(driver, "td", "autosecret")
secret_name_field = driver.find_element(
By.XPATH, "//tr/td[contains(text(), 'autosecret')]"
)
secret_row = secret_name_field.find_element(By.XPATH, "./..")
client_field = secret_row.find_element(By.CLASS_NAME, "secret-client-list")
secret_client1 = client_field.find_element(
By.XPATH, "//span[contains(text(), 'client1')]"
)
assert secret_client1 is not None
secret_client2 = client_field.find_element(
By.XPATH, "//span[contains(text(), 'client2')]"
)
assert secret_client2 is not None
@allure.title("Test manage client access")
def test_manage_client_access(self, driver: WebDriver) -> None:
"""Test the manage client access button."""
# Use the previous step to create a secret assigned to two clients.
self.test_auto_secret_creation(driver)
driver.refresh()
# Find the manage client access button
# btn_id = "client-secret-modal-autosecret"
manage_btn = driver.find_element(By.ID, "manage-client-access-btn-autosecret")
assert manage_btn is not None
manage_btn.click()
wait_for_element_to_be_visisble(driver, "client-secret-modal-autosecret")
modal = driver.find_element(By.ID, "client-secret-modal-autosecret")
assert modal is not None
client_pills = modal.find_elements(By.CLASS_NAME, "pill-client-secret")
assert len(client_pills) == 2
# Remove client1
remove_pill_btn_id = "btn-remove-client-client1-secret-autosecret"
remove_btn = modal.find_element(By.ID, remove_pill_btn_id)
assert remove_btn is not None
remove_btn.click()
alert = wait_for_alert(driver)
alert.accept()
# Wait for the client pill to disappear.
wait_for_element_to_disappear(
driver,
By.XPATH,
"//td[@id='secret-client-list-autosecret']/span[contains(text(), 'client1')]",
)
# Add a different client.
client_select_field = modal.find_element(By.NAME, "client")
assert client_select_field is not None
assert client_select_field.tag_name == "select"
client_select = Select(client_select_field)
client_select.select_by_visible_text("client3")
give_access = modal.find_element(By.XPATH, "//button[@type='submit']")
assert "give access" in give_access.text.lower()
give_access.click()
wait_for_element(driver, By.ID, "client-secret-autosecret-pill-client3")
@allure.title("Test secret deletion")
def test_delete_secret(self, driver: WebDriver) -> None:
"""Test deleting a secret."""
self.test_auto_secret_creation(driver)
driver.refresh()
delete_btn_id = "delete-secret-btn-autosecret"
delete_btn = driver.find_element(By.ID, delete_btn_id)
delete_btn.click()
alert = wait_for_alert(driver)
alert.accept()
wait_for_element_to_disappear(
driver, By.XPATH, "//td[contains(text(), 'autosecret')]"
)

View File

@ -34,8 +34,8 @@ from sshecret_sshd.settings import ServerSettings
from sshecret_sshd.ssh_server import start_sshecret_sshd
from .clients import ClientData
from .helpers import create_sshd_server_key, create_test_admin_user, in_tempdir
from .types import PortFactory, TestPorts
from tests.helpers import create_sshd_server_key, create_test_admin_user, in_tempdir
from tests.types import PortFactory, TestPorts
TEST_SCOPE = "function"
LOOP_SCOPE = "function"
@ -92,7 +92,7 @@ async def run_admin_server(test_ports: TestPorts, backend_server: tuple[str, str
"sshecret_backend_url": backend_url,
"backend_token": backend_token,
"secret_key": secret_key,
"listen_address": "127.0.0.1",
"listen_address": "0.0.0.0",
"port": port,
"database": str(admin_db.absolute()),
"password_manager_directory": str(admin_work_path.absolute()),

View File

@ -2,10 +2,13 @@
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import allure
import pytest
import httpx
from allure_commons.types import Severity
from sshecret.backend import Client
from sshecret.crypto import generate_private_key, generate_public_key_string
@ -70,9 +73,12 @@ class BaseAdminTests:
return client
@allure.title("Admin API")
class TestAdminAPI(BaseAdminTests):
"""Tests of the Admin REST API."""
@allure.title("Test health test endpoint")
@allure.severity(Severity.TRIVIAL)
@pytest.mark.asyncio
async def test_health_check(
self, admin_server: tuple[str, tuple[str, str]]
@ -82,6 +88,8 @@ class TestAdminAPI(BaseAdminTests):
resp = await client.get("/health")
assert resp.status_code == 200
@allure.title("Test login over API")
@allure.severity(Severity.BLOCKER)
@pytest.mark.asyncio
async def test_admin_login(self, admin_server: AdminServer) -> None:
"""Test admin login."""
@ -95,9 +103,12 @@ class TestAdminAPI(BaseAdminTests):
assert resp.status_code == 200
@allure.title("Admin API Client API")
class TestAdminApiClients(BaseAdminTests):
"""Test client routes."""
@allure.title("Test creating a client")
@allure.description("Ensure we can create a new client.")
@pytest.mark.asyncio
async def test_create_client(self, admin_server: AdminServer) -> None:
"""Test create_client."""
@ -106,6 +117,8 @@ class TestAdminApiClients(BaseAdminTests):
assert client.id is not None
assert client.name == "testclient"
@allure.title("Test reading clients")
@allure.description("Ensure we can retrieve a list of current clients.")
@pytest.mark.asyncio
async def test_get_clients(self, admin_server: AdminServer) -> None:
"""Test get_clients."""
@ -124,6 +137,8 @@ class TestAdminApiClients(BaseAdminTests):
client_name = entry.get("name")
assert client_name in client_names
@allure.title("Test client deletion")
@allure.description("Ensure we can delete a client.")
@pytest.mark.asyncio
async def test_delete_client(self, admin_server: AdminServer) -> None:
"""Test delete_client."""
@ -146,9 +161,12 @@ class TestAdminApiClients(BaseAdminTests):
assert len(data) == 0
@allure.title("Test secret management")
class TestAdminApiSecrets(BaseAdminTests):
"""Test secret management."""
@allure.title("Test adding a secret")
@allure.description("Ensure that we can add a secret to a client.")
@pytest.mark.asyncio
async def test_add_secret(self, admin_server: AdminServer) -> None:
"""Test add_secret."""
@ -162,6 +180,8 @@ class TestAdminApiSecrets(BaseAdminTests):
resp = await http_client.post("api/v1/secrets/", json=data)
assert resp.status_code == 200
@allure.title("Test read a secret")
@allure.description("Ensure that we can retrieve a secret we have stored.")
@pytest.mark.asyncio
async def test_get_secret(self, admin_server: AdminServer) -> None:
"""Test get_secret."""
@ -175,6 +195,8 @@ class TestAdminApiSecrets(BaseAdminTests):
assert data["secret"] == "secretstring"
assert "testclient" in data["clients"]
@allure.title("Test adding a secret with automatic value")
@allure.description("Test that we can add a secret where we let the system come up with the value of a given length.")
@pytest.mark.asyncio
async def test_add_secret_auto(self, admin_server: AdminServer) -> None:
"""Test adding a secret with an auto-generated value."""
@ -195,6 +217,8 @@ class TestAdminApiSecrets(BaseAdminTests):
assert len(data["secret"]) == 17
assert "testclient" in data["clients"]
@allure.title("Test updating a secret")
@allure.description("Test that we can update the value of a stored secret.")
@pytest.mark.asyncio
async def test_update_secret(self, admin_server: AdminServer) -> None:
"""Test updating secrets."""

View File

@ -69,8 +69,8 @@ class TestSshd:
assert found is True
session.stdin.write(test_client.public_key + "\n")
result = await session.stdout.readline()
assert "OK" in result
result = await session.stdout.read()
assert "Key is valid. Registering client." in result
await session.wait()
return test_client

View File

@ -2,25 +2,13 @@
import asyncssh
from typing import Any, AsyncContextManager, Protocol
from dataclasses import dataclass
from collections.abc import Callable, Awaitable
from .clients import ClientData
PortFactory = Callable[[], int]
AdminServer = tuple[str, tuple[str, str]]
@dataclass
class TestPorts:
"""Test port dataclass."""
backend: int
admin: int
sshd: int
CommandRunner = Callable[[ClientData, str], Awaitable[asyncssh.SSHCompletedProcess]]
class ProcessRunner(Protocol):

View File

@ -82,10 +82,13 @@ async def mock_backend(client_registry: ClientRegistry) -> MagicMock:
"Error, must have a client called template for this to work."
)
clients_data[name] = clients_data["template"]
template_secrets: dict[str, str] = {}
for secret_key, secret in secrets_data.items():
s_client, secret_name = secret_key
if s_client != "template":
continue
template_secrets[secret_name] = secret
for secret_name, secret in template_secrets.items():
secrets_data[(name, secret_name)] = secret
async def write_audit(*args, **kwargs):

View File

@ -83,8 +83,9 @@ class TestRegistrationErrors(BaseSshTests):
output = await process.stdout.readline()
assert "Enter public key" in output
stdout, stderr = await process.communicate(public_key)
assert isinstance(stderr, str)
print(f"{stdout=!r}, {stderr=!r}")
assert stderr == "Error: Invalid key type: Only RSA keys are supported."
assert stderr.rstrip() == "Error: Invalid key type: Only RSA keys are supported."
result = await process.wait()
assert result.exit_status == 1
@ -102,8 +103,9 @@ class TestRegistrationErrors(BaseSshTests):
output = await process.stdout.readline()
assert "Enter public key" in output
stdout, stderr = await process.communicate(public_key)
assert isinstance(stderr, str)
print(f"{stdout=!r}, {stderr=!r}")
assert stderr == "Error: Invalid key type: Only RSA keys are supported."
assert stderr.rstrip() == "Error: Invalid key type: Only RSA keys are supported."
result = await process.wait()
assert result.exit_status == 1
@ -122,7 +124,8 @@ class TestCommandErrors(BaseSshTests):
assert result.exit_status == 1
stderr = result.stderr or ""
assert stderr == "Error: Unsupported command."
assert isinstance(stderr, str)
assert stderr.rstrip() == "Error: Unsupported command."
@pytest.mark.asyncio
async def test_no_command(
@ -136,7 +139,8 @@ class TestCommandErrors(BaseSshTests):
async with conn.create_process() as process:
stdout, stderr = await process.communicate()
print(f"{stdout=!r}, {stderr=!r}")
assert stderr == "Error: No command was received from the client."
assert isinstance(stderr, str)
assert stderr.rstrip() == "Error: No command was received from the client."
result = await process.wait()
assert result.exit_status == 1

View File

@ -1,10 +1,12 @@
"""Test get secret."""
import allure
import pytest
from .types import ClientRegistry, CommandRunner
@allure.title("Test get_secret command")
@pytest.mark.asyncio
async def test_get_secret(
ssh_command_runner: CommandRunner, client_registry: ClientRegistry
@ -19,7 +21,7 @@ async def test_get_secret(
assert isinstance(result.stdout, str)
assert result.stdout.rstrip() == "mocked-secret-mysecret"
@allure.title("Test with invalid secret name")
@pytest.mark.asyncio
async def test_invalid_secret_name(
ssh_command_runner: CommandRunner, client_registry: ClientRegistry
@ -30,4 +32,25 @@ async def test_invalid_secret_name(
result = await ssh_command_runner("test-client", "get_secret mysecret")
assert result.exit_status == 1
assert result.stderr == "Error: No secret available with the given name."
stderr = result.stderr
assert isinstance(stderr, str)
assert stderr.rstrip() == "Error: No secret available with the given name."
@allure.title("Test get_secret command help")
@pytest.mark.asyncio
async def test_get_secret_cmd_help(ssh_command_runner: CommandRunner, client_registry: ClientRegistry) -> None:
"""Test running get_secret --help"""
await client_registry["add_client"]("test-client", ["mysecret"])
result = await ssh_command_runner("test-client", "get_secret --help")
assert result.exit_status == 0
print(result.stdout)
assert isinstance(result.stdout, str)
lines = result.stdout.splitlines()
assert lines[0] == "get_secret SECRET"
assert len(lines) > 4

View File

@ -1,8 +1,11 @@
"""Test for the ping command."""
import allure
import pytest
from .types import ClientRegistry, CommandRunner
@allure.title("Test running the ping command")
@pytest.mark.asyncio
async def test_ping_command(
ssh_command_runner: CommandRunner, client_registry: ClientRegistry
@ -16,3 +19,21 @@ async def test_ping_command(
assert result.stdout is not None
assert isinstance(result.stdout, str)
assert result.stdout.rstrip() == "PONG"
@allure.title("Test ping help")
@pytest.mark.asyncio
async def test_ping_cmd_help(ssh_command_runner: CommandRunner, client_registry: ClientRegistry) -> None:
"""Test running ping --help."""
await client_registry["add_client"]("test-client", ["mysecret"])
result = await ssh_command_runner("test-client", "ping --help")
assert result.exit_status == 0
print(result.stdout)
assert isinstance(result.stdout, str)
lines = result.stdout.splitlines()
assert lines[0] == "ping"
assert len(lines) > 4

View File

@ -1,10 +1,12 @@
"""Test registration."""
import allure
import pytest
from .types import ClientRegistry, CommandRunner, ProcessRunner
@allure.title("Test client registration")
@pytest.mark.enable_registration(True)
@pytest.mark.asyncio
async def test_register_client(
@ -29,8 +31,9 @@ async def test_register_client(
assert found is True
session.stdin.write(public_key)
result = await session.stdout.readline()
assert "OK" in result
data = await session.stdout.read()
assert isinstance(data, str)
assert "Key is valid. Registering client" in data
# Test that we can connect
@ -39,3 +42,28 @@ async def test_register_client(
assert result.stdout is not None
assert isinstance(result.stdout, str)
assert result.stdout.rstrip() == "mocked-secret-testsecret"
@allure.title("Test register command help")
@pytest.mark.enable_registration(True)
@pytest.mark.asyncio
async def test_register_cmd_help(ssh_command_runner: CommandRunner, client_registry: ClientRegistry) -> None:
"""Test running register --help"""
await client_registry["add_client"]("test-client", ["mysecret"])
result = await ssh_command_runner("test-client", "register --help")
assert result.exit_status == 0
print(result.stdout)
assert isinstance(result.stdout, str)
lines = result.stdout.splitlines()
assert lines[0] == "register"
assert len(lines) > 4
# TODO: Test running register with an existing client.

16
tests/types.py Normal file
View File

@ -0,0 +1,16 @@
"""Typings."""
from dataclasses import dataclass
from collections.abc import Callable
PortFactory = Callable[[], int]
@dataclass
class TestPorts:
"""Test port dataclass."""
backend: int
admin: int
sshd: int

275
uv.lock generated
View File

@ -23,6 +23,32 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/41/18/d89a443ed1ab9bcda16264716f809c663866d4ca8de218aa78fd50b38ead/alembic-1.15.2-py3-none-any.whl", hash = "sha256:2e76bd916d547f6900ec4bb5a90aeac1485d2c92536923d0b138c02b126edc53", size = 231911 },
]
[[package]]
name = "allure-pytest"
version = "2.14.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "allure-python-commons" },
{ name = "pytest" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d6/f4/59d3d3ca7cbcdb5efae990072f6b4aafebff524237fa277c14daac8b84f8/allure_pytest-2.14.2.tar.gz", hash = "sha256:d387492178d27805863d95350bdc38b7feca3ed7165841997630fd4073cc9101", size = 17153 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f3/bb/d6b28ee5cced087a171715945e578bd6f2c0a32e6bbbd3578fe701f7ae4c/allure_pytest-2.14.2-py3-none-any.whl", hash = "sha256:18f3baa9ebd1b6148223cfa898bacfc2794bb9446221adac1be71deeb26ed79a", size = 11673 },
]
[[package]]
name = "allure-python-commons"
version = "2.14.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "attrs" },
{ name = "pluggy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d0/79/53fe62ff56fa1f972b9fcbc46d04687786907cecc40b412421c4554d3734/allure_python_commons-2.14.2.tar.gz", hash = "sha256:7acdc4fe3efbe709604895e2393f082b2659d8e5653e77ff6367682e6e4a41bc", size = 15186 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d5/b7/41b4323f65911b216a616087c37a8fd337d2cd92f7b8fa89bc35cb91cd1d/allure_python_commons-2.14.2-py3-none-any.whl", hash = "sha256:ad50385a4c601ec31c86eed773d8ccfdcc687fecbb6535c9768af3bf03b50a19", size = 16191 },
]
[[package]]
name = "annotated-types"
version = "0.7.0"
@ -91,6 +117,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/99/56/db25216aa7f385ec71fdc489af80812171515cddbe68c0e515e98a291390/asyncssh-2.21.0-py3-none-any.whl", hash = "sha256:cf7f3dfa52b2cb4ad31f0d77ff0d0a8fdd850203da84a0e72e62c36fdd4daf4b", size = 374919 },
]
[[package]]
name = "attrs"
version = "25.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5a/b0/1367933a8532ee6ff8d63537de4f1177af4bff9f3e829baf7331f595bb24/attrs-25.3.0.tar.gz", hash = "sha256:75d7cefc7fb576747b2c81b4442d4d4a1ce0900973527c011d1030fd3bf4af1b", size = 812032 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/77/06/bb80f5f86020c4551da315d78b3ab75e8228f89f0162f2c3a819e407941a/attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3", size = 63815 },
]
[[package]]
name = "bcrypt"
version = "4.3.0"
@ -172,6 +207,28 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009 },
]
[[package]]
name = "charset-normalizer"
version = "3.4.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e4/33/89c2ced2b67d1c2a61c19c6751aa8902d46ce3dacb23600a283619f5a12d/charset_normalizer-3.4.2.tar.gz", hash = "sha256:5baececa9ecba31eff645232d59845c07aa030f0c81ee70184a90d35099a0e63", size = 126367 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ea/12/a93df3366ed32db1d907d7593a94f1fe6293903e3e92967bebd6950ed12c/charset_normalizer-3.4.2-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:926ca93accd5d36ccdabd803392ddc3e03e6d4cd1cf17deff3b989ab8e9dbcf0", size = 199622 },
{ url = "https://files.pythonhosted.org/packages/04/93/bf204e6f344c39d9937d3c13c8cd5bbfc266472e51fc8c07cb7f64fcd2de/charset_normalizer-3.4.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eba9904b0f38a143592d9fc0e19e2df0fa2e41c3c3745554761c5f6447eedabf", size = 143435 },
{ url = "https://files.pythonhosted.org/packages/22/2a/ea8a2095b0bafa6c5b5a55ffdc2f924455233ee7b91c69b7edfcc9e02284/charset_normalizer-3.4.2-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3fddb7e2c84ac87ac3a947cb4e66d143ca5863ef48e4a5ecb83bd48619e4634e", size = 153653 },
{ url = "https://files.pythonhosted.org/packages/b6/57/1b090ff183d13cef485dfbe272e2fe57622a76694061353c59da52c9a659/charset_normalizer-3.4.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:98f862da73774290f251b9df8d11161b6cf25b599a66baf087c1ffe340e9bfd1", size = 146231 },
{ url = "https://files.pythonhosted.org/packages/e2/28/ffc026b26f441fc67bd21ab7f03b313ab3fe46714a14b516f931abe1a2d8/charset_normalizer-3.4.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c9379d65defcab82d07b2a9dfbfc2e95bc8fe0ebb1b176a3190230a3ef0e07c", size = 148243 },
{ url = "https://files.pythonhosted.org/packages/c0/0f/9abe9bd191629c33e69e47c6ef45ef99773320e9ad8e9cb08b8ab4a8d4cb/charset_normalizer-3.4.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e635b87f01ebc977342e2697d05b56632f5f879a4f15955dfe8cef2448b51691", size = 150442 },
{ url = "https://files.pythonhosted.org/packages/67/7c/a123bbcedca91d5916c056407f89a7f5e8fdfce12ba825d7d6b9954a1a3c/charset_normalizer-3.4.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:1c95a1e2902a8b722868587c0e1184ad5c55631de5afc0eb96bc4b0d738092c0", size = 145147 },
{ url = "https://files.pythonhosted.org/packages/ec/fe/1ac556fa4899d967b83e9893788e86b6af4d83e4726511eaaad035e36595/charset_normalizer-3.4.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:ef8de666d6179b009dce7bcb2ad4c4a779f113f12caf8dc77f0162c29d20490b", size = 153057 },
{ url = "https://files.pythonhosted.org/packages/2b/ff/acfc0b0a70b19e3e54febdd5301a98b72fa07635e56f24f60502e954c461/charset_normalizer-3.4.2-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:32fc0341d72e0f73f80acb0a2c94216bd704f4f0bce10aedea38f30502b271ff", size = 156454 },
{ url = "https://files.pythonhosted.org/packages/92/08/95b458ce9c740d0645feb0e96cea1f5ec946ea9c580a94adfe0b617f3573/charset_normalizer-3.4.2-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:289200a18fa698949d2b39c671c2cc7a24d44096784e76614899a7ccf2574b7b", size = 154174 },
{ url = "https://files.pythonhosted.org/packages/78/be/8392efc43487ac051eee6c36d5fbd63032d78f7728cb37aebcc98191f1ff/charset_normalizer-3.4.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:4a476b06fbcf359ad25d34a057b7219281286ae2477cc5ff5e3f70a246971148", size = 149166 },
{ url = "https://files.pythonhosted.org/packages/44/96/392abd49b094d30b91d9fbda6a69519e95802250b777841cf3bda8fe136c/charset_normalizer-3.4.2-cp313-cp313-win32.whl", hash = "sha256:aaeeb6a479c7667fbe1099af9617c83aaca22182d6cf8c53966491a0f1b7ffb7", size = 98064 },
{ url = "https://files.pythonhosted.org/packages/e9/b0/0200da600134e001d91851ddc797809e2fe0ea72de90e09bec5a2fbdaccb/charset_normalizer-3.4.2-cp313-cp313-win_amd64.whl", hash = "sha256:aa6af9e7d59f9c12b33ae4e9450619cf2488e2bbe9b44030905877f0b2324980", size = 105641 },
{ url = "https://files.pythonhosted.org/packages/20/94/c5790835a017658cbfabd07f3bfb549140c3ac458cfc196323996b10095a/charset_normalizer-3.4.2-py3-none-any.whl", hash = "sha256:7f56930ab0abd1c45cd15be65cc741c28b1c9a34876ce8c17a2fa107810c0af0", size = 52626 },
]
[[package]]
name = "click"
version = "8.2.0"
@ -574,6 +631,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/79/7b/2c79738432f5c924bef5071f933bcc9efd0473bac3b4aa584a6f7c1c8df8/mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505", size = 4963 },
]
[[package]]
name = "outcome"
version = "1.3.0.post0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "attrs" },
]
sdist = { url = "https://files.pythonhosted.org/packages/98/df/77698abfac98571e65ffeb0c1fba8ffd692ab8458d617a0eed7d9a8d38f2/outcome-1.3.0.post0.tar.gz", hash = "sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8", size = 21060 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/55/8b/5ab7257531a5d830fc8000c476e63c935488d74609b50f9384a643ec0a62/outcome-1.3.0.post0-py2.py3-none-any.whl", hash = "sha256:e771c5ce06d1415e356078d3bdd68523f284b4ce5419828922b6871e65eda82b", size = 10692 },
]
[[package]]
name = "packaging"
version = "25.0"
@ -755,6 +824,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c3/c0/c33c8792c3e50193ef55adb95c1c3c2786fe281123291c2dbf0eaab95a6f/pyotp-2.9.0-py3-none-any.whl", hash = "sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612", size = 13376 },
]
[[package]]
name = "pysocks"
version = "1.7.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/bd/11/293dd436aea955d45fc4e8a35b6ae7270f5b8e00b53cf6c024c83b657a11/PySocks-1.7.1.tar.gz", hash = "sha256:3f8804571ebe159c380ac6de37643bb4685970655d3bba243530d6558b799aa0", size = 284429 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8d/59/b4572118e098ac8e46e399a1dd0f2d85403ce8bbaad9ec79373ed6badaf9/PySocks-1.7.1-py3-none-any.whl", hash = "sha256:2725bd0a9925919b9b51739eea5f9e2bae91e83288108a9ad338b2e3a4435ee5", size = 16725 },
]
[[package]]
name = "pytailwindcss"
version = "0.2.0"
@ -791,6 +869,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/20/7f/338843f449ace853647ace35870874f69a764d251872ed1b4de9f234822c/pytest_asyncio-0.26.0-py3-none-any.whl", hash = "sha256:7b51ed894f4fbea1340262bdae5135797ebbe21d8638978e35d31c6d19f72fb0", size = 19694 },
]
[[package]]
name = "pytest-base-url"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pytest" },
{ name = "requests" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ae/1a/b64ac368de6b993135cb70ca4e5d958a5c268094a3a2a4cac6f0021b6c4f/pytest_base_url-2.1.0.tar.gz", hash = "sha256:02748589a54f9e63fcbe62301d6b0496da0d10231b753e950c63e03aee745d45", size = 6702 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/1c/b00940ab9eb8ede7897443b771987f2f4a76f06be02f1b3f01eb7567e24a/pytest_base_url-2.1.0-py3-none-any.whl", hash = "sha256:3ad15611778764d451927b2a53240c1a7a591b521ea44cebfe45849d2d2812e6", size = 5302 },
]
[[package]]
name = "pytest-cov"
version = "6.1.1"
@ -804,6 +895,62 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/28/d0/def53b4a790cfb21483016430ed828f64830dd981ebe1089971cd10cab25/pytest_cov-6.1.1-py3-none-any.whl", hash = "sha256:bddf29ed2d0ab6f4df17b4c55b0a657287db8684af9c42ea546b21b1041b3dde", size = 23841 },
]
[[package]]
name = "pytest-html"
version = "4.1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jinja2" },
{ name = "pytest" },
{ name = "pytest-metadata" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bb/ab/4862dcb5a8a514bd87747e06b8d55483c0c9e987e1b66972336946e49b49/pytest_html-4.1.1.tar.gz", hash = "sha256:70a01e8ae5800f4a074b56a4cb1025c8f4f9b038bba5fe31e3c98eb996686f07", size = 150773 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c8/c7/c160021cbecd956cc1a6f79e5fe155f7868b2e5b848f1320dad0b3e3122f/pytest_html-4.1.1-py3-none-any.whl", hash = "sha256:c8152cea03bd4e9bee6d525573b67bbc6622967b72b9628dda0ea3e2a0b5dd71", size = 23491 },
]
[[package]]
name = "pytest-metadata"
version = "3.1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pytest" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a6/85/8c969f8bec4e559f8f2b958a15229a35495f5b4ce499f6b865eac54b878d/pytest_metadata-3.1.1.tar.gz", hash = "sha256:d2a29b0355fbc03f168aa96d41ff88b1a3b44a3b02acbe491801c98a048017c8", size = 9952 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3e/43/7e7b2ec865caa92f67b8f0e9231a798d102724ca4c0e1f414316be1c1ef2/pytest_metadata-3.1.1-py3-none-any.whl", hash = "sha256:c8e0844db684ee1c798cfa38908d20d67d0463ecb6137c72e91f418558dd5f4b", size = 11428 },
]
[[package]]
name = "pytest-selenium"
version = "4.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pytest" },
{ name = "pytest-base-url" },
{ name = "pytest-html" },
{ name = "pytest-variables" },
{ name = "requests" },
{ name = "selenium" },
{ name = "tenacity" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5a/a9/c275839e461fdde9ccaaf7ec46f67ad08dbd28bfebc5f8480f883d9da690/pytest_selenium-4.1.0.tar.gz", hash = "sha256:b0a4e1f27750cde631c513c87ae4863dcf9e180e5a1d680a66077da8a669156c", size = 41059 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/df/27/bac432528c2b20a382ef2423731c1bb8c6292ea5328e3522eccfa9bf0687/pytest_selenium-4.1.0-py3-none-any.whl", hash = "sha256:c6f2c18e91596d3ef360d74c450953767a15193879d3971296498151d1843c01", size = 24131 },
]
[[package]]
name = "pytest-variables"
version = "3.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pytest" },
]
sdist = { url = "https://files.pythonhosted.org/packages/65/52/d756c9704a80119f2e8a84e418290f8bd1b7e1415c3417a1b9c13fcd2a87/pytest_variables-3.1.0.tar.gz", hash = "sha256:4719b07f0f6e5d07829b19284a99d9159543a2e0336311f7bc4ee3b1617f595d", size = 7420 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4b/fe/30dbeccfeafa242b3c9577db059019022cd96db20942c4a74ef9361c5b3c/pytest_variables-3.1.0-py3-none-any.whl", hash = "sha256:4c864d2b7093f9053a2bed61e4b1d027bb26456924e637fcef2d1455d32732b1", size = 6070 },
]
[[package]]
name = "python-dotenv"
version = "1.1.0"
@ -848,6 +995,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/fa/de/02b54f42487e3d3c6efb3f89428677074ca7bf43aae402517bc7cca949f3/PyYAML-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:8388ee1976c416731879ac16da0aff3f63b286ffdd57cdeb95f3f2e085687563", size = 156446 },
]
[[package]]
name = "requests"
version = "2.32.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "charset-normalizer" },
{ name = "idna" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/63/70/2bf7780ad2d390a8d301ad0b550f1581eadbd9a20f896afe06353c2a2913/requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760", size = 131218 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f9/9b/335f9764261e915ed497fcdeb11df5dfd6f7bf257d4a6a2a686d80da4d54/requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6", size = 64928 },
]
[[package]]
name = "rich"
version = "14.0.0"
@ -884,6 +1046,23 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/9a/7498a0a40a32ba522840da2b8c0ecb4794114ec332992fda09a0733c25a0/robotframework-7.2.2-py3-none-any.whl", hash = "sha256:1cb4ec69d52aae515bf6037cee66a2a2d8dc3256368081c0f4b3d4578d40904e", size = 777676 },
]
[[package]]
name = "selenium"
version = "4.32.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "trio" },
{ name = "trio-websocket" },
{ name = "typing-extensions" },
{ name = "urllib3", extra = ["socks"] },
{ name = "websocket-client" },
]
sdist = { url = "https://files.pythonhosted.org/packages/54/2d/fafffe946099033ccf22bf89e12eede14c1d3c5936110c5f6f2b9830722c/selenium-4.32.0.tar.gz", hash = "sha256:b9509bef4056f4083772abb1ae19ff57247d617a29255384b26be6956615b206", size = 870997 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ea/37/d07ed9d13e571b2115d4ed6956d156c66816ceec0b03b2e463e80d09f572/selenium-4.32.0-py3-none-any.whl", hash = "sha256:c4d9613f8a45693d61530c9660560fadb52db7d730237bc788ddedf442391f97", size = 9369668 },
]
[[package]]
name = "shellingham"
version = "1.5.4"
@ -902,6 +1081,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235 },
]
[[package]]
name = "sortedcontainers"
version = "2.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e8/c4/ba2f8066cceb6f23394729afe52f3bf7adec04bf9ed2c820b39e19299111/sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88", size = 30594 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/32/46/9cb0e58b2deb7f82b84065f37f3bffeb12413f947f9388e4cac22c4621ce/sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0", size = 29575 },
]
[[package]]
name = "sqlalchemy"
version = "2.0.40"
@ -952,6 +1140,7 @@ dependencies = [
{ name = "pykeepass" },
{ name = "pytest-asyncio" },
{ name = "pytest-cov" },
{ name = "pytest-selenium" },
{ name = "python-dotenv" },
{ name = "python-json-logger" },
]
@ -964,11 +1153,15 @@ dev = [
{ name = "python-dotenv" },
]
test = [
{ name = "allure-pytest" },
{ name = "coverage" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
{ name = "pytest-cov" },
{ name = "pytest-selenium" },
{ name = "requests" },
{ name = "robotframework" },
{ name = "selenium" },
]
[package.metadata]
@ -984,6 +1177,7 @@ requires-dist = [
{ name = "pykeepass", specifier = ">=4.1.1.post1" },
{ name = "pytest-asyncio", specifier = ">=0.26.0" },
{ name = "pytest-cov", specifier = ">=6.1.1" },
{ name = "pytest-selenium", specifier = ">=4.1.0" },
{ name = "python-dotenv", specifier = ">=1.0.1" },
{ name = "python-json-logger", specifier = ">=3.3.0" },
]
@ -996,11 +1190,15 @@ dev = [
{ name = "python-dotenv", specifier = ">=1.0.1" },
]
test = [
{ name = "allure-pytest", specifier = ">=2.14.2" },
{ name = "coverage", specifier = ">=7.8.0" },
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "pytest-asyncio", specifier = ">=0.26.0" },
{ name = "pytest-cov", specifier = ">=6.1.1" },
{ name = "pytest-selenium", specifier = ">=4.1.0" },
{ name = "requests", specifier = ">=2.32.3" },
{ name = "robotframework", specifier = ">=7.2.2" },
{ name = "selenium", specifier = ">=4.32.0" },
]
[[package]]
@ -1085,6 +1283,7 @@ dependencies = [
{ name = "httpx" },
{ name = "pydantic" },
{ name = "python-dotenv" },
{ name = "rich" },
{ name = "sshecret" },
]
@ -1095,6 +1294,7 @@ requires-dist = [
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "pydantic", specifier = ">=2.10.6" },
{ name = "python-dotenv", specifier = ">=1.0.1" },
{ name = "rich", specifier = ">=14.0.0" },
{ name = "sshecret", editable = "." },
]
@ -1110,6 +1310,46 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8b/0c/9d30a4ebeb6db2b25a841afbb80f6ef9a854fc3b41be131d249a977b4959/starlette-0.46.2-py3-none-any.whl", hash = "sha256:595633ce89f8ffa71a015caed34a5b2dc1c0cdb3f0f1fbd1e69339cf2abeec35", size = 72037 },
]
[[package]]
name = "tenacity"
version = "9.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/0a/d4/2b0cd0fe285e14b36db076e78c93766ff1d529d70408bd1d2a5a84f1d929/tenacity-9.1.2.tar.gz", hash = "sha256:1169d376c297e7de388d18b4481760d478b0e99a777cad3a9c86e556f4b697cb", size = 48036 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/30/643397144bfbfec6f6ef821f36f33e57d35946c44a2352d3c9f0ae847619/tenacity-9.1.2-py3-none-any.whl", hash = "sha256:f77bf36710d8b73a50b2dd155c97b870017ad21afe6ab300326b0371b3b05138", size = 28248 },
]
[[package]]
name = "trio"
version = "0.30.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "attrs" },
{ name = "cffi", marker = "implementation_name != 'pypy' and os_name == 'nt'" },
{ name = "idna" },
{ name = "outcome" },
{ name = "sniffio" },
{ name = "sortedcontainers" },
]
sdist = { url = "https://files.pythonhosted.org/packages/01/c1/68d582b4d3a1c1f8118e18042464bb12a7c1b75d64d75111b297687041e3/trio-0.30.0.tar.gz", hash = "sha256:0781c857c0c81f8f51e0089929a26b5bb63d57f927728a5586f7e36171f064df", size = 593776 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/69/8e/3f6dfda475ecd940e786defe6df6c500734e686c9cd0a0f8ef6821e9b2f2/trio-0.30.0-py3-none-any.whl", hash = "sha256:3bf4f06b8decf8d3cf00af85f40a89824669e2d033bb32469d34840edcfc22a5", size = 499194 },
]
[[package]]
name = "trio-websocket"
version = "0.12.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "outcome" },
{ name = "trio" },
{ name = "wsproto" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/3c/8b4358e81f2f2cfe71b66a267f023a91db20a817b9425dd964873796980a/trio_websocket-0.12.2.tar.gz", hash = "sha256:22c72c436f3d1e264d0910a3951934798dcc5b00ae56fc4ee079d46c7cf20fae", size = 33549 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/19/eb640a397bba49ba49ef9dbe2e7e5c04202ba045b6ce2ec36e9cadc51e04/trio_websocket-0.12.2-py3-none-any.whl", hash = "sha256:df605665f1db533f4a386c94525870851096a223adcb97f72a07e8b4beba45b6", size = 21221 },
]
[[package]]
name = "typer"
version = "0.15.3"
@ -1167,6 +1407,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/31/08/aa4fdfb71f7de5176385bd9e90852eaf6b5d622735020ad600f2bab54385/typing_inspection-0.4.0-py3-none-any.whl", hash = "sha256:50e72559fcd2a6367a19f7a7e610e6afcb9fac940c650290eed893d61386832f", size = 14125 },
]
[[package]]
name = "urllib3"
version = "2.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8a/78/16493d9c386d8e60e442a35feac5e00f0913c0f4b7c217c11e8ec2ff53e0/urllib3-2.4.0.tar.gz", hash = "sha256:414bc6535b787febd7567804cc015fee39daab8ad86268f1310a9250697de466", size = 390672 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6b/11/cc635220681e93a0183390e26485430ca2c7b5f9d33b15c74c2861cb8091/urllib3-2.4.0-py3-none-any.whl", hash = "sha256:4e16665048960a0900c702d4a66415956a584919c03361cac9f1df5c5dd7e813", size = 128680 },
]
[package.optional-dependencies]
socks = [
{ name = "pysocks" },
]
[[package]]
name = "uvicorn"
version = "0.34.2"
@ -1228,6 +1482,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a8/b4/c57b99518fadf431f3ef47a610839e46e5f8abf9814f969859d1c65c02c7/watchfiles-1.0.5-cp313-cp313-win_amd64.whl", hash = "sha256:f436601594f15bf406518af922a89dcaab416568edb6f65c4e5bbbad1ea45c11", size = 291087 },
]
[[package]]
name = "websocket-client"
version = "1.8.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e6/30/fba0d96b4b5fbf5948ed3f4681f7da2f9f64512e1d303f94b4cc174c24a5/websocket_client-1.8.0.tar.gz", hash = "sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da", size = 54648 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5a/84/44687a29792a70e111c5c477230a72c4b957d88d16141199bf9acb7537a3/websocket_client-1.8.0-py3-none-any.whl", hash = "sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526", size = 58826 },
]
[[package]]
name = "websockets"
version = "15.0.1"
@ -1247,3 +1510,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1b/6c/c65773d6cab416a64d191d6ee8a8b1c68a09970ea6909d16965d26bfed1e/websockets-15.0.1-cp313-cp313-win_amd64.whl", hash = "sha256:e09473f095a819042ecb2ab9465aee615bd9c2028e4ef7d933600a8401c79561", size = 176837 },
{ url = "https://files.pythonhosted.org/packages/fa/a8/5b41e0da817d64113292ab1f8247140aac61cbf6cfd085d6a0fa77f4984f/websockets-15.0.1-py3-none-any.whl", hash = "sha256:f7a866fbc1e97b5c617ee4116daaa09b722101d4a3c170c787450ba409f9736f", size = 169743 },
]
[[package]]
name = "wsproto"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c9/4a/44d3c295350d776427904d73c189e10aeae66d7f555bb2feee16d1e4ba5a/wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065", size = 53425 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/78/58/e860788190eba3bcce367f74d29c4675466ce8dddfba85f7827588416f01/wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736", size = 24226 },
]