Compare commits
10 Commits
3779e93b8c
...
0eaa913e35
| Author | SHA1 | Date | |
|---|---|---|---|
| 0eaa913e35 | |||
| 782ec19137 | |||
| 43d00cecb4 | |||
| d1fa6c0076 | |||
| 71d877022b | |||
| 36d04b8a33 | |||
| a834339c13 | |||
| fb6b76f7d8 | |||
| fed441743e | |||
| d86d9a9256 |
@ -5,7 +5,6 @@ import logging
|
||||
from typing import Annotated
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
|
||||
|
||||
from sshecret.backend.models import Secret
|
||||
from sshecret_admin.core.dependencies import AdminDependencies
|
||||
from sshecret_admin.services import AdminBackend
|
||||
from sshecret_admin.services.models import (
|
||||
@ -13,6 +12,7 @@ from sshecret_admin.services.models import (
|
||||
ClientSecretGroupList,
|
||||
SecretCreate,
|
||||
SecretGroupCreate,
|
||||
SecretListView,
|
||||
SecretUpdate,
|
||||
SecretView,
|
||||
)
|
||||
@ -27,7 +27,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
||||
@app.get("/secrets/")
|
||||
async def get_secret_names(
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
) -> list[Secret]:
|
||||
) -> list[SecretListView]:
|
||||
"""Get Secret Names."""
|
||||
return await admin.get_secrets()
|
||||
|
||||
@ -94,7 +94,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
||||
)
|
||||
return results[0]
|
||||
return results.groups[0]
|
||||
|
||||
@app.post("/secrets/groups/")
|
||||
async def add_secret_group(
|
||||
@ -125,7 +125,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
||||
Entries within the group will be moved to the root.
|
||||
This also includes nested entries further down from the group.
|
||||
"""
|
||||
group = await admin.get_secret_group(group.name)
|
||||
group = await admin.get_secret_group(group_name)
|
||||
if not group:
|
||||
return
|
||||
await admin.delete_secret_group(group_name, keep_entries=True)
|
||||
@ -196,7 +196,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
||||
)
|
||||
group = groups[0]
|
||||
group = groups.groups[0]
|
||||
matching_entries = [
|
||||
entry for entry in group.entries if entry.name == secret_name
|
||||
]
|
||||
|
||||
@ -4,6 +4,12 @@
|
||||
class="tree-entry-item"
|
||||
data-type="entry"
|
||||
data-name="{{ entry.name }}"
|
||||
data-group-path="/"
|
||||
{% if secret | default(false) %}
|
||||
{% if secret.name == entry.name %}
|
||||
selected=""
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
>
|
||||
<sl-icon name="shield"> </sl-icon>
|
||||
<span class="px-2">{{ entry.name }}</span>
|
||||
@ -16,6 +22,13 @@
|
||||
class="secret-group-list-item"
|
||||
data-type="group"
|
||||
data-name="{{ group.group_name }}"
|
||||
data-group-path="{{ group.path }}"
|
||||
{% if group_path_nodes | default(false) %}
|
||||
{% if group.group_name in group_path_nodes %}
|
||||
expanded=""
|
||||
{% endif %}
|
||||
|
||||
{% endif %}
|
||||
>
|
||||
<sl-icon name="folder"> </sl-icon>
|
||||
<span class="px-2">{{ group.group_name }}</span>
|
||||
@ -57,8 +70,8 @@
|
||||
<div class="p-4 bg-white border border-gray-200 rounded-lg shadow-sm sm:flex dark:border-gray-700 sm:p-6 dark:bg-gray-800" id="secret-tree">
|
||||
|
||||
|
||||
<div class="flex flex-col">
|
||||
<div class="h-full">
|
||||
<div class="flex flex-1 flex-col">
|
||||
<div class="h-full w-full">
|
||||
<sl-tree class="tree-with-icons">
|
||||
<sl-tree-item
|
||||
id="secret-group-root-item"
|
||||
@ -81,7 +94,22 @@
|
||||
|
||||
</div>
|
||||
<div class="2xl:col-span-2 xl:col-span-2 p-4 mb-4 bg-white border border-gray-200 rounded-lg shadow-sm 2xl:col-span-2 dark:border-gray-700 sm:p-6 dark:bg-gray-800">
|
||||
{% if group_page | default(false) %}
|
||||
<div class="w-full" id="secretdetails">
|
||||
{% include '/secrets/partials/group_detail.html.j2' %}
|
||||
</div>
|
||||
{% elif root_group_page | default(false) %}
|
||||
<div class="w-full" id="secretdetails">
|
||||
{% include '/secrets/partials/edit_root.html.j2' %}
|
||||
</div>
|
||||
{% elif secret_page | default(false) %}
|
||||
<div class="w-full" id="secretdetails">
|
||||
{% include '/secrets/partials/tree_detail.html.j2' %}
|
||||
</div>
|
||||
{% else %}
|
||||
{% include '/secrets/partials/default_detail.html.j2' %}
|
||||
{% endif %}
|
||||
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@ -99,17 +127,19 @@
|
||||
|
||||
const type = selectedEl.dataset.type;
|
||||
const name = selectedEl.dataset.name;
|
||||
console.log(`Event on ${type} ${name}`);
|
||||
const groupPath = selectedEl.dataset.groupPath;
|
||||
console.log(`Event on ${type} ${name} path: ${groupPath}`);
|
||||
|
||||
if (!type || !name) return;
|
||||
|
||||
let url = '';
|
||||
if (type === 'entry') {
|
||||
url = `/secrets/partial/secret/${encodeURIComponent(name)}`;
|
||||
url = `/secrets/secret/${encodeURIComponent(name)}`;
|
||||
} else if (type === 'group') {
|
||||
url = `/secrets/partial/group/${encodeURIComponent(name)}`;
|
||||
//url = `/secrets/partial/group/${encodeURIComponent(name)}`;
|
||||
url = `/secrets/group/${encodeURIComponent(groupPath)}`;
|
||||
} else if (type == 'root') {
|
||||
url = `/secrets/partial/root_group`;
|
||||
url = `/secrets/group/`;
|
||||
}
|
||||
|
||||
if (url) {
|
||||
|
||||
@ -3,6 +3,8 @@
|
||||
{% include '/secrets/partials/client_list_inner.html.j2' %}
|
||||
</ul>
|
||||
</div>
|
||||
{% if secret.secret %}
|
||||
<div class="w-full my-2" id="secretclientaction">
|
||||
{% include '/secrets/partials/client_assign_button.html.j2' %}
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
<div class="w-full">
|
||||
<div class="mb-4">
|
||||
<h3 class="text-xl font-semibold dark:text-white">Group {{name}}</h3>
|
||||
{% if description %}
|
||||
<span class="text-sm text-gray-500 dark:text-gray-400">{{ description }}</span>
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
<sl-details summary="Create secret">
|
||||
|
||||
@ -1,5 +1,42 @@
|
||||
<div class="w-full" id="secretdetails">
|
||||
|
||||
<!-- menu -->
|
||||
|
||||
<div class="flex justify-end px-4">
|
||||
<button id="secret-menu-button" data-dropdown-toggle="secret-edit-menu" class="inline-block text-gray-500 dark:text-gray-400 hover:bg-gray-100 dark:hover:bg-gray-700 focus:ring-4 focus:outline-none focus:ring-gray-200 dark:focus:ring-gray-700 rounded-lg text-sm p-1.5" type="button">
|
||||
<span class="sr-only">Open dropdown</span>
|
||||
<svg class="w-5 h-5" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="currentColor" viewBox="0 0 16 3">
|
||||
<path d="M2 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3Zm6.041 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM14 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3Z"/>
|
||||
</svg>
|
||||
</button>
|
||||
<!-- Dropdown menu -->
|
||||
<div id="secret-edit-menu" class="z-10 hidden text-base list-none bg-white divide-y divide-gray-100 rounded-lg shadow-sm w-44 dark:bg-gray-700">
|
||||
<ul class="py-2" aria-labelledby="secret-menu-button">
|
||||
<li>
|
||||
<a
|
||||
href="#"
|
||||
class="block px-4 py-2 text-sm text-red-600 hover:bg-gray-100 dark:hover:bg-gray-600 dark:text-gray-200 dark:hover:text-white"
|
||||
hx-delete="/secrets/{{secret.name}}"
|
||||
hx-target="#secretdetails"
|
||||
hx-swap="OuterHTML"
|
||||
hx-indicator=".secret-spinner"
|
||||
hx-confirm="Really delete this secret?"
|
||||
>
|
||||
Delete
|
||||
</a>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
<h3 class="mb-4 text-xl font-semibold dark:text-white">{{secret.name}}</h3>
|
||||
{% if secret.description %}
|
||||
<span class="text-sm text-gray-500 dark:text-gray-400">{{ secret.description }}</span>
|
||||
{% endif %}
|
||||
{% if not secret.secret %}
|
||||
<p class="text-sm text-gray-500 dark:text-gray-400 italic">This secret was created outside of sshecret-admin. It cannot be decrypted, and therefore fewer options are available here.</p>
|
||||
{% endif %}
|
||||
<div class="htmx-indicator secret-spinner">
|
||||
<div role="status">
|
||||
<svg aria-hidden="true" class="inline w-6 h-6 text-gray-200 animate-spin dark:text-gray-600 fill-blue-600" viewBox="0 0 100 101" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
@ -15,6 +52,7 @@
|
||||
{% include '/secrets/partials/client_secret_details.html.j2' %}
|
||||
</div>
|
||||
</sl-details>
|
||||
{% if secret.secret %}
|
||||
<sl-details summary="Read/Update Secret">
|
||||
<div id="secretvalue">
|
||||
<div class="mb-6">
|
||||
@ -72,6 +110,7 @@
|
||||
</form>
|
||||
</sl-details>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
<sl-details summary="Events">
|
||||
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600" id="last-audit-events">
|
||||
<thead class="bg-gray-50 dark:bg-gray-700">
|
||||
|
||||
@ -88,7 +88,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
client: Annotated[ClientUpdate, Form()],
|
||||
):
|
||||
"""Update a client."""
|
||||
original_client = await admin.get_client(id)
|
||||
original_client = await admin.get_client(("id", id))
|
||||
if not original_client:
|
||||
return templates.TemplateResponse(
|
||||
request, "fragments/error.html", {"message": "Client not found"}
|
||||
@ -131,7 +131,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
) -> Response:
|
||||
"""Delete a client."""
|
||||
await admin.delete_client(id)
|
||||
await admin.delete_client(("id", id))
|
||||
clients = await admin.get_clients()
|
||||
headers = {"Hx-Refresh": "true"}
|
||||
return templates.TemplateResponse(
|
||||
|
||||
@ -64,7 +64,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||
):
|
||||
groups = await admin.get_secret_groups()
|
||||
LOG.info("Groups: %r", groups)
|
||||
LOG.info("Groups: %s", groups.model_dump_json(indent=2))
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"secrets/index.html.j2",
|
||||
@ -74,28 +74,121 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
},
|
||||
)
|
||||
|
||||
@app.get("/secrets/partial/root_group")
|
||||
async def get_root_group(
|
||||
# @app.get("/secrets/partial/root_group")
|
||||
# async def get_root_group(
|
||||
# request: Request,
|
||||
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
# ):
|
||||
# """Get root group."""
|
||||
# clients = await admin.get_clients()
|
||||
# return templates.TemplateResponse(
|
||||
# request,
|
||||
# "secrets/partials/edit_root.html.j2",
|
||||
# {
|
||||
# "group_path_nodes": [],
|
||||
# "clients": clients,
|
||||
# },
|
||||
# )
|
||||
|
||||
# @app.get("/secrets/partial/secret/{name}")
|
||||
# async def get_secret_tree_detail_partial(
|
||||
# request: Request,
|
||||
# name: str,
|
||||
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
# ):
|
||||
# """Get partial secret detail."""
|
||||
# secret = await admin.get_secret(name)
|
||||
# groups = await admin.get_secret_groups(flat=True)
|
||||
# events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
|
||||
|
||||
# if not secret:
|
||||
# raise HTTPException(
|
||||
# status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
|
||||
# )
|
||||
# return templates.TemplateResponse(
|
||||
# request,
|
||||
# "secrets/partials/tree_detail.html.j2",
|
||||
# {
|
||||
# "secret": secret,
|
||||
# "groups": groups,
|
||||
# "events": events,
|
||||
# },
|
||||
# )
|
||||
|
||||
@app.get("/secrets/group/")
|
||||
async def show_root_group(
|
||||
request: Request,
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||
):
|
||||
"""Get root group."""
|
||||
"""Show the root path."""
|
||||
clients = await admin.get_clients()
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"secrets/partials/edit_root.html.j2",
|
||||
{
|
||||
context: dict[str, Any] = {
|
||||
"clients": clients,
|
||||
},
|
||||
"root_group_page": True,
|
||||
}
|
||||
headers: dict[str, str] = {}
|
||||
if request.headers.get("HX-Request"):
|
||||
# This is a HTMX request.
|
||||
template_name = "secrets/partials/edit_root.html.j2"
|
||||
headers["HX-Push-Url"] = request.url.path
|
||||
else:
|
||||
groups = await admin.get_secret_groups()
|
||||
template_name = "secrets/index.html.j2"
|
||||
context["user"] = current_user
|
||||
context["groups"] = groups
|
||||
context["group_path_nodes"] = ["/"]
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request, template_name, context, headers=headers
|
||||
)
|
||||
|
||||
@app.get("/secrets/partial/secret/{name}")
|
||||
@app.get("/secrets/group/{group_path:path}")
|
||||
async def show_group(
|
||||
request: Request,
|
||||
group_path: str,
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||
):
|
||||
"""Show a group."""
|
||||
group = await admin.get_secret_group_by_path(group_path)
|
||||
if not group:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
|
||||
)
|
||||
clients = await admin.get_clients()
|
||||
|
||||
headers: dict[str, str] = {}
|
||||
context: dict[str, Any] = {
|
||||
"group_page": True,
|
||||
"name": group.group_name,
|
||||
"description": group.description,
|
||||
"clients": clients,
|
||||
}
|
||||
if request.headers.get("HX-Request"):
|
||||
# This is a HTMX request.
|
||||
template_name = "secrets/partials/group_detail.html.j2"
|
||||
headers["HX-Push-Url"] = request.url.path
|
||||
else:
|
||||
template_name = "secrets/index.html.j2"
|
||||
|
||||
groups = await admin.get_secret_groups()
|
||||
context["user"] = current_user
|
||||
context["groups"] = groups
|
||||
context["group_path_nodes"] = group.path.split("/")
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request, template_name, context, headers=headers
|
||||
)
|
||||
|
||||
@app.get("/secrets/secret/{name}")
|
||||
async def get_secret_tree_detail(
|
||||
request: Request,
|
||||
name: str,
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||
):
|
||||
"""Get partial secret detail."""
|
||||
"""Get secret detail."""
|
||||
secret = await admin.get_secret(name)
|
||||
groups = await admin.get_secret_groups(flat=True)
|
||||
events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
|
||||
@ -104,14 +197,34 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
|
||||
)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"secrets/partials/tree_detail.html.j2",
|
||||
{
|
||||
|
||||
context: dict[str, Any] = {
|
||||
"secret": secret,
|
||||
"groups": groups,
|
||||
"events": events,
|
||||
},
|
||||
"secret_page": True,
|
||||
}
|
||||
|
||||
headers: dict[str, str] = {}
|
||||
|
||||
if request.headers.get("HX-Request"):
|
||||
# This is a HTMX request.
|
||||
template_name = "secrets/partials/tree_detail.html.j2"
|
||||
headers["HX-Push-Url"] = request.url.path
|
||||
else:
|
||||
group_path = ["/"]
|
||||
if secret.group:
|
||||
group = await admin.get_secret_group(secret.group)
|
||||
if group:
|
||||
group_path = group.path.split("/")
|
||||
|
||||
template_name = "secrets/index.html.j2"
|
||||
context["user"] = current_user
|
||||
context["groups"] = groups
|
||||
context["group_path_nodes"] = group_path
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request, template_name, context, headers=headers
|
||||
)
|
||||
|
||||
@app.get("/secrets/partial/group/{name}")
|
||||
@ -461,7 +574,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
):
|
||||
"""Add a secret to a client."""
|
||||
await admin.create_client_secret(client, name)
|
||||
await admin.create_client_secret(("id", client), name)
|
||||
secret = await admin.get_secret(name)
|
||||
if not secret:
|
||||
raise HTTPException(
|
||||
@ -478,26 +591,20 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
||||
},
|
||||
)
|
||||
|
||||
# @app.delete("/secrets/{name}")
|
||||
# async def delete_secret(
|
||||
# request: Request,
|
||||
# name: str,
|
||||
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
# ):
|
||||
# """Delete a secret."""
|
||||
# await admin.delete_secret(name)
|
||||
# clients = await admin.get_clients()
|
||||
# secrets = await admin.get_detailed_secrets()
|
||||
# headers = {"Hx-Refresh": "true"}
|
||||
@app.delete("/secrets/{name}")
|
||||
async def delete_secret(
|
||||
request: Request,
|
||||
name: str,
|
||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||
):
|
||||
"""Delete a secret."""
|
||||
await admin.delete_secret(name)
|
||||
headers = {"Hx-Refresh": "true"}
|
||||
|
||||
# return templates.TemplateResponse(
|
||||
# request,
|
||||
# "secrets/inner.html.j2",
|
||||
# {
|
||||
# "clients": clients,
|
||||
# "secrets": secrets,
|
||||
# },
|
||||
# headers=headers,
|
||||
# )
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"secrets/partials/default_detail.html.j2",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
return app
|
||||
|
||||
@ -12,13 +12,12 @@ from sshecret.backend import (
|
||||
AuditListResult,
|
||||
Client,
|
||||
ClientFilter,
|
||||
Secret,
|
||||
SshecretBackend,
|
||||
Operation,
|
||||
SubSystem,
|
||||
)
|
||||
from sshecret.backend.models import DetailedSecrets
|
||||
from sshecret.backend.api import AuditAPI
|
||||
from sshecret.backend.api import AuditAPI, KeySpec
|
||||
from sshecret.crypto import encrypt_string, load_public_key
|
||||
|
||||
from .keepass import PasswordContext, load_password_manager
|
||||
@ -27,6 +26,7 @@ from .models import (
|
||||
ClientSecretGroup,
|
||||
ClientSecretGroupList,
|
||||
SecretClientMapping,
|
||||
SecretListView,
|
||||
SecretGroup,
|
||||
SecretView,
|
||||
)
|
||||
@ -113,13 +113,13 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _get_client(self, name: str) -> Client | None:
|
||||
async def _get_client(self, idname: KeySpec) -> Client | None:
|
||||
"""Get a client from the backend."""
|
||||
return await self.backend.get_client(name)
|
||||
return await self.backend.get_client(idname)
|
||||
|
||||
async def _verify_client_exists(self, name: str) -> None:
|
||||
async def _verify_client_exists(self, idname: KeySpec) -> None:
|
||||
"""Check that a client exists."""
|
||||
client = await self.backend.get_client(name)
|
||||
client = await self.backend.get_client(idname)
|
||||
if not client:
|
||||
raise ClientNotFoundError()
|
||||
return None
|
||||
@ -133,7 +133,7 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def get_client(self, name: str) -> Client | None:
|
||||
async def get_client(self, name: KeySpec) -> Client | None:
|
||||
"""Get a client from the backend."""
|
||||
try:
|
||||
return await self._get_client(name)
|
||||
@ -176,7 +176,10 @@ class AdminBackend:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _update_client_public_key(
|
||||
self, name: str, new_key: str, password_manager: PasswordContext
|
||||
self,
|
||||
name: KeySpec,
|
||||
new_key: str,
|
||||
password_manager: PasswordContext,
|
||||
) -> list[str]:
|
||||
"""Update client public key."""
|
||||
LOG.info(
|
||||
@ -203,7 +206,7 @@ class AdminBackend:
|
||||
|
||||
return updated_secrets
|
||||
|
||||
async def update_client_public_key(self, name: str, new_key: str) -> list[str]:
|
||||
async def update_client_public_key(self, name: KeySpec, new_key: str) -> list[str]:
|
||||
"""Update client public key."""
|
||||
try:
|
||||
with self.password_manager() as password_manager:
|
||||
@ -238,18 +241,18 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def update_client_sources(self, name: str, sources: list[str]) -> None:
|
||||
async def update_client_sources(self, name: KeySpec, sources: list[str]) -> None:
|
||||
"""Update client sources."""
|
||||
try:
|
||||
await self.backend.update_client_sources(name, sources)
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _delete_client(self, name: str) -> None:
|
||||
async def _delete_client(self, name: KeySpec) -> None:
|
||||
"""Delete client."""
|
||||
await self.backend.delete_client(name)
|
||||
|
||||
async def delete_client(self, name: str) -> None:
|
||||
async def delete_client(self, name: KeySpec) -> None:
|
||||
"""Delete client."""
|
||||
try:
|
||||
await self._delete_client(name)
|
||||
@ -258,30 +261,41 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def delete_client_secret(self, client_name: str, secret_name: str) -> None:
|
||||
async def delete_client_secret(
|
||||
self, client_name: KeySpec, secret_name: KeySpec
|
||||
) -> None:
|
||||
"""Delete a secret from a client."""
|
||||
try:
|
||||
await self.backend.delete_client_secret(client_name, secret_name)
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _get_secrets(self) -> list[Secret]:
|
||||
async def _get_secrets(self) -> list[SecretListView]:
|
||||
"""Get secrets.
|
||||
|
||||
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
|
||||
"""
|
||||
backend_secrets = await self.backend.get_secrets()
|
||||
with self.password_manager() as password_manager:
|
||||
all_secrets = password_manager.get_available_secrets()
|
||||
admin_secrets = password_manager.get_available_secrets()
|
||||
|
||||
secrets = await self.backend.get_secrets()
|
||||
backend_secret_names = [secret.name for secret in secrets]
|
||||
for secret in all_secrets:
|
||||
if secret not in backend_secret_names:
|
||||
secrets.append(Secret(name=secret, clients=[]))
|
||||
secrets: dict[str, SecretListView] = {}
|
||||
for secret in backend_secrets:
|
||||
secrets[secret.name] = SecretListView(
|
||||
name=secret.name, unmanaged=True, clients=secret.clients
|
||||
)
|
||||
|
||||
return secrets
|
||||
for secret_name in admin_secrets:
|
||||
if secret_name in secrets:
|
||||
secrets[secret_name].unmanaged = False
|
||||
continue
|
||||
secrets[secret_name] = SecretListView(
|
||||
name=secret_name, unmanaged=False, clients=[]
|
||||
)
|
||||
|
||||
async def get_secrets(self) -> list[Secret]:
|
||||
return list(secrets.values())
|
||||
|
||||
async def get_secrets(self) -> list[SecretListView]:
|
||||
"""Get secrets from backend."""
|
||||
try:
|
||||
return await self._get_secrets()
|
||||
@ -381,6 +395,8 @@ class AdminBackend:
|
||||
)
|
||||
ungrouped = password_manager.get_ungrouped_secrets()
|
||||
|
||||
all_admin_secrets = password_manager.get_available_secrets()
|
||||
|
||||
group_result: list[ClientSecretGroup] = []
|
||||
for group in all_groups:
|
||||
# We have to do this recursively.
|
||||
@ -397,7 +413,19 @@ class AdminBackend:
|
||||
mapping.clients = client_mapping.clients
|
||||
ungrouped_clients.append(mapping)
|
||||
|
||||
# We need to process unmanaged secrets too.
|
||||
unmanaged_secrets = [
|
||||
secret for secret in all_secrets if secret.name not in all_admin_secrets
|
||||
]
|
||||
for secret in unmanaged_secrets:
|
||||
ungrouped_clients.append(
|
||||
SecretClientMapping(
|
||||
name=secret.name, unmanaged=True, clients=secret.clients
|
||||
)
|
||||
)
|
||||
|
||||
result.ungrouped = ungrouped_clients
|
||||
|
||||
return result
|
||||
|
||||
async def get_secret_group(self, name: str) -> ClientSecretGroup | None:
|
||||
@ -407,6 +435,18 @@ class AdminBackend:
|
||||
return matches.groups[0]
|
||||
return None
|
||||
|
||||
async def get_secret_group_by_path(self, path: str) -> ClientSecretGroup | None:
|
||||
"""Get a group based on its path."""
|
||||
with self.password_manager() as password_manager:
|
||||
secret_group = password_manager.get_secret_group(path)
|
||||
|
||||
if not secret_group:
|
||||
return None
|
||||
|
||||
all_secrets = await self.backend.get_detailed_secrets()
|
||||
secrets_mapping = {secret.name: secret for secret in all_secrets}
|
||||
return add_clients_to_secret_group(secret_group, secrets_mapping)
|
||||
|
||||
async def get_secret(self, name: str) -> SecretView | None:
|
||||
"""Get secrets from backend."""
|
||||
try:
|
||||
@ -416,18 +456,24 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _get_secret(self, name: str) -> SecretView | None:
|
||||
async def _get_secret(
|
||||
self, name: str, secret_id: str | None = None
|
||||
) -> SecretView | None:
|
||||
"""Get a secret, including the actual unencrypted value and clients."""
|
||||
secret: str | None = None
|
||||
with self.password_manager() as password_manager:
|
||||
secret = password_manager.get_secret(name)
|
||||
secret_group = password_manager.get_entry_group(name)
|
||||
|
||||
if not secret:
|
||||
return None
|
||||
secret_view = SecretView(name=name, secret=secret, group=secret_group)
|
||||
secret_mapping = await self.backend.get_secret(name)
|
||||
|
||||
idname: KeySpec = name
|
||||
if secret_id:
|
||||
idname = ("id", secret_id)
|
||||
|
||||
secret_mapping = await self.backend.get_secret(idname)
|
||||
if secret_mapping:
|
||||
secret_view.clients = secret_mapping.clients
|
||||
secret_view.clients = [ref.name for ref in secret_mapping.clients]
|
||||
|
||||
return secret_view
|
||||
|
||||
@ -450,7 +496,7 @@ class AdminBackend:
|
||||
return
|
||||
for client in secret_mapping.clients:
|
||||
LOG.info("Deleting secret %s from client %s", name, client)
|
||||
await self.backend.delete_client_secret(client, name)
|
||||
await self.backend.delete_client_secret(("id", client.id), name)
|
||||
|
||||
async def _add_secret(
|
||||
self,
|
||||
@ -467,7 +513,7 @@ class AdminBackend:
|
||||
if update:
|
||||
secret_map = await self.backend.get_secret(name)
|
||||
if secret_map:
|
||||
clients = secret_map.clients
|
||||
clients = [ref.name for ref in secret_map.clients]
|
||||
|
||||
if not clients:
|
||||
return
|
||||
@ -507,11 +553,13 @@ class AdminBackend:
|
||||
except Exception as e:
|
||||
raise BackendUnavailableError() from e
|
||||
|
||||
async def _create_client_secret(self, client_name: str, secret_name: str) -> None:
|
||||
async def _create_client_secret(
|
||||
self, client_idname: KeySpec, secret_name: str
|
||||
) -> None:
|
||||
"""Create client secret."""
|
||||
client = await self.get_client(client_name)
|
||||
client = await self.get_client(client_idname)
|
||||
if not client:
|
||||
raise ClientNotFoundError()
|
||||
raise ClientNotFoundError(client_idname)
|
||||
|
||||
with self.password_manager() as password_manager:
|
||||
secret = password_manager.get_secret(secret_name)
|
||||
@ -520,12 +568,14 @@ class AdminBackend:
|
||||
|
||||
public_key = load_public_key(client.public_key.encode())
|
||||
encrypted = encrypt_string(secret, public_key)
|
||||
await self.backend.create_client_secret(client_name, secret_name, encrypted)
|
||||
await self.backend.create_client_secret(client_idname, secret_name, encrypted)
|
||||
|
||||
async def create_client_secret(self, client_name: str, secret_name: str) -> None:
|
||||
async def create_client_secret(
|
||||
self, client_idname: KeySpec, secret_name: str
|
||||
) -> None:
|
||||
"""Create client secret."""
|
||||
try:
|
||||
await self._create_client_secret(client_name, secret_name)
|
||||
await self._create_client_secret(client_idname, secret_name)
|
||||
except ClientManagementError:
|
||||
raise
|
||||
except Exception as e:
|
||||
|
||||
@ -32,7 +32,9 @@ def create_password_db(location: Path, password: str) -> None:
|
||||
|
||||
|
||||
def _kp_group_to_secret_group(
|
||||
kp_group: pykeepass.group.Group, parent: SecretGroup | None = None, depth: int | None = None
|
||||
kp_group: pykeepass.group.Group,
|
||||
parent: SecretGroup | None = None,
|
||||
depth: int | None = None,
|
||||
) -> SecretGroup:
|
||||
"""Convert keepass group to secret group dataclass."""
|
||||
group_name = cast(str, kp_group.name)
|
||||
@ -143,8 +145,9 @@ class PasswordContext:
|
||||
return None
|
||||
return str(entry.group.name)
|
||||
|
||||
|
||||
def get_secret_groups(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]:
|
||||
def get_secret_groups(
|
||||
self, pattern: str | None = None, regex: bool = True
|
||||
) -> list[SecretGroup]:
|
||||
"""Get secret groups.
|
||||
|
||||
A regex pattern may be provided to filter groups.
|
||||
@ -160,7 +163,9 @@ class PasswordContext:
|
||||
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
||||
return secret_groups
|
||||
|
||||
def get_secret_group_list(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]:
|
||||
def get_secret_group_list(
|
||||
self, pattern: str | None = None, regex: bool = True
|
||||
) -> list[SecretGroup]:
|
||||
"""Get a flat list of groups."""
|
||||
if pattern:
|
||||
return self.get_secret_groups(pattern, regex)
|
||||
@ -169,6 +174,24 @@ class PasswordContext:
|
||||
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
||||
return secret_groups
|
||||
|
||||
def get_secret_group(self, path: str) -> SecretGroup | None:
|
||||
"""Get a secret group by path."""
|
||||
elements = path.split("/")
|
||||
final_element = elements[-1]
|
||||
|
||||
current = self._root_group
|
||||
while elements:
|
||||
groupname = elements.pop(0)
|
||||
matches = [
|
||||
subgroup for subgroup in current.subgroups if subgroup.name == groupname
|
||||
]
|
||||
if matches:
|
||||
current = matches[0]
|
||||
else:
|
||||
return None
|
||||
if not current.is_root_group and current.name == final_element:
|
||||
return _kp_group_to_secret_group(current)
|
||||
return None
|
||||
|
||||
def get_ungrouped_secrets(self) -> list[str]:
|
||||
"""Get secrets without groups."""
|
||||
@ -193,7 +216,9 @@ class PasswordContext:
|
||||
f"Error: Cannot find a parent group named {parent_group}"
|
||||
)
|
||||
kp_parent_group = query
|
||||
self.keepass.add_group(destination_group=kp_parent_group, group_name=name, notes=description)
|
||||
self.keepass.add_group(
|
||||
destination_group=kp_parent_group, group_name=name, notes=description
|
||||
)
|
||||
self.keepass.save()
|
||||
|
||||
def set_group_description(self, name: str, description: str) -> None:
|
||||
|
||||
@ -25,6 +25,7 @@ class SecretListView(BaseModel):
|
||||
"""Model containing a list of all available secrets."""
|
||||
|
||||
name: str
|
||||
unmanaged: bool = False
|
||||
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
||||
|
||||
|
||||
@ -32,7 +33,7 @@ class SecretView(BaseModel):
|
||||
"""Model containing a secret, including its clear-text value."""
|
||||
|
||||
name: str
|
||||
secret: str
|
||||
secret: str | None
|
||||
group: str | None = None
|
||||
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
||||
|
||||
@ -134,6 +135,7 @@ class SecretClientMapping(BaseModel):
|
||||
"""Secret name with list of clients."""
|
||||
|
||||
name: str # name of secret
|
||||
unmanaged: bool = False
|
||||
clients: list[ClientReference] = Field(default_factory=list)
|
||||
|
||||
|
||||
|
||||
@ -327,9 +327,6 @@
|
||||
.start-0 {
|
||||
inset-inline-start: calc(var(--spacing) * 0);
|
||||
}
|
||||
.end-2\.5 {
|
||||
inset-inline-end: calc(var(--spacing) * 2.5);
|
||||
}
|
||||
.top-0 {
|
||||
top: calc(var(--spacing) * 0);
|
||||
}
|
||||
@ -417,18 +414,12 @@
|
||||
.m-361 {
|
||||
margin: calc(var(--spacing) * 361);
|
||||
}
|
||||
.mx-2\.5 {
|
||||
margin-inline: calc(var(--spacing) * 2.5);
|
||||
}
|
||||
.mx-3 {
|
||||
margin-inline: calc(var(--spacing) * 3);
|
||||
}
|
||||
.mx-4 {
|
||||
margin-inline: calc(var(--spacing) * 4);
|
||||
}
|
||||
.mx-\[1rem\] {
|
||||
margin-inline: 1rem;
|
||||
}
|
||||
.mx-auto {
|
||||
margin-inline: auto;
|
||||
}
|
||||
@ -459,9 +450,6 @@
|
||||
.ms-3 {
|
||||
margin-inline-start: calc(var(--spacing) * 3);
|
||||
}
|
||||
.ms-auto {
|
||||
margin-inline-start: auto;
|
||||
}
|
||||
.me-2 {
|
||||
margin-inline-end: calc(var(--spacing) * 2);
|
||||
}
|
||||
@ -486,9 +474,6 @@
|
||||
.mt-2 {
|
||||
margin-top: calc(var(--spacing) * 2);
|
||||
}
|
||||
.mt-2\.5 {
|
||||
margin-top: calc(var(--spacing) * 2.5);
|
||||
}
|
||||
.mt-3 {
|
||||
margin-top: calc(var(--spacing) * 3);
|
||||
}
|
||||
@ -597,9 +582,6 @@
|
||||
.ml-6 {
|
||||
margin-left: calc(var(--spacing) * 6);
|
||||
}
|
||||
.ml-\[1rem\] {
|
||||
margin-left: 1rem;
|
||||
}
|
||||
.ml-auto {
|
||||
margin-left: auto;
|
||||
}
|
||||
@ -684,9 +666,6 @@
|
||||
.h-32 {
|
||||
height: calc(var(--spacing) * 32);
|
||||
}
|
||||
.h-48 {
|
||||
height: calc(var(--spacing) * 48);
|
||||
}
|
||||
.h-\[0\.125rem\] {
|
||||
height: 0.125rem;
|
||||
}
|
||||
@ -696,9 +675,6 @@
|
||||
.h-\[36rem\] {
|
||||
height: 36rem;
|
||||
}
|
||||
.h-\[calc\(100\%-1rem\)\] {
|
||||
height: calc(100% - 1rem);
|
||||
}
|
||||
.h-full {
|
||||
height: 100%;
|
||||
}
|
||||
@ -708,9 +684,6 @@
|
||||
.max-h-64 {
|
||||
max-height: calc(var(--spacing) * 64);
|
||||
}
|
||||
.max-h-full {
|
||||
max-height: 100%;
|
||||
}
|
||||
.min-h-0 {
|
||||
min-height: calc(var(--spacing) * 0);
|
||||
}
|
||||
@ -1228,9 +1201,6 @@
|
||||
--tw-border-style: solid;
|
||||
border-style: solid;
|
||||
}
|
||||
.border-blue-700 {
|
||||
border-color: var(--color-blue-700);
|
||||
}
|
||||
.border-gray-100 {
|
||||
border-color: var(--color-gray-100);
|
||||
}
|
||||
@ -1240,12 +1210,6 @@
|
||||
.border-gray-300 {
|
||||
border-color: var(--color-gray-300);
|
||||
}
|
||||
.border-gray-500 {
|
||||
border-color: var(--color-gray-500);
|
||||
}
|
||||
.border-gray-900 {
|
||||
border-color: var(--color-gray-900);
|
||||
}
|
||||
.border-green-100 {
|
||||
border-color: var(--color-green-100);
|
||||
}
|
||||
@ -1745,9 +1709,6 @@
|
||||
.text-blue-600 {
|
||||
color: var(--color-blue-600);
|
||||
}
|
||||
.text-blue-700 {
|
||||
color: var(--color-blue-700);
|
||||
}
|
||||
.text-blue-800 {
|
||||
color: var(--color-blue-800);
|
||||
}
|
||||
@ -2143,20 +2104,6 @@
|
||||
}
|
||||
}
|
||||
}
|
||||
.hover\:bg-blue-700 {
|
||||
&:hover {
|
||||
@media (hover: hover) {
|
||||
background-color: var(--color-blue-700);
|
||||
}
|
||||
}
|
||||
}
|
||||
.hover\:bg-blue-800 {
|
||||
&:hover {
|
||||
@media (hover: hover) {
|
||||
background-color: var(--color-blue-800);
|
||||
}
|
||||
}
|
||||
}
|
||||
.hover\:bg-gray-50 {
|
||||
&:hover {
|
||||
@media (hover: hover) {
|
||||
@ -2624,11 +2571,6 @@
|
||||
padding-inline: calc(var(--spacing) * 4);
|
||||
}
|
||||
}
|
||||
.sm\:px-16 {
|
||||
@media (width >= 40rem) {
|
||||
padding-inline: calc(var(--spacing) * 16);
|
||||
}
|
||||
}
|
||||
.sm\:py-2 {
|
||||
@media (width >= 40rem) {
|
||||
padding-block: calc(var(--spacing) * 2);
|
||||
@ -2870,11 +2812,6 @@
|
||||
padding: calc(var(--spacing) * 0);
|
||||
}
|
||||
}
|
||||
.md\:p-5 {
|
||||
@media (width >= 48rem) {
|
||||
padding: calc(var(--spacing) * 5);
|
||||
}
|
||||
}
|
||||
.md\:p-6 {
|
||||
@media (width >= 48rem) {
|
||||
padding: calc(var(--spacing) * 6);
|
||||
@ -3104,12 +3041,6 @@
|
||||
line-height: var(--tw-leading, var(--text-6xl--line-height));
|
||||
}
|
||||
}
|
||||
.lg\:text-xl {
|
||||
@media (width >= 64rem) {
|
||||
font-size: var(--text-xl);
|
||||
line-height: var(--tw-leading, var(--text-xl--line-height));
|
||||
}
|
||||
}
|
||||
.lg\:hover\:underline {
|
||||
@media (width >= 64rem) {
|
||||
&:hover {
|
||||
@ -3237,11 +3168,6 @@
|
||||
padding-inline: calc(var(--spacing) * 0);
|
||||
}
|
||||
}
|
||||
.xl\:px-48 {
|
||||
@media (width >= 80rem) {
|
||||
padding-inline: calc(var(--spacing) * 48);
|
||||
}
|
||||
}
|
||||
.xl\:py-24 {
|
||||
@media (width >= 80rem) {
|
||||
padding-block: calc(var(--spacing) * 24);
|
||||
@ -3461,11 +3387,6 @@
|
||||
background-color: var(--color-red-700);
|
||||
}
|
||||
}
|
||||
.dark\:bg-red-900 {
|
||||
&:where(.dark, .dark *) {
|
||||
background-color: var(--color-red-900);
|
||||
}
|
||||
}
|
||||
.dark\:bg-teal-900 {
|
||||
&:where(.dark, .dark *) {
|
||||
background-color: var(--color-teal-900);
|
||||
@ -3516,11 +3437,6 @@
|
||||
color: var(--color-gray-600);
|
||||
}
|
||||
}
|
||||
.dark\:text-gray-700 {
|
||||
&:where(.dark, .dark *) {
|
||||
color: var(--color-gray-700);
|
||||
}
|
||||
}
|
||||
.dark\:text-green-400 {
|
||||
&:where(.dark, .dark *) {
|
||||
color: var(--color-green-400);
|
||||
@ -3561,11 +3477,6 @@
|
||||
color: var(--color-purple-500);
|
||||
}
|
||||
}
|
||||
.dark\:text-red-300 {
|
||||
&:where(.dark, .dark *) {
|
||||
color: var(--color-red-300);
|
||||
}
|
||||
}
|
||||
.dark\:text-red-400 {
|
||||
&:where(.dark, .dark *) {
|
||||
color: var(--color-red-400);
|
||||
|
||||
@ -5,7 +5,7 @@ import pykeepass
|
||||
import pykeepass.exceptions
|
||||
import pytest
|
||||
|
||||
from sshecret_admin.services.keepass import PasswordContext, _password_context, PasswordCredentialsError
|
||||
from sshecret_admin.services.keepass import _password_context, PasswordCredentialsError
|
||||
|
||||
def test_open_invalid_database() -> None:
|
||||
"""Test opening a non-existing database."""
|
||||
|
||||
@ -128,8 +128,9 @@ class ClientOperations:
|
||||
"""Delete client."""
|
||||
db_client = await self._get_client(client)
|
||||
if not db_client:
|
||||
return
|
||||
raise HTTPException(status_code=404, detail="Client not found.")
|
||||
if db_client.is_deleted:
|
||||
LOG.warning("Client %r was already deleted!", client)
|
||||
return
|
||||
db_client.is_deleted = True
|
||||
db_client.deleted_at = datetime.now(timezone.utc)
|
||||
@ -271,7 +272,12 @@ async def get_clients(
|
||||
filter_query: ClientListParams,
|
||||
) -> ClientQueryResult:
|
||||
"""Get Clients."""
|
||||
count_statement = select(func.count("*")).select_from(Client)
|
||||
count_statement = (
|
||||
select(func.count("*"))
|
||||
.select_from(Client)
|
||||
.where(Client.is_deleted.is_not(True))
|
||||
.where(Client.is_active.is_not(False))
|
||||
)
|
||||
count_statement = cast(
|
||||
Select[tuple[int]],
|
||||
filter_client_statement(count_statement, filter_query, True),
|
||||
|
||||
@ -40,7 +40,9 @@ class ClientView(BaseModel):
|
||||
return responses
|
||||
|
||||
@classmethod
|
||||
def from_client(cls, client: models.Client) -> Self:
|
||||
def from_client(
|
||||
cls, client: models.Client, include_deleted_secrets: bool = False
|
||||
) -> Self:
|
||||
"""Instantiate from a client."""
|
||||
view = cls(
|
||||
id=client.id,
|
||||
@ -54,7 +56,12 @@ class ClientView(BaseModel):
|
||||
is_deleted=client.is_deleted,
|
||||
)
|
||||
if client.secrets:
|
||||
if include_deleted_secrets:
|
||||
view.secrets = [secret.name for secret in client.secrets]
|
||||
else:
|
||||
view.secrets = [
|
||||
secret.name for secret in client.secrets if not secret.deleted
|
||||
]
|
||||
|
||||
if client.policies:
|
||||
view.policies = [policy.source for policy in client.policies]
|
||||
|
||||
@ -116,24 +116,18 @@ async def resolve_client_id(
|
||||
return None
|
||||
|
||||
|
||||
async def get_client_by_id(session: AsyncSession, id: uuid.UUID) -> Client | None:
|
||||
async def get_client_by_id(
|
||||
session: AsyncSession, id: uuid.UUID, include_deleted: bool = False
|
||||
) -> Client | None:
|
||||
"""Get client by ID."""
|
||||
if include_deleted:
|
||||
client_filter = client_with_relationships().where(Client.id == id)
|
||||
else:
|
||||
client_filter = query_active_clients().where(Client.id == id)
|
||||
client_results = await session.execute(client_filter)
|
||||
return client_results.scalars().first()
|
||||
|
||||
|
||||
async def get_client_by_id_or_name(
|
||||
session: AsyncSession, id_or_name: str
|
||||
) -> Client | None:
|
||||
"""Get client either by id or name."""
|
||||
if RE_UUID.match(id_or_name):
|
||||
id = uuid.UUID(id_or_name)
|
||||
return await get_client_by_id(session, id)
|
||||
|
||||
return await get_client_by_name(session, id_or_name)
|
||||
|
||||
|
||||
def query_active_clients() -> Select[tuple[Client]]:
|
||||
"""Get all active clients."""
|
||||
client_filter = (
|
||||
@ -144,22 +138,6 @@ def query_active_clients() -> Select[tuple[Client]]:
|
||||
return client_filter
|
||||
|
||||
|
||||
async def get_client_by_name(session: AsyncSession, name: str) -> Client | None:
|
||||
"""Get client by name.
|
||||
|
||||
This will get the latest client version, unless it's deleted.
|
||||
"""
|
||||
client_filter = (
|
||||
client_with_relationships()
|
||||
.where(Client.is_active.is_(True))
|
||||
.where(Client.is_deleted.is_not(True))
|
||||
.where(Client.name == name)
|
||||
.order_by(Client.version.desc())
|
||||
)
|
||||
client_result = await session.execute(client_filter)
|
||||
return client_result.scalars().first()
|
||||
|
||||
|
||||
async def refresh_client(session: AsyncSession, client: Client) -> None:
|
||||
"""Refresh the client and load in all relationships."""
|
||||
await session.refresh(
|
||||
|
||||
@ -182,8 +182,10 @@ class ClientSecretOperations:
|
||||
|
||||
async def delete_client_secret(self, secret_identifier: FlexID) -> None:
|
||||
"""Delete a client secret."""
|
||||
LOG.debug("delete_client_secret called with identifier %r", secret_identifier)
|
||||
client_secret = await self._get_client_secret(secret_identifier)
|
||||
if not client_secret:
|
||||
LOG.warning("Could not find any secret matching client secret.")
|
||||
return
|
||||
|
||||
client_secret.deleted = True
|
||||
|
||||
@ -72,7 +72,6 @@ def create_client_secrets_router(get_db_session: AsyncDBSessionDep) -> APIRouter
|
||||
client_op = ClientSecretOperations(session, request, client)
|
||||
return await client_op.get_client_secret(secret)
|
||||
|
||||
# TODO: delete_client_secret
|
||||
@router.delete("/clients/{client_identifier}/secrets/{secret_identifier}")
|
||||
async def delete_client_secret(
|
||||
request: Request,
|
||||
|
||||
@ -7,14 +7,19 @@ import logging
|
||||
from typing import cast, final, override
|
||||
|
||||
import asyncssh
|
||||
|
||||
from sshecret_sshd import exceptions, constants
|
||||
from sshecret_sshd import constants, exceptions
|
||||
|
||||
from .base import CommandDispatcher
|
||||
from .get_secret import GetSecret
|
||||
from .register import Register
|
||||
from .list_secrets import ListSecrets
|
||||
from .ping import PingCommand
|
||||
from .register import Register
|
||||
from .shelldriver import (
|
||||
ShellDeleteSecret,
|
||||
ShellListSecrets,
|
||||
ShellLookupSecret,
|
||||
ShellStoreSecret,
|
||||
)
|
||||
|
||||
|
||||
SYNOPSIS = """[bold]Sshecret SSH Server[/bold]
|
||||
@ -29,9 +34,13 @@ encoded as base64.
|
||||
|
||||
COMMANDS = [
|
||||
GetSecret,
|
||||
Register,
|
||||
ListSecrets,
|
||||
PingCommand,
|
||||
Register,
|
||||
ShellDeleteSecret,
|
||||
ShellListSecrets,
|
||||
ShellLookupSecret,
|
||||
ShellStoreSecret,
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
163
packages/sshecret-sshd/src/sshecret_sshd/commands/shelldriver.py
Normal file
163
packages/sshecret-sshd/src/sshecret_sshd/commands/shelldriver.py
Normal file
@ -0,0 +1,163 @@
|
||||
"""Podman Shelldriver compatible commands."""
|
||||
|
||||
import logging
|
||||
from typing import final, override
|
||||
import asyncssh
|
||||
|
||||
from sshecret.backend.models import Operation
|
||||
from sshecret.crypto import encrypt_string, load_public_key
|
||||
|
||||
|
||||
from .base import CommandDispatcher
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# These error messages are taken verbatim from podman, and while they don't seem
|
||||
# to make complete sense, they will be used regardless.
|
||||
ERR_SECRET_NOT_FOUND = "no such secret"
|
||||
ERR_SECRET_EXISTS = "secret data with ID already exists"
|
||||
ERR_INVALID_SECRET = "invalid key"
|
||||
|
||||
|
||||
@final
|
||||
class ShellListSecrets(CommandDispatcher):
|
||||
"""List secrets.
|
||||
|
||||
This command lists secrets in a format compatible with podman's ShellDriver.
|
||||
"""
|
||||
|
||||
name = "list"
|
||||
|
||||
@override
|
||||
async def exec(self) -> None:
|
||||
"""List secrets."""
|
||||
LOG.debug("ShellListSecret called.")
|
||||
await self.audit(Operation.READ, "Listed available secret names")
|
||||
for secret_name in self.client.secrets:
|
||||
self.print(secret_name)
|
||||
|
||||
|
||||
@final
|
||||
class ShellDeleteSecret(CommandDispatcher):
|
||||
"""Delete a secret.
|
||||
|
||||
If the identifier for a secret does not exist, an error will be printed.
|
||||
"""
|
||||
|
||||
name = "delete"
|
||||
mandatory_argument = "KEY"
|
||||
|
||||
@override
|
||||
async def exec(self) -> None:
|
||||
"""Delete a secret."""
|
||||
secret_name = self.arguments[0]
|
||||
LOG.debug("ShellDeleteSecret called withg arguments %r.", self.arguments)
|
||||
await self.audit(
|
||||
operation=Operation.DELETE,
|
||||
message="ClientSecret deleted",
|
||||
secret=secret_name,
|
||||
)
|
||||
await self.backend.delete_client_secret(
|
||||
("id", str(self.client.id)), ("name", secret_name)
|
||||
)
|
||||
|
||||
|
||||
@final
|
||||
class ShellLookupSecret(CommandDispatcher):
|
||||
"""Look up a secret.
|
||||
|
||||
The identifier for the secret must be provided as the argument.
|
||||
"""
|
||||
|
||||
name = "lookup"
|
||||
mandatory_argument = "KEY"
|
||||
|
||||
@override
|
||||
async def exec(self) -> None:
|
||||
"""Lookup secret."""
|
||||
LOG.debug("ShellLookupSecret called with arguments %r", self.arguments)
|
||||
secret_name = self.arguments[0]
|
||||
|
||||
secret = await self.backend.get_client_secret(
|
||||
("id", str(self.client.id)), secret_name
|
||||
)
|
||||
if not secret:
|
||||
LOG.debug(
|
||||
"Secret %s not found for client %s (%s)",
|
||||
secret_name,
|
||||
self.client.id,
|
||||
self.client.name,
|
||||
)
|
||||
self.print(ERR_SECRET_NOT_FOUND, stderr=True)
|
||||
return
|
||||
await self.audit(
|
||||
Operation.READ, message="Client requested secret", secret=secret_name
|
||||
)
|
||||
|
||||
self.print(secret)
|
||||
|
||||
|
||||
@final
|
||||
class ShellStoreSecret(CommandDispatcher):
|
||||
"""Store a secret.
|
||||
|
||||
Secret will be read from command argument, or via STDIN.
|
||||
"""
|
||||
|
||||
name = "store"
|
||||
mandatory_argument = "KEY"
|
||||
|
||||
@override
|
||||
async def exec(self) -> None:
|
||||
"""Store a secret."""
|
||||
LOG.debug("ShellStoreSecret called with arguments %r", self.arguments)
|
||||
secret_name = self.arguments[0]
|
||||
if secret_name in self.client.secrets:
|
||||
self.print(ERR_SECRET_EXISTS, stderr=True)
|
||||
return
|
||||
|
||||
secret_data: str | None = None
|
||||
if len(self.arguments) == 2:
|
||||
secret_data = self.arguments[1]
|
||||
|
||||
if not secret_data:
|
||||
LOG.debug("No secret set as input, trying stdin.")
|
||||
secret_data = await self.get_secret_on_stdin()
|
||||
|
||||
if not secret_data:
|
||||
self.print(ERR_INVALID_SECRET, stderr=True)
|
||||
return
|
||||
|
||||
# Encrypt secret
|
||||
encrypted = self.encrypt_secret(secret_data)
|
||||
|
||||
await self.backend.create_client_secret(
|
||||
("id", str(self.client.id)), secret_name, encrypted
|
||||
)
|
||||
await self.audit(
|
||||
operation=Operation.CREATE,
|
||||
message="Secret created from 'store' command",
|
||||
secret=secret_name,
|
||||
)
|
||||
|
||||
def encrypt_secret(self, value: str) -> str:
|
||||
"""Encrypt a secret."""
|
||||
public_key = load_public_key(self.client.public_key.encode())
|
||||
return encrypt_string(value, public_key)
|
||||
|
||||
async def get_secret_on_stdin(self) -> str | None:
|
||||
"""Get secret from stdin."""
|
||||
secret_data = ""
|
||||
try:
|
||||
async for line in self.process.stdin:
|
||||
if self.process.stdin.at_eof():
|
||||
break
|
||||
if not line:
|
||||
break
|
||||
secret_data += line.rstrip()
|
||||
except asyncssh.BreakReceived:
|
||||
pass
|
||||
|
||||
if not secret_data:
|
||||
return None
|
||||
return secret_data
|
||||
@ -5,7 +5,7 @@ admin and sshd library do not need to implement the same
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Self, override
|
||||
from typing import Any, Literal, Self, override
|
||||
import httpx
|
||||
|
||||
from pydantic import TypeAdapter
|
||||
@ -29,6 +29,17 @@ from .utils import validate_public_key
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
KeyType = Literal["id", "name"]
|
||||
KeySpec = str | tuple[KeyType, str]
|
||||
|
||||
|
||||
def _key(id_or_name: KeySpec) -> str:
|
||||
"""Get the correct key string."""
|
||||
if isinstance(id_or_name, str):
|
||||
return id_or_name
|
||||
prefix, suffix = id_or_name
|
||||
return f"{prefix}:{suffix}"
|
||||
|
||||
|
||||
class ClientQueryIterator:
|
||||
"""Asynchronous query iterator."""
|
||||
@ -313,56 +324,54 @@ class SshecretBackend(BaseBackend):
|
||||
|
||||
return clients
|
||||
|
||||
async def get_client(self, name: str) -> Client | None:
|
||||
async def get_client(self, id_or_name: KeySpec) -> Client | None:
|
||||
"""Lookup a client on username."""
|
||||
path = f"/api/v1/clients/{name}"
|
||||
key = _key(id_or_name)
|
||||
path = f"/api/v1/clients/{key}"
|
||||
response = await self._get(path)
|
||||
if response.status_code == 404:
|
||||
return None
|
||||
client = Client.model_validate(response.json())
|
||||
return client
|
||||
|
||||
async def get_client_by_id(self, id: str) -> Client | None:
|
||||
"""Lookup a client on username."""
|
||||
path = f"/api/v1/clients/id/{id}"
|
||||
response = await self._get(path)
|
||||
if response.status_code == 404:
|
||||
return None
|
||||
client = Client.model_validate(response.json())
|
||||
return client
|
||||
|
||||
async def delete_client(self, client_name: str) -> None:
|
||||
async def delete_client(self, id_or_name: KeySpec) -> None:
|
||||
"""Delete a client."""
|
||||
path = f"/api/v1/clients/{client_name}"
|
||||
response = await self._delete(path)
|
||||
|
||||
async def delete_client_by_id(self, id: str) -> None:
|
||||
"""Delete a client."""
|
||||
path = f"/api/v1/clients/id/{id}"
|
||||
key = _key(id_or_name)
|
||||
path = f"/api/v1/clients/{key}"
|
||||
response = await self._delete(path)
|
||||
LOG.debug("response: %s", response.text)
|
||||
|
||||
async def create_client_secret(
|
||||
self, client_name: str, secret_name: str, encrypted_secret: str
|
||||
self, client_idname: KeySpec, secret_name: str, encrypted_secret: str
|
||||
) -> None:
|
||||
"""Create a secret.
|
||||
|
||||
This will overwrite any existing secret with that name.
|
||||
"""
|
||||
path = f"api/v1/clients/{client_name}/secrets/{secret_name}"
|
||||
client_key = _key(client_idname)
|
||||
path = f"api/v1/clients/{client_key}/secrets/{secret_name}"
|
||||
response = await self._put(path, json={"value": encrypted_secret})
|
||||
|
||||
async def get_client_secret(self, name: str, secret_name: str) -> str | None:
|
||||
async def get_client_secret(
|
||||
self, client_idname: KeySpec, secret_idname: KeySpec
|
||||
) -> str | None:
|
||||
"""Fetch a secret."""
|
||||
path = f"/api/v1/clients/{name}/secrets/{secret_name}"
|
||||
client_key = _key(client_idname)
|
||||
secret_key = _key(secret_idname)
|
||||
path = f"/api/v1/clients/{client_key}/secrets/{secret_key}"
|
||||
response = await self._get(path)
|
||||
if response.status_code == 404:
|
||||
return None
|
||||
secret = ClientSecret.model_validate(response.json())
|
||||
return secret.secret
|
||||
|
||||
async def delete_client_secret(self, client_name: str, secret_name: str) -> None:
|
||||
async def delete_client_secret(
|
||||
self, client_idname: KeySpec, secret_idname: KeySpec
|
||||
) -> None:
|
||||
"""Delete a secret from a client."""
|
||||
path = f"api/v1/clients/{client_name}/secrets/{secret_name}"
|
||||
client_key = _key(client_idname)
|
||||
secret_key = _key(secret_idname)
|
||||
path = f"api/v1/clients/{client_key}/secrets/{secret_key}"
|
||||
await self._delete(path)
|
||||
|
||||
async def update_client(self, client: Client) -> Client:
|
||||
@ -382,13 +391,14 @@ class SshecretBackend(BaseBackend):
|
||||
)
|
||||
return client
|
||||
|
||||
async def update_client_key(self, client_name: str, public_key: str) -> None:
|
||||
async def update_client_key(self, client_idname: KeySpec, public_key: str) -> None:
|
||||
"""Update the client key."""
|
||||
path = f"/api/v1/clients/{client_name}/public-key"
|
||||
client_key = _key(client_idname)
|
||||
path = f"/api/v1/clients/{client_key}/public-key"
|
||||
await self._post(path, json={"public_key": public_key})
|
||||
|
||||
async def update_client_sources(
|
||||
self, client_name: str, addresses: list[str] | None
|
||||
self, client_idname: KeySpec, addresses: list[str] | None
|
||||
) -> None:
|
||||
"""Update client source addresses.
|
||||
|
||||
@ -397,36 +407,32 @@ class SshecretBackend(BaseBackend):
|
||||
if not addresses:
|
||||
addresses = []
|
||||
|
||||
path = f"/api/v1/clients/{client_name}/policies/"
|
||||
client_key = _key(client_idname)
|
||||
path = f"/api/v1/clients/{client_key}/policies/"
|
||||
await self._put(path, json={"sources": addresses})
|
||||
|
||||
async def get_detailed_secrets(self) -> list[DetailedSecrets]:
|
||||
"""Get detailed list of secrets."""
|
||||
path = "/api/v1/secrets/detailed/"
|
||||
path = "/api/v1/secrets/"
|
||||
response = await self._get(path)
|
||||
|
||||
secret_list = TypeAdapter(list[DetailedSecrets])
|
||||
return secret_list.validate_python(response.json())
|
||||
|
||||
async def get_secrets(self) -> list[Secret]:
|
||||
"""Get Secrets.
|
||||
|
||||
This provides a list of secret names and which clients have them.
|
||||
"""
|
||||
"""Get detailed list of secrets."""
|
||||
path = "/api/v1/secrets/"
|
||||
response = await self._get(path)
|
||||
|
||||
secret_list = TypeAdapter(list[Secret])
|
||||
return secret_list.validate_python(response.json())
|
||||
detailed_secret_list = TypeAdapter(list[DetailedSecrets])
|
||||
detailed_secrets = detailed_secret_list.validate_python(response.json())
|
||||
# Convert to list of secrets
|
||||
secrets: list[Secret] = []
|
||||
for detail in detailed_secrets:
|
||||
clients = [ref.name for ref in detail.clients]
|
||||
secrets.append(Secret(name=detail.name, clients=clients))
|
||||
|
||||
async def get_secret(self, name: str) -> Secret | None:
|
||||
"""Get clients mapped to a single secret."""
|
||||
path = f"/api/v1/secrets/{name}"
|
||||
response = await self._get(path)
|
||||
if response.status_code == 404:
|
||||
return None
|
||||
|
||||
return Secret.model_validate(response.json())
|
||||
return secrets
|
||||
|
||||
async def get_detailed_secret(self, name: str) -> DetailedSecrets | None:
|
||||
"""Get clients mapped to a single secret."""
|
||||
@ -437,6 +443,16 @@ class SshecretBackend(BaseBackend):
|
||||
|
||||
return DetailedSecrets.model_validate(response.json())
|
||||
|
||||
async def get_secret(self, idname: KeySpec) -> DetailedSecrets | None:
|
||||
"""Get clients mapped to a single secret."""
|
||||
secret_key = _key(idname)
|
||||
path = f"/api/v1/secrets/{secret_key}"
|
||||
response = await self._get(path)
|
||||
if response.status_code == 404:
|
||||
return None
|
||||
|
||||
return DetailedSecrets.model_validate(response.json())
|
||||
|
||||
def audit(self, subsystem: SubSystem) -> AuditAPI:
|
||||
"""Create the audit API."""
|
||||
audit = AuditAPI(self._backend_url, self._api_token, subsystem)
|
||||
|
||||
@ -51,7 +51,7 @@ async def test_create_secret(backend_api: SshecretBackend) -> None:
|
||||
assert secret_to_client is not None
|
||||
|
||||
assert secret_to_client.name == "mysecret"
|
||||
assert "test" in secret_to_client.clients
|
||||
assert secret_to_client.clients[0].name == "test"
|
||||
|
||||
secret = await backend_api.get_client_secret("test", "mysecret")
|
||||
|
||||
|
||||
@ -3,8 +3,9 @@
|
||||
This essentially also tests parts of the admin API.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
from collections.abc import AsyncIterator
|
||||
import os
|
||||
import httpx
|
||||
|
||||
@ -17,7 +18,33 @@ from .clients import create_test_client, ClientData
|
||||
from .types import CommandRunner, ProcessRunner
|
||||
|
||||
|
||||
class TestSshd:
|
||||
class BaseSSHTests:
|
||||
"""Base test class."""
|
||||
|
||||
async def register_client(
|
||||
self, name: str, ssh_session: ProcessRunner
|
||||
) -> ClientData:
|
||||
"""Register client."""
|
||||
test_client = create_test_client(name)
|
||||
async with ssh_session(test_client, "register") as session:
|
||||
maxlines = 10
|
||||
linenum = 0
|
||||
found = False
|
||||
while linenum < maxlines:
|
||||
line = await session.stdout.readline()
|
||||
if "Enter public key" in line:
|
||||
found = True
|
||||
break
|
||||
assert found is True
|
||||
session.stdin.write(test_client.public_key + "\n")
|
||||
|
||||
result = await session.stdout.read()
|
||||
assert "Key is valid. Registering client." in result
|
||||
await session.wait()
|
||||
return test_client
|
||||
|
||||
|
||||
class TestSshd(BaseSSHTests):
|
||||
"""Class based tests.
|
||||
|
||||
This allows us to create small helpers.
|
||||
@ -52,30 +79,9 @@ class TestSshd:
|
||||
client = clients[0]
|
||||
assert client.name == "new_client"
|
||||
|
||||
async def register_client(
|
||||
self, name: str, ssh_session: ProcessRunner
|
||||
) -> ClientData:
|
||||
"""Register client."""
|
||||
test_client = create_test_client(name)
|
||||
async with ssh_session(test_client, "register") as session:
|
||||
maxlines = 10
|
||||
linenum = 0
|
||||
found = False
|
||||
while linenum < maxlines:
|
||||
line = await session.stdout.readline()
|
||||
if "Enter public key" in line:
|
||||
found = True
|
||||
break
|
||||
assert found is True
|
||||
session.stdin.write(test_client.public_key + "\n")
|
||||
|
||||
result = await session.stdout.read()
|
||||
assert "Key is valid. Registering client." in result
|
||||
await session.wait()
|
||||
return test_client
|
||||
|
||||
|
||||
class TestSshdIntegration(TestSshd):
|
||||
class TestSshdIntegration(BaseSSHTests):
|
||||
"""Integration tests."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -137,3 +143,185 @@ class TestSshdIntegration(TestSshd):
|
||||
base_url=url, headers={"Authorization": f"Bearer {token}"}
|
||||
) as client:
|
||||
yield client
|
||||
|
||||
|
||||
class TestShelldriverCommands(BaseSSHTests):
|
||||
"""Shelldriver command tests."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_and_lookup(
|
||||
self,
|
||||
backend_server: tuple[str, tuple[str, str]],
|
||||
ssh_session: ProcessRunner,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> None:
|
||||
"""Log in."""
|
||||
test_client = await self.register_client("myclient", ssh_session)
|
||||
ssh_output = await ssh_command_runner(test_client, "store mysecret secretvalue")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
|
||||
# wait half a second
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
ssh_output = await ssh_command_runner(test_client, "lookup mysecret")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
assert ssh_output.stdout is not None
|
||||
assert isinstance(ssh_output.stdout, str)
|
||||
encrypted = ssh_output.stdout.rstrip()
|
||||
decrypted = decode_string(encrypted, test_client.private_key)
|
||||
assert decrypted == "secretvalue"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_and_lookup_stdin(
|
||||
self,
|
||||
backend_server: tuple[str, tuple[str, str]],
|
||||
ssh_session: ProcessRunner,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> None:
|
||||
"""Test store and lookup, with password specification in stdin."""
|
||||
test_client = await self.register_client("myclient", ssh_session)
|
||||
async with ssh_session(test_client, "store insecret") as session:
|
||||
session.stdin.write("testinput\n")
|
||||
session.stdin.write_eof()
|
||||
|
||||
await session.stdin.wait_closed()
|
||||
|
||||
await asyncio.sleep(0.5)
|
||||
ssh_output = await ssh_command_runner(test_client, "lookup insecret")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
assert ssh_output.stdout is not None
|
||||
assert isinstance(ssh_output.stdout, str)
|
||||
encrypted = ssh_output.stdout.rstrip()
|
||||
decrypted = decode_string(encrypted, test_client.private_key)
|
||||
assert decrypted == "testinput"
|
||||
|
||||
@pytest.mark.parametrize("secret_name",["nonexistant", "blåbærgrød", "../../../etc/shadow"])
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_lookup(
|
||||
self,
|
||||
secret_name: str,
|
||||
ssh_command_runner: CommandRunner,
|
||||
ssh_session: ProcessRunner,
|
||||
) -> None:
|
||||
"""Test lookup with invalid secret."""
|
||||
test_client = await self.register_client("myclient", ssh_session)
|
||||
ssh_output = await ssh_command_runner(test_client, f"lookup {secret_name}")
|
||||
assert isinstance(ssh_output.stderr, str)
|
||||
assert ssh_output.stderr.rstrip() == "no such secret"
|
||||
|
||||
|
||||
class TestShelldriverListCommand(BaseSSHTests):
|
||||
"""Tests for the list command."""
|
||||
|
||||
@pytest.fixture(name="secret_names")
|
||||
def get_secret_names(self) -> list[str]:
|
||||
"""Get secret names.
|
||||
|
||||
Sort of like a parametrize function.
|
||||
"""
|
||||
return ["foo", "bar", "abc123", "blåbærgrød"]
|
||||
|
||||
@pytest.fixture(name="test_client")
|
||||
@pytest.mark.asyncio
|
||||
async def create_test_client(
|
||||
self,
|
||||
ssh_session: ProcessRunner,
|
||||
) -> ClientData:
|
||||
"""Register a test client."""
|
||||
return await self.register_client("listclient", ssh_session)
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@pytest.mark.asyncio
|
||||
async def create_data(
|
||||
self,
|
||||
secret_names: list[str],
|
||||
test_client: ClientData,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> None:
|
||||
"""Create data for the test."""
|
||||
for name in secret_names:
|
||||
ssh_output = await ssh_command_runner(test_client, f"store {name} secretvalue")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
assert ssh_output.stdout is not None
|
||||
assert isinstance(ssh_output.stdout, str)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_secrets(
|
||||
self,
|
||||
secret_names: list[str],
|
||||
test_client: ClientData,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> None:
|
||||
"""Test list command."""
|
||||
ssh_output = await ssh_command_runner(test_client, "list")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
assert ssh_output.stdout is not None
|
||||
assert isinstance(ssh_output.stdout, str)
|
||||
|
||||
list_output = ssh_output.stdout
|
||||
print(list_output)
|
||||
input_secret_names = list_output.splitlines()
|
||||
assert len(input_secret_names) == len(secret_names)
|
||||
|
||||
assert sorted(secret_names) == sorted(input_secret_names)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("secret_name", ["simple", "blåbærgrød"])
|
||||
class TestShelldriverDeleteCommand(BaseSSHTests):
|
||||
"""Tests for the list command."""
|
||||
|
||||
@pytest.fixture(name="test_client")
|
||||
@pytest.mark.asyncio
|
||||
async def create_test_client(
|
||||
self,
|
||||
ssh_session: ProcessRunner,
|
||||
) -> ClientData:
|
||||
"""Register a test client."""
|
||||
return await self.register_client("listclient", ssh_session)
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@pytest.mark.asyncio
|
||||
async def create_data(
|
||||
self,
|
||||
secret_name: str,
|
||||
test_client: ClientData,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> None:
|
||||
"""Create data for the test."""
|
||||
ssh_output = await ssh_command_runner(test_client, f"store {secret_name} secretvalue")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
assert ssh_output.stdout is not None
|
||||
assert isinstance(ssh_output.stdout, str)
|
||||
|
||||
|
||||
async def get_stored_secrets(
|
||||
self,
|
||||
test_client: ClientData,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> list[str]:
|
||||
"""Get stored secrets."""
|
||||
ssh_output = await ssh_command_runner(test_client, "list")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
assert ssh_output.stdout is not None
|
||||
assert isinstance(ssh_output.stdout, str)
|
||||
|
||||
list_output = ssh_output.stdout
|
||||
print(list_output)
|
||||
input_secret_names = list_output.splitlines()
|
||||
return sorted(input_secret_names)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_secret(
|
||||
self,
|
||||
secret_name: str,
|
||||
test_client: ClientData,
|
||||
ssh_command_runner: CommandRunner,
|
||||
) -> None:
|
||||
"""Delete secret."""
|
||||
current_secrets = await self.get_stored_secrets(test_client, ssh_command_runner)
|
||||
assert secret_name in current_secrets
|
||||
ssh_output = await ssh_command_runner(test_client, f"delete {secret_name}")
|
||||
assert bool(ssh_output.stderr) is False
|
||||
current_secrets = await self.get_stored_secrets(test_client, ssh_command_runner)
|
||||
assert secret_name not in current_secrets
|
||||
|
||||
@ -119,6 +119,9 @@ def test_delete_client(test_client: TestClient) -> None:
|
||||
resp = test_client.get("/api/v1/clients/test")
|
||||
assert resp.status_code == 404
|
||||
|
||||
resp = test_client.get("/api/v1/clients/")
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
def test_add_secret(test_client: TestClient) -> None:
|
||||
"""Test adding a secret to a client."""
|
||||
|
||||
Reference in New Issue
Block a user