Compare commits

...

10 Commits

Author SHA1 Message Date
0eaa913e35 Implement podman-compatible commands
All checks were successful
Build and push image / build-containers (push) Successful in 8m46s
2025-06-10 10:28:17 +02:00
782ec19137 Support unmanaged secrets 2025-06-09 18:04:58 +02:00
43d00cecb4 Preserve history when navigating the secrets page 2025-06-09 15:44:21 +02:00
d1fa6c0076 Implement secret deletion function 2025-06-09 14:15:32 +02:00
71d877022b Implement same ID type as backend API 2025-06-09 14:15:22 +02:00
36d04b8a33 Fix correct secrets API 2025-06-09 14:14:28 +02:00
a834339c13 Fix style 2025-06-09 09:16:12 +02:00
fb6b76f7d8 Fix group object access 2025-06-09 09:16:03 +02:00
fed441743e Keep distinction between Secret and DetailedSecret 2025-06-09 09:15:41 +02:00
d86d9a9256 Adapt admin api to use new key format
Filter out deleted an previous version in count

Remove todo comment

Allow explicit ID specification

Update tests
2025-06-09 08:57:59 +02:00
23 changed files with 832 additions and 293 deletions

View File

@ -5,7 +5,6 @@ import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
from sshecret.backend.models import Secret
from sshecret_admin.core.dependencies import AdminDependencies
from sshecret_admin.services import AdminBackend
from sshecret_admin.services.models import (
@ -13,6 +12,7 @@ from sshecret_admin.services.models import (
ClientSecretGroupList,
SecretCreate,
SecretGroupCreate,
SecretListView,
SecretUpdate,
SecretView,
)
@ -27,7 +27,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
@app.get("/secrets/")
async def get_secret_names(
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
) -> list[Secret]:
) -> list[SecretListView]:
"""Get Secret Names."""
return await admin.get_secrets()
@ -94,7 +94,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
)
return results[0]
return results.groups[0]
@app.post("/secrets/groups/")
async def add_secret_group(
@ -125,7 +125,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
Entries within the group will be moved to the root.
This also includes nested entries further down from the group.
"""
group = await admin.get_secret_group(group.name)
group = await admin.get_secret_group(group_name)
if not group:
return
await admin.delete_secret_group(group_name, keep_entries=True)
@ -196,7 +196,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
)
group = groups[0]
group = groups.groups[0]
matching_entries = [
entry for entry in group.entries if entry.name == secret_name
]

View File

@ -4,6 +4,12 @@
class="tree-entry-item"
data-type="entry"
data-name="{{ entry.name }}"
data-group-path="/"
{% if secret | default(false) %}
{% if secret.name == entry.name %}
selected=""
{% endif %}
{% endif %}
>
<sl-icon name="shield"> </sl-icon>
<span class="px-2">{{ entry.name }}</span>
@ -16,6 +22,13 @@
class="secret-group-list-item"
data-type="group"
data-name="{{ group.group_name }}"
data-group-path="{{ group.path }}"
{% if group_path_nodes | default(false) %}
{% if group.group_name in group_path_nodes %}
expanded=""
{% endif %}
{% endif %}
>
<sl-icon name="folder"> </sl-icon>
<span class="px-2">{{ group.group_name }}</span>
@ -57,8 +70,8 @@
<div class="p-4 bg-white border border-gray-200 rounded-lg shadow-sm sm:flex dark:border-gray-700 sm:p-6 dark:bg-gray-800" id="secret-tree">
<div class="flex flex-col">
<div class="h-full">
<div class="flex flex-1 flex-col">
<div class="h-full w-full">
<sl-tree class="tree-with-icons">
<sl-tree-item
id="secret-group-root-item"
@ -81,7 +94,22 @@
</div>
<div class="2xl:col-span-2 xl:col-span-2 p-4 mb-4 bg-white border border-gray-200 rounded-lg shadow-sm 2xl:col-span-2 dark:border-gray-700 sm:p-6 dark:bg-gray-800">
{% if group_page | default(false) %}
<div class="w-full" id="secretdetails">
{% include '/secrets/partials/group_detail.html.j2' %}
</div>
{% elif root_group_page | default(false) %}
<div class="w-full" id="secretdetails">
{% include '/secrets/partials/edit_root.html.j2' %}
</div>
{% elif secret_page | default(false) %}
<div class="w-full" id="secretdetails">
{% include '/secrets/partials/tree_detail.html.j2' %}
</div>
{% else %}
{% include '/secrets/partials/default_detail.html.j2' %}
{% endif %}
</div>
</div>
</div>
@ -99,17 +127,19 @@
const type = selectedEl.dataset.type;
const name = selectedEl.dataset.name;
console.log(`Event on ${type} ${name}`);
const groupPath = selectedEl.dataset.groupPath;
console.log(`Event on ${type} ${name} path: ${groupPath}`);
if (!type || !name) return;
let url = '';
if (type === 'entry') {
url = `/secrets/partial/secret/${encodeURIComponent(name)}`;
url = `/secrets/secret/${encodeURIComponent(name)}`;
} else if (type === 'group') {
url = `/secrets/partial/group/${encodeURIComponent(name)}`;
//url = `/secrets/partial/group/${encodeURIComponent(name)}`;
url = `/secrets/group/${encodeURIComponent(groupPath)}`;
} else if (type == 'root') {
url = `/secrets/partial/root_group`;
url = `/secrets/group/`;
}
if (url) {

View File

@ -3,6 +3,8 @@
{% include '/secrets/partials/client_list_inner.html.j2' %}
</ul>
</div>
{% if secret.secret %}
<div class="w-full my-2" id="secretclientaction">
{% include '/secrets/partials/client_assign_button.html.j2' %}
{% endif %}
</div>

View File

@ -1,7 +1,9 @@
<div class="w-full">
<div class="mb-4">
<h3 class="text-xl font-semibold dark:text-white">Group {{name}}</h3>
{% if description %}
<span class="text-sm text-gray-500 dark:text-gray-400">{{ description }}</span>
{% endif %}
</div>
<sl-details summary="Create secret">

View File

@ -1,5 +1,42 @@
<div class="w-full" id="secretdetails">
<!-- menu -->
<div class="flex justify-end px-4">
<button id="secret-menu-button" data-dropdown-toggle="secret-edit-menu" class="inline-block text-gray-500 dark:text-gray-400 hover:bg-gray-100 dark:hover:bg-gray-700 focus:ring-4 focus:outline-none focus:ring-gray-200 dark:focus:ring-gray-700 rounded-lg text-sm p-1.5" type="button">
<span class="sr-only">Open dropdown</span>
<svg class="w-5 h-5" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="currentColor" viewBox="0 0 16 3">
<path d="M2 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3Zm6.041 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM14 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3Z"/>
</svg>
</button>
<!-- Dropdown menu -->
<div id="secret-edit-menu" class="z-10 hidden text-base list-none bg-white divide-y divide-gray-100 rounded-lg shadow-sm w-44 dark:bg-gray-700">
<ul class="py-2" aria-labelledby="secret-menu-button">
<li>
<a
href="#"
class="block px-4 py-2 text-sm text-red-600 hover:bg-gray-100 dark:hover:bg-gray-600 dark:text-gray-200 dark:hover:text-white"
hx-delete="/secrets/{{secret.name}}"
hx-target="#secretdetails"
hx-swap="OuterHTML"
hx-indicator=".secret-spinner"
hx-confirm="Really delete this secret?"
>
Delete
</a>
</li>
</ul>
</div>
</div>
<h3 class="mb-4 text-xl font-semibold dark:text-white">{{secret.name}}</h3>
{% if secret.description %}
<span class="text-sm text-gray-500 dark:text-gray-400">{{ secret.description }}</span>
{% endif %}
{% if not secret.secret %}
<p class="text-sm text-gray-500 dark:text-gray-400 italic">This secret was created outside of sshecret-admin. It cannot be decrypted, and therefore fewer options are available here.</p>
{% endif %}
<div class="htmx-indicator secret-spinner">
<div role="status">
<svg aria-hidden="true" class="inline w-6 h-6 text-gray-200 animate-spin dark:text-gray-600 fill-blue-600" viewBox="0 0 100 101" fill="none" xmlns="http://www.w3.org/2000/svg">
@ -15,6 +52,7 @@
{% include '/secrets/partials/client_secret_details.html.j2' %}
</div>
</sl-details>
{% if secret.secret %}
<sl-details summary="Read/Update Secret">
<div id="secretvalue">
<div class="mb-6">
@ -72,6 +110,7 @@
</form>
</sl-details>
{% endif %}
{% endif %}
<sl-details summary="Events">
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600" id="last-audit-events">
<thead class="bg-gray-50 dark:bg-gray-700">

View File

@ -88,7 +88,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
client: Annotated[ClientUpdate, Form()],
):
"""Update a client."""
original_client = await admin.get_client(id)
original_client = await admin.get_client(("id", id))
if not original_client:
return templates.TemplateResponse(
request, "fragments/error.html", {"message": "Client not found"}
@ -131,7 +131,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
) -> Response:
"""Delete a client."""
await admin.delete_client(id)
await admin.delete_client(("id", id))
clients = await admin.get_clients()
headers = {"Hx-Refresh": "true"}
return templates.TemplateResponse(

View File

@ -64,7 +64,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
groups = await admin.get_secret_groups()
LOG.info("Groups: %r", groups)
LOG.info("Groups: %s", groups.model_dump_json(indent=2))
return templates.TemplateResponse(
request,
"secrets/index.html.j2",
@ -74,28 +74,121 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
},
)
@app.get("/secrets/partial/root_group")
async def get_root_group(
# @app.get("/secrets/partial/root_group")
# async def get_root_group(
# request: Request,
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
# ):
# """Get root group."""
# clients = await admin.get_clients()
# return templates.TemplateResponse(
# request,
# "secrets/partials/edit_root.html.j2",
# {
# "group_path_nodes": [],
# "clients": clients,
# },
# )
# @app.get("/secrets/partial/secret/{name}")
# async def get_secret_tree_detail_partial(
# request: Request,
# name: str,
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
# ):
# """Get partial secret detail."""
# secret = await admin.get_secret(name)
# groups = await admin.get_secret_groups(flat=True)
# events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
# if not secret:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
# )
# return templates.TemplateResponse(
# request,
# "secrets/partials/tree_detail.html.j2",
# {
# "secret": secret,
# "groups": groups,
# "events": events,
# },
# )
@app.get("/secrets/group/")
async def show_root_group(
request: Request,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
"""Get root group."""
"""Show the root path."""
clients = await admin.get_clients()
return templates.TemplateResponse(
request,
"secrets/partials/edit_root.html.j2",
{
context: dict[str, Any] = {
"clients": clients,
},
"root_group_page": True,
}
headers: dict[str, str] = {}
if request.headers.get("HX-Request"):
# This is a HTMX request.
template_name = "secrets/partials/edit_root.html.j2"
headers["HX-Push-Url"] = request.url.path
else:
groups = await admin.get_secret_groups()
template_name = "secrets/index.html.j2"
context["user"] = current_user
context["groups"] = groups
context["group_path_nodes"] = ["/"]
return templates.TemplateResponse(
request, template_name, context, headers=headers
)
@app.get("/secrets/partial/secret/{name}")
@app.get("/secrets/group/{group_path:path}")
async def show_group(
request: Request,
group_path: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
"""Show a group."""
group = await admin.get_secret_group_by_path(group_path)
if not group:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
clients = await admin.get_clients()
headers: dict[str, str] = {}
context: dict[str, Any] = {
"group_page": True,
"name": group.group_name,
"description": group.description,
"clients": clients,
}
if request.headers.get("HX-Request"):
# This is a HTMX request.
template_name = "secrets/partials/group_detail.html.j2"
headers["HX-Push-Url"] = request.url.path
else:
template_name = "secrets/index.html.j2"
groups = await admin.get_secret_groups()
context["user"] = current_user
context["groups"] = groups
context["group_path_nodes"] = group.path.split("/")
return templates.TemplateResponse(
request, template_name, context, headers=headers
)
@app.get("/secrets/secret/{name}")
async def get_secret_tree_detail(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
):
"""Get partial secret detail."""
"""Get secret detail."""
secret = await admin.get_secret(name)
groups = await admin.get_secret_groups(flat=True)
events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
@ -104,14 +197,34 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
)
return templates.TemplateResponse(
request,
"secrets/partials/tree_detail.html.j2",
{
context: dict[str, Any] = {
"secret": secret,
"groups": groups,
"events": events,
},
"secret_page": True,
}
headers: dict[str, str] = {}
if request.headers.get("HX-Request"):
# This is a HTMX request.
template_name = "secrets/partials/tree_detail.html.j2"
headers["HX-Push-Url"] = request.url.path
else:
group_path = ["/"]
if secret.group:
group = await admin.get_secret_group(secret.group)
if group:
group_path = group.path.split("/")
template_name = "secrets/index.html.j2"
context["user"] = current_user
context["groups"] = groups
context["group_path_nodes"] = group_path
return templates.TemplateResponse(
request, template_name, context, headers=headers
)
@app.get("/secrets/partial/group/{name}")
@ -461,7 +574,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Add a secret to a client."""
await admin.create_client_secret(client, name)
await admin.create_client_secret(("id", client), name)
secret = await admin.get_secret(name)
if not secret:
raise HTTPException(
@ -478,26 +591,20 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
},
)
# @app.delete("/secrets/{name}")
# async def delete_secret(
# request: Request,
# name: str,
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
# ):
# """Delete a secret."""
# await admin.delete_secret(name)
# clients = await admin.get_clients()
# secrets = await admin.get_detailed_secrets()
# headers = {"Hx-Refresh": "true"}
@app.delete("/secrets/{name}")
async def delete_secret(
request: Request,
name: str,
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
):
"""Delete a secret."""
await admin.delete_secret(name)
headers = {"Hx-Refresh": "true"}
# return templates.TemplateResponse(
# request,
# "secrets/inner.html.j2",
# {
# "clients": clients,
# "secrets": secrets,
# },
# headers=headers,
# )
return templates.TemplateResponse(
request,
"secrets/partials/default_detail.html.j2",
headers=headers,
)
return app

View File

@ -12,13 +12,12 @@ from sshecret.backend import (
AuditListResult,
Client,
ClientFilter,
Secret,
SshecretBackend,
Operation,
SubSystem,
)
from sshecret.backend.models import DetailedSecrets
from sshecret.backend.api import AuditAPI
from sshecret.backend.api import AuditAPI, KeySpec
from sshecret.crypto import encrypt_string, load_public_key
from .keepass import PasswordContext, load_password_manager
@ -27,6 +26,7 @@ from .models import (
ClientSecretGroup,
ClientSecretGroupList,
SecretClientMapping,
SecretListView,
SecretGroup,
SecretView,
)
@ -113,13 +113,13 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def _get_client(self, name: str) -> Client | None:
async def _get_client(self, idname: KeySpec) -> Client | None:
"""Get a client from the backend."""
return await self.backend.get_client(name)
return await self.backend.get_client(idname)
async def _verify_client_exists(self, name: str) -> None:
async def _verify_client_exists(self, idname: KeySpec) -> None:
"""Check that a client exists."""
client = await self.backend.get_client(name)
client = await self.backend.get_client(idname)
if not client:
raise ClientNotFoundError()
return None
@ -133,7 +133,7 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def get_client(self, name: str) -> Client | None:
async def get_client(self, name: KeySpec) -> Client | None:
"""Get a client from the backend."""
try:
return await self._get_client(name)
@ -176,7 +176,10 @@ class AdminBackend:
raise BackendUnavailableError() from e
async def _update_client_public_key(
self, name: str, new_key: str, password_manager: PasswordContext
self,
name: KeySpec,
new_key: str,
password_manager: PasswordContext,
) -> list[str]:
"""Update client public key."""
LOG.info(
@ -203,7 +206,7 @@ class AdminBackend:
return updated_secrets
async def update_client_public_key(self, name: str, new_key: str) -> list[str]:
async def update_client_public_key(self, name: KeySpec, new_key: str) -> list[str]:
"""Update client public key."""
try:
with self.password_manager() as password_manager:
@ -238,18 +241,18 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def update_client_sources(self, name: str, sources: list[str]) -> None:
async def update_client_sources(self, name: KeySpec, sources: list[str]) -> None:
"""Update client sources."""
try:
await self.backend.update_client_sources(name, sources)
except Exception as e:
raise BackendUnavailableError() from e
async def _delete_client(self, name: str) -> None:
async def _delete_client(self, name: KeySpec) -> None:
"""Delete client."""
await self.backend.delete_client(name)
async def delete_client(self, name: str) -> None:
async def delete_client(self, name: KeySpec) -> None:
"""Delete client."""
try:
await self._delete_client(name)
@ -258,30 +261,41 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def delete_client_secret(self, client_name: str, secret_name: str) -> None:
async def delete_client_secret(
self, client_name: KeySpec, secret_name: KeySpec
) -> None:
"""Delete a secret from a client."""
try:
await self.backend.delete_client_secret(client_name, secret_name)
except Exception as e:
raise BackendUnavailableError() from e
async def _get_secrets(self) -> list[Secret]:
async def _get_secrets(self) -> list[SecretListView]:
"""Get secrets.
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
"""
backend_secrets = await self.backend.get_secrets()
with self.password_manager() as password_manager:
all_secrets = password_manager.get_available_secrets()
admin_secrets = password_manager.get_available_secrets()
secrets = await self.backend.get_secrets()
backend_secret_names = [secret.name for secret in secrets]
for secret in all_secrets:
if secret not in backend_secret_names:
secrets.append(Secret(name=secret, clients=[]))
secrets: dict[str, SecretListView] = {}
for secret in backend_secrets:
secrets[secret.name] = SecretListView(
name=secret.name, unmanaged=True, clients=secret.clients
)
return secrets
for secret_name in admin_secrets:
if secret_name in secrets:
secrets[secret_name].unmanaged = False
continue
secrets[secret_name] = SecretListView(
name=secret_name, unmanaged=False, clients=[]
)
async def get_secrets(self) -> list[Secret]:
return list(secrets.values())
async def get_secrets(self) -> list[SecretListView]:
"""Get secrets from backend."""
try:
return await self._get_secrets()
@ -381,6 +395,8 @@ class AdminBackend:
)
ungrouped = password_manager.get_ungrouped_secrets()
all_admin_secrets = password_manager.get_available_secrets()
group_result: list[ClientSecretGroup] = []
for group in all_groups:
# We have to do this recursively.
@ -397,7 +413,19 @@ class AdminBackend:
mapping.clients = client_mapping.clients
ungrouped_clients.append(mapping)
# We need to process unmanaged secrets too.
unmanaged_secrets = [
secret for secret in all_secrets if secret.name not in all_admin_secrets
]
for secret in unmanaged_secrets:
ungrouped_clients.append(
SecretClientMapping(
name=secret.name, unmanaged=True, clients=secret.clients
)
)
result.ungrouped = ungrouped_clients
return result
async def get_secret_group(self, name: str) -> ClientSecretGroup | None:
@ -407,6 +435,18 @@ class AdminBackend:
return matches.groups[0]
return None
async def get_secret_group_by_path(self, path: str) -> ClientSecretGroup | None:
"""Get a group based on its path."""
with self.password_manager() as password_manager:
secret_group = password_manager.get_secret_group(path)
if not secret_group:
return None
all_secrets = await self.backend.get_detailed_secrets()
secrets_mapping = {secret.name: secret for secret in all_secrets}
return add_clients_to_secret_group(secret_group, secrets_mapping)
async def get_secret(self, name: str) -> SecretView | None:
"""Get secrets from backend."""
try:
@ -416,18 +456,24 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def _get_secret(self, name: str) -> SecretView | None:
async def _get_secret(
self, name: str, secret_id: str | None = None
) -> SecretView | None:
"""Get a secret, including the actual unencrypted value and clients."""
secret: str | None = None
with self.password_manager() as password_manager:
secret = password_manager.get_secret(name)
secret_group = password_manager.get_entry_group(name)
if not secret:
return None
secret_view = SecretView(name=name, secret=secret, group=secret_group)
secret_mapping = await self.backend.get_secret(name)
idname: KeySpec = name
if secret_id:
idname = ("id", secret_id)
secret_mapping = await self.backend.get_secret(idname)
if secret_mapping:
secret_view.clients = secret_mapping.clients
secret_view.clients = [ref.name for ref in secret_mapping.clients]
return secret_view
@ -450,7 +496,7 @@ class AdminBackend:
return
for client in secret_mapping.clients:
LOG.info("Deleting secret %s from client %s", name, client)
await self.backend.delete_client_secret(client, name)
await self.backend.delete_client_secret(("id", client.id), name)
async def _add_secret(
self,
@ -467,7 +513,7 @@ class AdminBackend:
if update:
secret_map = await self.backend.get_secret(name)
if secret_map:
clients = secret_map.clients
clients = [ref.name for ref in secret_map.clients]
if not clients:
return
@ -507,11 +553,13 @@ class AdminBackend:
except Exception as e:
raise BackendUnavailableError() from e
async def _create_client_secret(self, client_name: str, secret_name: str) -> None:
async def _create_client_secret(
self, client_idname: KeySpec, secret_name: str
) -> None:
"""Create client secret."""
client = await self.get_client(client_name)
client = await self.get_client(client_idname)
if not client:
raise ClientNotFoundError()
raise ClientNotFoundError(client_idname)
with self.password_manager() as password_manager:
secret = password_manager.get_secret(secret_name)
@ -520,12 +568,14 @@ class AdminBackend:
public_key = load_public_key(client.public_key.encode())
encrypted = encrypt_string(secret, public_key)
await self.backend.create_client_secret(client_name, secret_name, encrypted)
await self.backend.create_client_secret(client_idname, secret_name, encrypted)
async def create_client_secret(self, client_name: str, secret_name: str) -> None:
async def create_client_secret(
self, client_idname: KeySpec, secret_name: str
) -> None:
"""Create client secret."""
try:
await self._create_client_secret(client_name, secret_name)
await self._create_client_secret(client_idname, secret_name)
except ClientManagementError:
raise
except Exception as e:

View File

@ -32,7 +32,9 @@ def create_password_db(location: Path, password: str) -> None:
def _kp_group_to_secret_group(
kp_group: pykeepass.group.Group, parent: SecretGroup | None = None, depth: int | None = None
kp_group: pykeepass.group.Group,
parent: SecretGroup | None = None,
depth: int | None = None,
) -> SecretGroup:
"""Convert keepass group to secret group dataclass."""
group_name = cast(str, kp_group.name)
@ -143,8 +145,9 @@ class PasswordContext:
return None
return str(entry.group.name)
def get_secret_groups(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]:
def get_secret_groups(
self, pattern: str | None = None, regex: bool = True
) -> list[SecretGroup]:
"""Get secret groups.
A regex pattern may be provided to filter groups.
@ -160,7 +163,9 @@ class PasswordContext:
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
return secret_groups
def get_secret_group_list(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]:
def get_secret_group_list(
self, pattern: str | None = None, regex: bool = True
) -> list[SecretGroup]:
"""Get a flat list of groups."""
if pattern:
return self.get_secret_groups(pattern, regex)
@ -169,6 +174,24 @@ class PasswordContext:
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
return secret_groups
def get_secret_group(self, path: str) -> SecretGroup | None:
"""Get a secret group by path."""
elements = path.split("/")
final_element = elements[-1]
current = self._root_group
while elements:
groupname = elements.pop(0)
matches = [
subgroup for subgroup in current.subgroups if subgroup.name == groupname
]
if matches:
current = matches[0]
else:
return None
if not current.is_root_group and current.name == final_element:
return _kp_group_to_secret_group(current)
return None
def get_ungrouped_secrets(self) -> list[str]:
"""Get secrets without groups."""
@ -193,7 +216,9 @@ class PasswordContext:
f"Error: Cannot find a parent group named {parent_group}"
)
kp_parent_group = query
self.keepass.add_group(destination_group=kp_parent_group, group_name=name, notes=description)
self.keepass.add_group(
destination_group=kp_parent_group, group_name=name, notes=description
)
self.keepass.save()
def set_group_description(self, name: str, description: str) -> None:

View File

@ -25,6 +25,7 @@ class SecretListView(BaseModel):
"""Model containing a list of all available secrets."""
name: str
unmanaged: bool = False
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
@ -32,7 +33,7 @@ class SecretView(BaseModel):
"""Model containing a secret, including its clear-text value."""
name: str
secret: str
secret: str | None
group: str | None = None
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
@ -134,6 +135,7 @@ class SecretClientMapping(BaseModel):
"""Secret name with list of clients."""
name: str # name of secret
unmanaged: bool = False
clients: list[ClientReference] = Field(default_factory=list)

View File

@ -327,9 +327,6 @@
.start-0 {
inset-inline-start: calc(var(--spacing) * 0);
}
.end-2\.5 {
inset-inline-end: calc(var(--spacing) * 2.5);
}
.top-0 {
top: calc(var(--spacing) * 0);
}
@ -417,18 +414,12 @@
.m-361 {
margin: calc(var(--spacing) * 361);
}
.mx-2\.5 {
margin-inline: calc(var(--spacing) * 2.5);
}
.mx-3 {
margin-inline: calc(var(--spacing) * 3);
}
.mx-4 {
margin-inline: calc(var(--spacing) * 4);
}
.mx-\[1rem\] {
margin-inline: 1rem;
}
.mx-auto {
margin-inline: auto;
}
@ -459,9 +450,6 @@
.ms-3 {
margin-inline-start: calc(var(--spacing) * 3);
}
.ms-auto {
margin-inline-start: auto;
}
.me-2 {
margin-inline-end: calc(var(--spacing) * 2);
}
@ -486,9 +474,6 @@
.mt-2 {
margin-top: calc(var(--spacing) * 2);
}
.mt-2\.5 {
margin-top: calc(var(--spacing) * 2.5);
}
.mt-3 {
margin-top: calc(var(--spacing) * 3);
}
@ -597,9 +582,6 @@
.ml-6 {
margin-left: calc(var(--spacing) * 6);
}
.ml-\[1rem\] {
margin-left: 1rem;
}
.ml-auto {
margin-left: auto;
}
@ -684,9 +666,6 @@
.h-32 {
height: calc(var(--spacing) * 32);
}
.h-48 {
height: calc(var(--spacing) * 48);
}
.h-\[0\.125rem\] {
height: 0.125rem;
}
@ -696,9 +675,6 @@
.h-\[36rem\] {
height: 36rem;
}
.h-\[calc\(100\%-1rem\)\] {
height: calc(100% - 1rem);
}
.h-full {
height: 100%;
}
@ -708,9 +684,6 @@
.max-h-64 {
max-height: calc(var(--spacing) * 64);
}
.max-h-full {
max-height: 100%;
}
.min-h-0 {
min-height: calc(var(--spacing) * 0);
}
@ -1228,9 +1201,6 @@
--tw-border-style: solid;
border-style: solid;
}
.border-blue-700 {
border-color: var(--color-blue-700);
}
.border-gray-100 {
border-color: var(--color-gray-100);
}
@ -1240,12 +1210,6 @@
.border-gray-300 {
border-color: var(--color-gray-300);
}
.border-gray-500 {
border-color: var(--color-gray-500);
}
.border-gray-900 {
border-color: var(--color-gray-900);
}
.border-green-100 {
border-color: var(--color-green-100);
}
@ -1745,9 +1709,6 @@
.text-blue-600 {
color: var(--color-blue-600);
}
.text-blue-700 {
color: var(--color-blue-700);
}
.text-blue-800 {
color: var(--color-blue-800);
}
@ -2143,20 +2104,6 @@
}
}
}
.hover\:bg-blue-700 {
&:hover {
@media (hover: hover) {
background-color: var(--color-blue-700);
}
}
}
.hover\:bg-blue-800 {
&:hover {
@media (hover: hover) {
background-color: var(--color-blue-800);
}
}
}
.hover\:bg-gray-50 {
&:hover {
@media (hover: hover) {
@ -2624,11 +2571,6 @@
padding-inline: calc(var(--spacing) * 4);
}
}
.sm\:px-16 {
@media (width >= 40rem) {
padding-inline: calc(var(--spacing) * 16);
}
}
.sm\:py-2 {
@media (width >= 40rem) {
padding-block: calc(var(--spacing) * 2);
@ -2870,11 +2812,6 @@
padding: calc(var(--spacing) * 0);
}
}
.md\:p-5 {
@media (width >= 48rem) {
padding: calc(var(--spacing) * 5);
}
}
.md\:p-6 {
@media (width >= 48rem) {
padding: calc(var(--spacing) * 6);
@ -3104,12 +3041,6 @@
line-height: var(--tw-leading, var(--text-6xl--line-height));
}
}
.lg\:text-xl {
@media (width >= 64rem) {
font-size: var(--text-xl);
line-height: var(--tw-leading, var(--text-xl--line-height));
}
}
.lg\:hover\:underline {
@media (width >= 64rem) {
&:hover {
@ -3237,11 +3168,6 @@
padding-inline: calc(var(--spacing) * 0);
}
}
.xl\:px-48 {
@media (width >= 80rem) {
padding-inline: calc(var(--spacing) * 48);
}
}
.xl\:py-24 {
@media (width >= 80rem) {
padding-block: calc(var(--spacing) * 24);
@ -3461,11 +3387,6 @@
background-color: var(--color-red-700);
}
}
.dark\:bg-red-900 {
&:where(.dark, .dark *) {
background-color: var(--color-red-900);
}
}
.dark\:bg-teal-900 {
&:where(.dark, .dark *) {
background-color: var(--color-teal-900);
@ -3516,11 +3437,6 @@
color: var(--color-gray-600);
}
}
.dark\:text-gray-700 {
&:where(.dark, .dark *) {
color: var(--color-gray-700);
}
}
.dark\:text-green-400 {
&:where(.dark, .dark *) {
color: var(--color-green-400);
@ -3561,11 +3477,6 @@
color: var(--color-purple-500);
}
}
.dark\:text-red-300 {
&:where(.dark, .dark *) {
color: var(--color-red-300);
}
}
.dark\:text-red-400 {
&:where(.dark, .dark *) {
color: var(--color-red-400);

View File

@ -5,7 +5,7 @@ import pykeepass
import pykeepass.exceptions
import pytest
from sshecret_admin.services.keepass import PasswordContext, _password_context, PasswordCredentialsError
from sshecret_admin.services.keepass import _password_context, PasswordCredentialsError
def test_open_invalid_database() -> None:
"""Test opening a non-existing database."""

View File

@ -128,8 +128,9 @@ class ClientOperations:
"""Delete client."""
db_client = await self._get_client(client)
if not db_client:
return
raise HTTPException(status_code=404, detail="Client not found.")
if db_client.is_deleted:
LOG.warning("Client %r was already deleted!", client)
return
db_client.is_deleted = True
db_client.deleted_at = datetime.now(timezone.utc)
@ -271,7 +272,12 @@ async def get_clients(
filter_query: ClientListParams,
) -> ClientQueryResult:
"""Get Clients."""
count_statement = select(func.count("*")).select_from(Client)
count_statement = (
select(func.count("*"))
.select_from(Client)
.where(Client.is_deleted.is_not(True))
.where(Client.is_active.is_not(False))
)
count_statement = cast(
Select[tuple[int]],
filter_client_statement(count_statement, filter_query, True),

View File

@ -40,7 +40,9 @@ class ClientView(BaseModel):
return responses
@classmethod
def from_client(cls, client: models.Client) -> Self:
def from_client(
cls, client: models.Client, include_deleted_secrets: bool = False
) -> Self:
"""Instantiate from a client."""
view = cls(
id=client.id,
@ -54,7 +56,12 @@ class ClientView(BaseModel):
is_deleted=client.is_deleted,
)
if client.secrets:
if include_deleted_secrets:
view.secrets = [secret.name for secret in client.secrets]
else:
view.secrets = [
secret.name for secret in client.secrets if not secret.deleted
]
if client.policies:
view.policies = [policy.source for policy in client.policies]

View File

@ -116,24 +116,18 @@ async def resolve_client_id(
return None
async def get_client_by_id(session: AsyncSession, id: uuid.UUID) -> Client | None:
async def get_client_by_id(
session: AsyncSession, id: uuid.UUID, include_deleted: bool = False
) -> Client | None:
"""Get client by ID."""
if include_deleted:
client_filter = client_with_relationships().where(Client.id == id)
else:
client_filter = query_active_clients().where(Client.id == id)
client_results = await session.execute(client_filter)
return client_results.scalars().first()
async def get_client_by_id_or_name(
session: AsyncSession, id_or_name: str
) -> Client | None:
"""Get client either by id or name."""
if RE_UUID.match(id_or_name):
id = uuid.UUID(id_or_name)
return await get_client_by_id(session, id)
return await get_client_by_name(session, id_or_name)
def query_active_clients() -> Select[tuple[Client]]:
"""Get all active clients."""
client_filter = (
@ -144,22 +138,6 @@ def query_active_clients() -> Select[tuple[Client]]:
return client_filter
async def get_client_by_name(session: AsyncSession, name: str) -> Client | None:
"""Get client by name.
This will get the latest client version, unless it's deleted.
"""
client_filter = (
client_with_relationships()
.where(Client.is_active.is_(True))
.where(Client.is_deleted.is_not(True))
.where(Client.name == name)
.order_by(Client.version.desc())
)
client_result = await session.execute(client_filter)
return client_result.scalars().first()
async def refresh_client(session: AsyncSession, client: Client) -> None:
"""Refresh the client and load in all relationships."""
await session.refresh(

View File

@ -182,8 +182,10 @@ class ClientSecretOperations:
async def delete_client_secret(self, secret_identifier: FlexID) -> None:
"""Delete a client secret."""
LOG.debug("delete_client_secret called with identifier %r", secret_identifier)
client_secret = await self._get_client_secret(secret_identifier)
if not client_secret:
LOG.warning("Could not find any secret matching client secret.")
return
client_secret.deleted = True

View File

@ -72,7 +72,6 @@ def create_client_secrets_router(get_db_session: AsyncDBSessionDep) -> APIRouter
client_op = ClientSecretOperations(session, request, client)
return await client_op.get_client_secret(secret)
# TODO: delete_client_secret
@router.delete("/clients/{client_identifier}/secrets/{secret_identifier}")
async def delete_client_secret(
request: Request,

View File

@ -7,14 +7,19 @@ import logging
from typing import cast, final, override
import asyncssh
from sshecret_sshd import exceptions, constants
from sshecret_sshd import constants, exceptions
from .base import CommandDispatcher
from .get_secret import GetSecret
from .register import Register
from .list_secrets import ListSecrets
from .ping import PingCommand
from .register import Register
from .shelldriver import (
ShellDeleteSecret,
ShellListSecrets,
ShellLookupSecret,
ShellStoreSecret,
)
SYNOPSIS = """[bold]Sshecret SSH Server[/bold]
@ -29,9 +34,13 @@ encoded as base64.
COMMANDS = [
GetSecret,
Register,
ListSecrets,
PingCommand,
Register,
ShellDeleteSecret,
ShellListSecrets,
ShellLookupSecret,
ShellStoreSecret,
]
LOG = logging.getLogger(__name__)

View File

@ -0,0 +1,163 @@
"""Podman Shelldriver compatible commands."""
import logging
from typing import final, override
import asyncssh
from sshecret.backend.models import Operation
from sshecret.crypto import encrypt_string, load_public_key
from .base import CommandDispatcher
LOG = logging.getLogger(__name__)
# These error messages are taken verbatim from podman, and while they don't seem
# to make complete sense, they will be used regardless.
ERR_SECRET_NOT_FOUND = "no such secret"
ERR_SECRET_EXISTS = "secret data with ID already exists"
ERR_INVALID_SECRET = "invalid key"
@final
class ShellListSecrets(CommandDispatcher):
"""List secrets.
This command lists secrets in a format compatible with podman's ShellDriver.
"""
name = "list"
@override
async def exec(self) -> None:
"""List secrets."""
LOG.debug("ShellListSecret called.")
await self.audit(Operation.READ, "Listed available secret names")
for secret_name in self.client.secrets:
self.print(secret_name)
@final
class ShellDeleteSecret(CommandDispatcher):
"""Delete a secret.
If the identifier for a secret does not exist, an error will be printed.
"""
name = "delete"
mandatory_argument = "KEY"
@override
async def exec(self) -> None:
"""Delete a secret."""
secret_name = self.arguments[0]
LOG.debug("ShellDeleteSecret called withg arguments %r.", self.arguments)
await self.audit(
operation=Operation.DELETE,
message="ClientSecret deleted",
secret=secret_name,
)
await self.backend.delete_client_secret(
("id", str(self.client.id)), ("name", secret_name)
)
@final
class ShellLookupSecret(CommandDispatcher):
"""Look up a secret.
The identifier for the secret must be provided as the argument.
"""
name = "lookup"
mandatory_argument = "KEY"
@override
async def exec(self) -> None:
"""Lookup secret."""
LOG.debug("ShellLookupSecret called with arguments %r", self.arguments)
secret_name = self.arguments[0]
secret = await self.backend.get_client_secret(
("id", str(self.client.id)), secret_name
)
if not secret:
LOG.debug(
"Secret %s not found for client %s (%s)",
secret_name,
self.client.id,
self.client.name,
)
self.print(ERR_SECRET_NOT_FOUND, stderr=True)
return
await self.audit(
Operation.READ, message="Client requested secret", secret=secret_name
)
self.print(secret)
@final
class ShellStoreSecret(CommandDispatcher):
"""Store a secret.
Secret will be read from command argument, or via STDIN.
"""
name = "store"
mandatory_argument = "KEY"
@override
async def exec(self) -> None:
"""Store a secret."""
LOG.debug("ShellStoreSecret called with arguments %r", self.arguments)
secret_name = self.arguments[0]
if secret_name in self.client.secrets:
self.print(ERR_SECRET_EXISTS, stderr=True)
return
secret_data: str | None = None
if len(self.arguments) == 2:
secret_data = self.arguments[1]
if not secret_data:
LOG.debug("No secret set as input, trying stdin.")
secret_data = await self.get_secret_on_stdin()
if not secret_data:
self.print(ERR_INVALID_SECRET, stderr=True)
return
# Encrypt secret
encrypted = self.encrypt_secret(secret_data)
await self.backend.create_client_secret(
("id", str(self.client.id)), secret_name, encrypted
)
await self.audit(
operation=Operation.CREATE,
message="Secret created from 'store' command",
secret=secret_name,
)
def encrypt_secret(self, value: str) -> str:
"""Encrypt a secret."""
public_key = load_public_key(self.client.public_key.encode())
return encrypt_string(value, public_key)
async def get_secret_on_stdin(self) -> str | None:
"""Get secret from stdin."""
secret_data = ""
try:
async for line in self.process.stdin:
if self.process.stdin.at_eof():
break
if not line:
break
secret_data += line.rstrip()
except asyncssh.BreakReceived:
pass
if not secret_data:
return None
return secret_data

View File

@ -5,7 +5,7 @@ admin and sshd library do not need to implement the same
"""
import logging
from typing import Any, Self, override
from typing import Any, Literal, Self, override
import httpx
from pydantic import TypeAdapter
@ -29,6 +29,17 @@ from .utils import validate_public_key
LOG = logging.getLogger(__name__)
KeyType = Literal["id", "name"]
KeySpec = str | tuple[KeyType, str]
def _key(id_or_name: KeySpec) -> str:
"""Get the correct key string."""
if isinstance(id_or_name, str):
return id_or_name
prefix, suffix = id_or_name
return f"{prefix}:{suffix}"
class ClientQueryIterator:
"""Asynchronous query iterator."""
@ -313,56 +324,54 @@ class SshecretBackend(BaseBackend):
return clients
async def get_client(self, name: str) -> Client | None:
async def get_client(self, id_or_name: KeySpec) -> Client | None:
"""Lookup a client on username."""
path = f"/api/v1/clients/{name}"
key = _key(id_or_name)
path = f"/api/v1/clients/{key}"
response = await self._get(path)
if response.status_code == 404:
return None
client = Client.model_validate(response.json())
return client
async def get_client_by_id(self, id: str) -> Client | None:
"""Lookup a client on username."""
path = f"/api/v1/clients/id/{id}"
response = await self._get(path)
if response.status_code == 404:
return None
client = Client.model_validate(response.json())
return client
async def delete_client(self, client_name: str) -> None:
async def delete_client(self, id_or_name: KeySpec) -> None:
"""Delete a client."""
path = f"/api/v1/clients/{client_name}"
response = await self._delete(path)
async def delete_client_by_id(self, id: str) -> None:
"""Delete a client."""
path = f"/api/v1/clients/id/{id}"
key = _key(id_or_name)
path = f"/api/v1/clients/{key}"
response = await self._delete(path)
LOG.debug("response: %s", response.text)
async def create_client_secret(
self, client_name: str, secret_name: str, encrypted_secret: str
self, client_idname: KeySpec, secret_name: str, encrypted_secret: str
) -> None:
"""Create a secret.
This will overwrite any existing secret with that name.
"""
path = f"api/v1/clients/{client_name}/secrets/{secret_name}"
client_key = _key(client_idname)
path = f"api/v1/clients/{client_key}/secrets/{secret_name}"
response = await self._put(path, json={"value": encrypted_secret})
async def get_client_secret(self, name: str, secret_name: str) -> str | None:
async def get_client_secret(
self, client_idname: KeySpec, secret_idname: KeySpec
) -> str | None:
"""Fetch a secret."""
path = f"/api/v1/clients/{name}/secrets/{secret_name}"
client_key = _key(client_idname)
secret_key = _key(secret_idname)
path = f"/api/v1/clients/{client_key}/secrets/{secret_key}"
response = await self._get(path)
if response.status_code == 404:
return None
secret = ClientSecret.model_validate(response.json())
return secret.secret
async def delete_client_secret(self, client_name: str, secret_name: str) -> None:
async def delete_client_secret(
self, client_idname: KeySpec, secret_idname: KeySpec
) -> None:
"""Delete a secret from a client."""
path = f"api/v1/clients/{client_name}/secrets/{secret_name}"
client_key = _key(client_idname)
secret_key = _key(secret_idname)
path = f"api/v1/clients/{client_key}/secrets/{secret_key}"
await self._delete(path)
async def update_client(self, client: Client) -> Client:
@ -382,13 +391,14 @@ class SshecretBackend(BaseBackend):
)
return client
async def update_client_key(self, client_name: str, public_key: str) -> None:
async def update_client_key(self, client_idname: KeySpec, public_key: str) -> None:
"""Update the client key."""
path = f"/api/v1/clients/{client_name}/public-key"
client_key = _key(client_idname)
path = f"/api/v1/clients/{client_key}/public-key"
await self._post(path, json={"public_key": public_key})
async def update_client_sources(
self, client_name: str, addresses: list[str] | None
self, client_idname: KeySpec, addresses: list[str] | None
) -> None:
"""Update client source addresses.
@ -397,36 +407,32 @@ class SshecretBackend(BaseBackend):
if not addresses:
addresses = []
path = f"/api/v1/clients/{client_name}/policies/"
client_key = _key(client_idname)
path = f"/api/v1/clients/{client_key}/policies/"
await self._put(path, json={"sources": addresses})
async def get_detailed_secrets(self) -> list[DetailedSecrets]:
"""Get detailed list of secrets."""
path = "/api/v1/secrets/detailed/"
path = "/api/v1/secrets/"
response = await self._get(path)
secret_list = TypeAdapter(list[DetailedSecrets])
return secret_list.validate_python(response.json())
async def get_secrets(self) -> list[Secret]:
"""Get Secrets.
This provides a list of secret names and which clients have them.
"""
"""Get detailed list of secrets."""
path = "/api/v1/secrets/"
response = await self._get(path)
secret_list = TypeAdapter(list[Secret])
return secret_list.validate_python(response.json())
detailed_secret_list = TypeAdapter(list[DetailedSecrets])
detailed_secrets = detailed_secret_list.validate_python(response.json())
# Convert to list of secrets
secrets: list[Secret] = []
for detail in detailed_secrets:
clients = [ref.name for ref in detail.clients]
secrets.append(Secret(name=detail.name, clients=clients))
async def get_secret(self, name: str) -> Secret | None:
"""Get clients mapped to a single secret."""
path = f"/api/v1/secrets/{name}"
response = await self._get(path)
if response.status_code == 404:
return None
return Secret.model_validate(response.json())
return secrets
async def get_detailed_secret(self, name: str) -> DetailedSecrets | None:
"""Get clients mapped to a single secret."""
@ -437,6 +443,16 @@ class SshecretBackend(BaseBackend):
return DetailedSecrets.model_validate(response.json())
async def get_secret(self, idname: KeySpec) -> DetailedSecrets | None:
"""Get clients mapped to a single secret."""
secret_key = _key(idname)
path = f"/api/v1/secrets/{secret_key}"
response = await self._get(path)
if response.status_code == 404:
return None
return DetailedSecrets.model_validate(response.json())
def audit(self, subsystem: SubSystem) -> AuditAPI:
"""Create the audit API."""
audit = AuditAPI(self._backend_url, self._api_token, subsystem)

View File

@ -51,7 +51,7 @@ async def test_create_secret(backend_api: SshecretBackend) -> None:
assert secret_to_client is not None
assert secret_to_client.name == "mysecret"
assert "test" in secret_to_client.clients
assert secret_to_client.clients[0].name == "test"
secret = await backend_api.get_client_secret("test", "mysecret")

View File

@ -3,8 +3,9 @@
This essentially also tests parts of the admin API.
"""
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator
from collections.abc import AsyncIterator
import os
import httpx
@ -17,7 +18,33 @@ from .clients import create_test_client, ClientData
from .types import CommandRunner, ProcessRunner
class TestSshd:
class BaseSSHTests:
"""Base test class."""
async def register_client(
self, name: str, ssh_session: ProcessRunner
) -> ClientData:
"""Register client."""
test_client = create_test_client(name)
async with ssh_session(test_client, "register") as session:
maxlines = 10
linenum = 0
found = False
while linenum < maxlines:
line = await session.stdout.readline()
if "Enter public key" in line:
found = True
break
assert found is True
session.stdin.write(test_client.public_key + "\n")
result = await session.stdout.read()
assert "Key is valid. Registering client." in result
await session.wait()
return test_client
class TestSshd(BaseSSHTests):
"""Class based tests.
This allows us to create small helpers.
@ -52,30 +79,9 @@ class TestSshd:
client = clients[0]
assert client.name == "new_client"
async def register_client(
self, name: str, ssh_session: ProcessRunner
) -> ClientData:
"""Register client."""
test_client = create_test_client(name)
async with ssh_session(test_client, "register") as session:
maxlines = 10
linenum = 0
found = False
while linenum < maxlines:
line = await session.stdout.readline()
if "Enter public key" in line:
found = True
break
assert found is True
session.stdin.write(test_client.public_key + "\n")
result = await session.stdout.read()
assert "Key is valid. Registering client." in result
await session.wait()
return test_client
class TestSshdIntegration(TestSshd):
class TestSshdIntegration(BaseSSHTests):
"""Integration tests."""
@pytest.mark.asyncio
@ -137,3 +143,185 @@ class TestSshdIntegration(TestSshd):
base_url=url, headers={"Authorization": f"Bearer {token}"}
) as client:
yield client
class TestShelldriverCommands(BaseSSHTests):
"""Shelldriver command tests."""
@pytest.mark.asyncio
async def test_store_and_lookup(
self,
backend_server: tuple[str, tuple[str, str]],
ssh_session: ProcessRunner,
ssh_command_runner: CommandRunner,
) -> None:
"""Log in."""
test_client = await self.register_client("myclient", ssh_session)
ssh_output = await ssh_command_runner(test_client, "store mysecret secretvalue")
assert bool(ssh_output.stderr) is False
# wait half a second
await asyncio.sleep(0.5)
ssh_output = await ssh_command_runner(test_client, "lookup mysecret")
assert bool(ssh_output.stderr) is False
assert ssh_output.stdout is not None
assert isinstance(ssh_output.stdout, str)
encrypted = ssh_output.stdout.rstrip()
decrypted = decode_string(encrypted, test_client.private_key)
assert decrypted == "secretvalue"
@pytest.mark.asyncio
async def test_store_and_lookup_stdin(
self,
backend_server: tuple[str, tuple[str, str]],
ssh_session: ProcessRunner,
ssh_command_runner: CommandRunner,
) -> None:
"""Test store and lookup, with password specification in stdin."""
test_client = await self.register_client("myclient", ssh_session)
async with ssh_session(test_client, "store insecret") as session:
session.stdin.write("testinput\n")
session.stdin.write_eof()
await session.stdin.wait_closed()
await asyncio.sleep(0.5)
ssh_output = await ssh_command_runner(test_client, "lookup insecret")
assert bool(ssh_output.stderr) is False
assert ssh_output.stdout is not None
assert isinstance(ssh_output.stdout, str)
encrypted = ssh_output.stdout.rstrip()
decrypted = decode_string(encrypted, test_client.private_key)
assert decrypted == "testinput"
@pytest.mark.parametrize("secret_name",["nonexistant", "blåbærgrød", "../../../etc/shadow"])
@pytest.mark.asyncio
async def test_invalid_lookup(
self,
secret_name: str,
ssh_command_runner: CommandRunner,
ssh_session: ProcessRunner,
) -> None:
"""Test lookup with invalid secret."""
test_client = await self.register_client("myclient", ssh_session)
ssh_output = await ssh_command_runner(test_client, f"lookup {secret_name}")
assert isinstance(ssh_output.stderr, str)
assert ssh_output.stderr.rstrip() == "no such secret"
class TestShelldriverListCommand(BaseSSHTests):
"""Tests for the list command."""
@pytest.fixture(name="secret_names")
def get_secret_names(self) -> list[str]:
"""Get secret names.
Sort of like a parametrize function.
"""
return ["foo", "bar", "abc123", "blåbærgrød"]
@pytest.fixture(name="test_client")
@pytest.mark.asyncio
async def create_test_client(
self,
ssh_session: ProcessRunner,
) -> ClientData:
"""Register a test client."""
return await self.register_client("listclient", ssh_session)
@pytest.fixture(autouse=True)
@pytest.mark.asyncio
async def create_data(
self,
secret_names: list[str],
test_client: ClientData,
ssh_command_runner: CommandRunner,
) -> None:
"""Create data for the test."""
for name in secret_names:
ssh_output = await ssh_command_runner(test_client, f"store {name} secretvalue")
assert bool(ssh_output.stderr) is False
assert ssh_output.stdout is not None
assert isinstance(ssh_output.stdout, str)
@pytest.mark.asyncio
async def test_list_secrets(
self,
secret_names: list[str],
test_client: ClientData,
ssh_command_runner: CommandRunner,
) -> None:
"""Test list command."""
ssh_output = await ssh_command_runner(test_client, "list")
assert bool(ssh_output.stderr) is False
assert ssh_output.stdout is not None
assert isinstance(ssh_output.stdout, str)
list_output = ssh_output.stdout
print(list_output)
input_secret_names = list_output.splitlines()
assert len(input_secret_names) == len(secret_names)
assert sorted(secret_names) == sorted(input_secret_names)
@pytest.mark.parametrize("secret_name", ["simple", "blåbærgrød"])
class TestShelldriverDeleteCommand(BaseSSHTests):
"""Tests for the list command."""
@pytest.fixture(name="test_client")
@pytest.mark.asyncio
async def create_test_client(
self,
ssh_session: ProcessRunner,
) -> ClientData:
"""Register a test client."""
return await self.register_client("listclient", ssh_session)
@pytest.fixture(autouse=True)
@pytest.mark.asyncio
async def create_data(
self,
secret_name: str,
test_client: ClientData,
ssh_command_runner: CommandRunner,
) -> None:
"""Create data for the test."""
ssh_output = await ssh_command_runner(test_client, f"store {secret_name} secretvalue")
assert bool(ssh_output.stderr) is False
assert ssh_output.stdout is not None
assert isinstance(ssh_output.stdout, str)
async def get_stored_secrets(
self,
test_client: ClientData,
ssh_command_runner: CommandRunner,
) -> list[str]:
"""Get stored secrets."""
ssh_output = await ssh_command_runner(test_client, "list")
assert bool(ssh_output.stderr) is False
assert ssh_output.stdout is not None
assert isinstance(ssh_output.stdout, str)
list_output = ssh_output.stdout
print(list_output)
input_secret_names = list_output.splitlines()
return sorted(input_secret_names)
@pytest.mark.asyncio
async def test_delete_secret(
self,
secret_name: str,
test_client: ClientData,
ssh_command_runner: CommandRunner,
) -> None:
"""Delete secret."""
current_secrets = await self.get_stored_secrets(test_client, ssh_command_runner)
assert secret_name in current_secrets
ssh_output = await ssh_command_runner(test_client, f"delete {secret_name}")
assert bool(ssh_output.stderr) is False
current_secrets = await self.get_stored_secrets(test_client, ssh_command_runner)
assert secret_name not in current_secrets

View File

@ -119,6 +119,9 @@ def test_delete_client(test_client: TestClient) -> None:
resp = test_client.get("/api/v1/clients/test")
assert resp.status_code == 404
resp = test_client.get("/api/v1/clients/")
assert resp.status_code == 200
def test_add_secret(test_client: TestClient) -> None:
"""Test adding a secret to a client."""