Compare commits
10 Commits
3779e93b8c
...
0eaa913e35
| Author | SHA1 | Date | |
|---|---|---|---|
| 0eaa913e35 | |||
| 782ec19137 | |||
| 43d00cecb4 | |||
| d1fa6c0076 | |||
| 71d877022b | |||
| 36d04b8a33 | |||
| a834339c13 | |||
| fb6b76f7d8 | |||
| fed441743e | |||
| d86d9a9256 |
@ -5,7 +5,6 @@ import logging
|
|||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
|
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
|
||||||
|
|
||||||
from sshecret.backend.models import Secret
|
|
||||||
from sshecret_admin.core.dependencies import AdminDependencies
|
from sshecret_admin.core.dependencies import AdminDependencies
|
||||||
from sshecret_admin.services import AdminBackend
|
from sshecret_admin.services import AdminBackend
|
||||||
from sshecret_admin.services.models import (
|
from sshecret_admin.services.models import (
|
||||||
@ -13,6 +12,7 @@ from sshecret_admin.services.models import (
|
|||||||
ClientSecretGroupList,
|
ClientSecretGroupList,
|
||||||
SecretCreate,
|
SecretCreate,
|
||||||
SecretGroupCreate,
|
SecretGroupCreate,
|
||||||
|
SecretListView,
|
||||||
SecretUpdate,
|
SecretUpdate,
|
||||||
SecretView,
|
SecretView,
|
||||||
)
|
)
|
||||||
@ -27,7 +27,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
|||||||
@app.get("/secrets/")
|
@app.get("/secrets/")
|
||||||
async def get_secret_names(
|
async def get_secret_names(
|
||||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
) -> list[Secret]:
|
) -> list[SecretListView]:
|
||||||
"""Get Secret Names."""
|
"""Get Secret Names."""
|
||||||
return await admin.get_secrets()
|
return await admin.get_secrets()
|
||||||
|
|
||||||
@ -94,7 +94,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
|||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
||||||
)
|
)
|
||||||
return results[0]
|
return results.groups[0]
|
||||||
|
|
||||||
@app.post("/secrets/groups/")
|
@app.post("/secrets/groups/")
|
||||||
async def add_secret_group(
|
async def add_secret_group(
|
||||||
@ -125,7 +125,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
|||||||
Entries within the group will be moved to the root.
|
Entries within the group will be moved to the root.
|
||||||
This also includes nested entries further down from the group.
|
This also includes nested entries further down from the group.
|
||||||
"""
|
"""
|
||||||
group = await admin.get_secret_group(group.name)
|
group = await admin.get_secret_group(group_name)
|
||||||
if not group:
|
if not group:
|
||||||
return
|
return
|
||||||
await admin.delete_secret_group(group_name, keep_entries=True)
|
await admin.delete_secret_group(group_name, keep_entries=True)
|
||||||
@ -196,7 +196,7 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
|||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
status_code=status.HTTP_404_NOT_FOUND, detail="No such group."
|
||||||
)
|
)
|
||||||
group = groups[0]
|
group = groups.groups[0]
|
||||||
matching_entries = [
|
matching_entries = [
|
||||||
entry for entry in group.entries if entry.name == secret_name
|
entry for entry in group.entries if entry.name == secret_name
|
||||||
]
|
]
|
||||||
|
|||||||
@ -4,6 +4,12 @@
|
|||||||
class="tree-entry-item"
|
class="tree-entry-item"
|
||||||
data-type="entry"
|
data-type="entry"
|
||||||
data-name="{{ entry.name }}"
|
data-name="{{ entry.name }}"
|
||||||
|
data-group-path="/"
|
||||||
|
{% if secret | default(false) %}
|
||||||
|
{% if secret.name == entry.name %}
|
||||||
|
selected=""
|
||||||
|
{% endif %}
|
||||||
|
{% endif %}
|
||||||
>
|
>
|
||||||
<sl-icon name="shield"> </sl-icon>
|
<sl-icon name="shield"> </sl-icon>
|
||||||
<span class="px-2">{{ entry.name }}</span>
|
<span class="px-2">{{ entry.name }}</span>
|
||||||
@ -16,6 +22,13 @@
|
|||||||
class="secret-group-list-item"
|
class="secret-group-list-item"
|
||||||
data-type="group"
|
data-type="group"
|
||||||
data-name="{{ group.group_name }}"
|
data-name="{{ group.group_name }}"
|
||||||
|
data-group-path="{{ group.path }}"
|
||||||
|
{% if group_path_nodes | default(false) %}
|
||||||
|
{% if group.group_name in group_path_nodes %}
|
||||||
|
expanded=""
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% endif %}
|
||||||
>
|
>
|
||||||
<sl-icon name="folder"> </sl-icon>
|
<sl-icon name="folder"> </sl-icon>
|
||||||
<span class="px-2">{{ group.group_name }}</span>
|
<span class="px-2">{{ group.group_name }}</span>
|
||||||
@ -57,8 +70,8 @@
|
|||||||
<div class="p-4 bg-white border border-gray-200 rounded-lg shadow-sm sm:flex dark:border-gray-700 sm:p-6 dark:bg-gray-800" id="secret-tree">
|
<div class="p-4 bg-white border border-gray-200 rounded-lg shadow-sm sm:flex dark:border-gray-700 sm:p-6 dark:bg-gray-800" id="secret-tree">
|
||||||
|
|
||||||
|
|
||||||
<div class="flex flex-col">
|
<div class="flex flex-1 flex-col">
|
||||||
<div class="h-full">
|
<div class="h-full w-full">
|
||||||
<sl-tree class="tree-with-icons">
|
<sl-tree class="tree-with-icons">
|
||||||
<sl-tree-item
|
<sl-tree-item
|
||||||
id="secret-group-root-item"
|
id="secret-group-root-item"
|
||||||
@ -81,7 +94,22 @@
|
|||||||
|
|
||||||
</div>
|
</div>
|
||||||
<div class="2xl:col-span-2 xl:col-span-2 p-4 mb-4 bg-white border border-gray-200 rounded-lg shadow-sm 2xl:col-span-2 dark:border-gray-700 sm:p-6 dark:bg-gray-800">
|
<div class="2xl:col-span-2 xl:col-span-2 p-4 mb-4 bg-white border border-gray-200 rounded-lg shadow-sm 2xl:col-span-2 dark:border-gray-700 sm:p-6 dark:bg-gray-800">
|
||||||
{% include '/secrets/partials/default_detail.html.j2' %}
|
{% if group_page | default(false) %}
|
||||||
|
<div class="w-full" id="secretdetails">
|
||||||
|
{% include '/secrets/partials/group_detail.html.j2' %}
|
||||||
|
</div>
|
||||||
|
{% elif root_group_page | default(false) %}
|
||||||
|
<div class="w-full" id="secretdetails">
|
||||||
|
{% include '/secrets/partials/edit_root.html.j2' %}
|
||||||
|
</div>
|
||||||
|
{% elif secret_page | default(false) %}
|
||||||
|
<div class="w-full" id="secretdetails">
|
||||||
|
{% include '/secrets/partials/tree_detail.html.j2' %}
|
||||||
|
</div>
|
||||||
|
{% else %}
|
||||||
|
{% include '/secrets/partials/default_detail.html.j2' %}
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
@ -99,17 +127,19 @@
|
|||||||
|
|
||||||
const type = selectedEl.dataset.type;
|
const type = selectedEl.dataset.type;
|
||||||
const name = selectedEl.dataset.name;
|
const name = selectedEl.dataset.name;
|
||||||
console.log(`Event on ${type} ${name}`);
|
const groupPath = selectedEl.dataset.groupPath;
|
||||||
|
console.log(`Event on ${type} ${name} path: ${groupPath}`);
|
||||||
|
|
||||||
if (!type || !name) return;
|
if (!type || !name) return;
|
||||||
|
|
||||||
let url = '';
|
let url = '';
|
||||||
if (type === 'entry') {
|
if (type === 'entry') {
|
||||||
url = `/secrets/partial/secret/${encodeURIComponent(name)}`;
|
url = `/secrets/secret/${encodeURIComponent(name)}`;
|
||||||
} else if (type === 'group') {
|
} else if (type === 'group') {
|
||||||
url = `/secrets/partial/group/${encodeURIComponent(name)}`;
|
//url = `/secrets/partial/group/${encodeURIComponent(name)}`;
|
||||||
|
url = `/secrets/group/${encodeURIComponent(groupPath)}`;
|
||||||
} else if (type == 'root') {
|
} else if (type == 'root') {
|
||||||
url = `/secrets/partial/root_group`;
|
url = `/secrets/group/`;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (url) {
|
if (url) {
|
||||||
|
|||||||
@ -3,6 +3,8 @@
|
|||||||
{% include '/secrets/partials/client_list_inner.html.j2' %}
|
{% include '/secrets/partials/client_list_inner.html.j2' %}
|
||||||
</ul>
|
</ul>
|
||||||
</div>
|
</div>
|
||||||
|
{% if secret.secret %}
|
||||||
<div class="w-full my-2" id="secretclientaction">
|
<div class="w-full my-2" id="secretclientaction">
|
||||||
{% include '/secrets/partials/client_assign_button.html.j2' %}
|
{% include '/secrets/partials/client_assign_button.html.j2' %}
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@ -1,7 +1,9 @@
|
|||||||
<div class="w-full">
|
<div class="w-full">
|
||||||
<div class="mb-4">
|
<div class="mb-4">
|
||||||
<h3 class="text-xl font-semibold dark:text-white">Group {{name}}</h3>
|
<h3 class="text-xl font-semibold dark:text-white">Group {{name}}</h3>
|
||||||
<span class="text-sm text-gray-500 dark:text-gray-400">{{ description }}</span>
|
{% if description %}
|
||||||
|
<span class="text-sm text-gray-500 dark:text-gray-400">{{ description }}</span>
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<sl-details summary="Create secret">
|
<sl-details summary="Create secret">
|
||||||
|
|||||||
@ -1,5 +1,42 @@
|
|||||||
<div class="w-full" id="secretdetails">
|
<div class="w-full" id="secretdetails">
|
||||||
|
|
||||||
|
<!-- menu -->
|
||||||
|
|
||||||
|
<div class="flex justify-end px-4">
|
||||||
|
<button id="secret-menu-button" data-dropdown-toggle="secret-edit-menu" class="inline-block text-gray-500 dark:text-gray-400 hover:bg-gray-100 dark:hover:bg-gray-700 focus:ring-4 focus:outline-none focus:ring-gray-200 dark:focus:ring-gray-700 rounded-lg text-sm p-1.5" type="button">
|
||||||
|
<span class="sr-only">Open dropdown</span>
|
||||||
|
<svg class="w-5 h-5" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="currentColor" viewBox="0 0 16 3">
|
||||||
|
<path d="M2 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3Zm6.041 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM14 0a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3Z"/>
|
||||||
|
</svg>
|
||||||
|
</button>
|
||||||
|
<!-- Dropdown menu -->
|
||||||
|
<div id="secret-edit-menu" class="z-10 hidden text-base list-none bg-white divide-y divide-gray-100 rounded-lg shadow-sm w-44 dark:bg-gray-700">
|
||||||
|
<ul class="py-2" aria-labelledby="secret-menu-button">
|
||||||
|
<li>
|
||||||
|
<a
|
||||||
|
href="#"
|
||||||
|
class="block px-4 py-2 text-sm text-red-600 hover:bg-gray-100 dark:hover:bg-gray-600 dark:text-gray-200 dark:hover:text-white"
|
||||||
|
hx-delete="/secrets/{{secret.name}}"
|
||||||
|
hx-target="#secretdetails"
|
||||||
|
hx-swap="OuterHTML"
|
||||||
|
hx-indicator=".secret-spinner"
|
||||||
|
hx-confirm="Really delete this secret?"
|
||||||
|
>
|
||||||
|
Delete
|
||||||
|
</a>
|
||||||
|
</li>
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
|
||||||
<h3 class="mb-4 text-xl font-semibold dark:text-white">{{secret.name}}</h3>
|
<h3 class="mb-4 text-xl font-semibold dark:text-white">{{secret.name}}</h3>
|
||||||
|
{% if secret.description %}
|
||||||
|
<span class="text-sm text-gray-500 dark:text-gray-400">{{ secret.description }}</span>
|
||||||
|
{% endif %}
|
||||||
|
{% if not secret.secret %}
|
||||||
|
<p class="text-sm text-gray-500 dark:text-gray-400 italic">This secret was created outside of sshecret-admin. It cannot be decrypted, and therefore fewer options are available here.</p>
|
||||||
|
{% endif %}
|
||||||
<div class="htmx-indicator secret-spinner">
|
<div class="htmx-indicator secret-spinner">
|
||||||
<div role="status">
|
<div role="status">
|
||||||
<svg aria-hidden="true" class="inline w-6 h-6 text-gray-200 animate-spin dark:text-gray-600 fill-blue-600" viewBox="0 0 100 101" fill="none" xmlns="http://www.w3.org/2000/svg">
|
<svg aria-hidden="true" class="inline w-6 h-6 text-gray-200 animate-spin dark:text-gray-600 fill-blue-600" viewBox="0 0 100 101" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||||
@ -15,6 +52,7 @@
|
|||||||
{% include '/secrets/partials/client_secret_details.html.j2' %}
|
{% include '/secrets/partials/client_secret_details.html.j2' %}
|
||||||
</div>
|
</div>
|
||||||
</sl-details>
|
</sl-details>
|
||||||
|
{% if secret.secret %}
|
||||||
<sl-details summary="Read/Update Secret">
|
<sl-details summary="Read/Update Secret">
|
||||||
<div id="secretvalue">
|
<div id="secretvalue">
|
||||||
<div class="mb-6">
|
<div class="mb-6">
|
||||||
@ -72,6 +110,7 @@
|
|||||||
</form>
|
</form>
|
||||||
</sl-details>
|
</sl-details>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
{% endif %}
|
||||||
<sl-details summary="Events">
|
<sl-details summary="Events">
|
||||||
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600" id="last-audit-events">
|
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-600" id="last-audit-events">
|
||||||
<thead class="bg-gray-50 dark:bg-gray-700">
|
<thead class="bg-gray-50 dark:bg-gray-700">
|
||||||
|
|||||||
@ -88,7 +88,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
client: Annotated[ClientUpdate, Form()],
|
client: Annotated[ClientUpdate, Form()],
|
||||||
):
|
):
|
||||||
"""Update a client."""
|
"""Update a client."""
|
||||||
original_client = await admin.get_client(id)
|
original_client = await admin.get_client(("id", id))
|
||||||
if not original_client:
|
if not original_client:
|
||||||
return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
request, "fragments/error.html", {"message": "Client not found"}
|
request, "fragments/error.html", {"message": "Client not found"}
|
||||||
@ -131,7 +131,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
) -> Response:
|
) -> Response:
|
||||||
"""Delete a client."""
|
"""Delete a client."""
|
||||||
await admin.delete_client(id)
|
await admin.delete_client(("id", id))
|
||||||
clients = await admin.get_clients()
|
clients = await admin.get_clients()
|
||||||
headers = {"Hx-Refresh": "true"}
|
headers = {"Hx-Refresh": "true"}
|
||||||
return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
|
|||||||
@ -64,7 +64,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||||
):
|
):
|
||||||
groups = await admin.get_secret_groups()
|
groups = await admin.get_secret_groups()
|
||||||
LOG.info("Groups: %r", groups)
|
LOG.info("Groups: %s", groups.model_dump_json(indent=2))
|
||||||
return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
request,
|
request,
|
||||||
"secrets/index.html.j2",
|
"secrets/index.html.j2",
|
||||||
@ -74,28 +74,121 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@app.get("/secrets/partial/root_group")
|
# @app.get("/secrets/partial/root_group")
|
||||||
async def get_root_group(
|
# async def get_root_group(
|
||||||
|
# request: Request,
|
||||||
|
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
|
# ):
|
||||||
|
# """Get root group."""
|
||||||
|
# clients = await admin.get_clients()
|
||||||
|
# return templates.TemplateResponse(
|
||||||
|
# request,
|
||||||
|
# "secrets/partials/edit_root.html.j2",
|
||||||
|
# {
|
||||||
|
# "group_path_nodes": [],
|
||||||
|
# "clients": clients,
|
||||||
|
# },
|
||||||
|
# )
|
||||||
|
|
||||||
|
# @app.get("/secrets/partial/secret/{name}")
|
||||||
|
# async def get_secret_tree_detail_partial(
|
||||||
|
# request: Request,
|
||||||
|
# name: str,
|
||||||
|
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
|
# ):
|
||||||
|
# """Get partial secret detail."""
|
||||||
|
# secret = await admin.get_secret(name)
|
||||||
|
# groups = await admin.get_secret_groups(flat=True)
|
||||||
|
# events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
|
||||||
|
|
||||||
|
# if not secret:
|
||||||
|
# raise HTTPException(
|
||||||
|
# status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
|
||||||
|
# )
|
||||||
|
# return templates.TemplateResponse(
|
||||||
|
# request,
|
||||||
|
# "secrets/partials/tree_detail.html.j2",
|
||||||
|
# {
|
||||||
|
# "secret": secret,
|
||||||
|
# "groups": groups,
|
||||||
|
# "events": events,
|
||||||
|
# },
|
||||||
|
# )
|
||||||
|
|
||||||
|
@app.get("/secrets/group/")
|
||||||
|
async def show_root_group(
|
||||||
request: Request,
|
request: Request,
|
||||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||||
):
|
):
|
||||||
"""Get root group."""
|
"""Show the root path."""
|
||||||
clients = await admin.get_clients()
|
clients = await admin.get_clients()
|
||||||
|
context: dict[str, Any] = {
|
||||||
|
"clients": clients,
|
||||||
|
"root_group_page": True,
|
||||||
|
}
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
if request.headers.get("HX-Request"):
|
||||||
|
# This is a HTMX request.
|
||||||
|
template_name = "secrets/partials/edit_root.html.j2"
|
||||||
|
headers["HX-Push-Url"] = request.url.path
|
||||||
|
else:
|
||||||
|
groups = await admin.get_secret_groups()
|
||||||
|
template_name = "secrets/index.html.j2"
|
||||||
|
context["user"] = current_user
|
||||||
|
context["groups"] = groups
|
||||||
|
context["group_path_nodes"] = ["/"]
|
||||||
|
|
||||||
return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
request,
|
request, template_name, context, headers=headers
|
||||||
"secrets/partials/edit_root.html.j2",
|
|
||||||
{
|
|
||||||
"clients": clients,
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@app.get("/secrets/partial/secret/{name}")
|
@app.get("/secrets/group/{group_path:path}")
|
||||||
|
async def show_group(
|
||||||
|
request: Request,
|
||||||
|
group_path: str,
|
||||||
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||||
|
):
|
||||||
|
"""Show a group."""
|
||||||
|
group = await admin.get_secret_group_by_path(group_path)
|
||||||
|
if not group:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
|
||||||
|
)
|
||||||
|
clients = await admin.get_clients()
|
||||||
|
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
context: dict[str, Any] = {
|
||||||
|
"group_page": True,
|
||||||
|
"name": group.group_name,
|
||||||
|
"description": group.description,
|
||||||
|
"clients": clients,
|
||||||
|
}
|
||||||
|
if request.headers.get("HX-Request"):
|
||||||
|
# This is a HTMX request.
|
||||||
|
template_name = "secrets/partials/group_detail.html.j2"
|
||||||
|
headers["HX-Push-Url"] = request.url.path
|
||||||
|
else:
|
||||||
|
template_name = "secrets/index.html.j2"
|
||||||
|
|
||||||
|
groups = await admin.get_secret_groups()
|
||||||
|
context["user"] = current_user
|
||||||
|
context["groups"] = groups
|
||||||
|
context["group_path_nodes"] = group.path.split("/")
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
request, template_name, context, headers=headers
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.get("/secrets/secret/{name}")
|
||||||
async def get_secret_tree_detail(
|
async def get_secret_tree_detail(
|
||||||
request: Request,
|
request: Request,
|
||||||
name: str,
|
name: str,
|
||||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
|
current_user: Annotated[LocalUserInfo, Depends(dependencies.get_user_info)],
|
||||||
):
|
):
|
||||||
"""Get partial secret detail."""
|
"""Get secret detail."""
|
||||||
secret = await admin.get_secret(name)
|
secret = await admin.get_secret(name)
|
||||||
groups = await admin.get_secret_groups(flat=True)
|
groups = await admin.get_secret_groups(flat=True)
|
||||||
events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
|
events = await admin.get_audit_log_detailed(limit=10, secret_name=name)
|
||||||
@ -104,14 +197,34 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
|
status_code=status.HTTP_404_NOT_FOUND, detail="Secret not found"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
context: dict[str, Any] = {
|
||||||
|
"secret": secret,
|
||||||
|
"groups": groups,
|
||||||
|
"events": events,
|
||||||
|
"secret_page": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
|
||||||
|
if request.headers.get("HX-Request"):
|
||||||
|
# This is a HTMX request.
|
||||||
|
template_name = "secrets/partials/tree_detail.html.j2"
|
||||||
|
headers["HX-Push-Url"] = request.url.path
|
||||||
|
else:
|
||||||
|
group_path = ["/"]
|
||||||
|
if secret.group:
|
||||||
|
group = await admin.get_secret_group(secret.group)
|
||||||
|
if group:
|
||||||
|
group_path = group.path.split("/")
|
||||||
|
|
||||||
|
template_name = "secrets/index.html.j2"
|
||||||
|
context["user"] = current_user
|
||||||
|
context["groups"] = groups
|
||||||
|
context["group_path_nodes"] = group_path
|
||||||
|
|
||||||
return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
request,
|
request, template_name, context, headers=headers
|
||||||
"secrets/partials/tree_detail.html.j2",
|
|
||||||
{
|
|
||||||
"secret": secret,
|
|
||||||
"groups": groups,
|
|
||||||
"events": events,
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@app.get("/secrets/partial/group/{name}")
|
@app.get("/secrets/partial/group/{name}")
|
||||||
@ -461,7 +574,7 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
):
|
):
|
||||||
"""Add a secret to a client."""
|
"""Add a secret to a client."""
|
||||||
await admin.create_client_secret(client, name)
|
await admin.create_client_secret(("id", client), name)
|
||||||
secret = await admin.get_secret(name)
|
secret = await admin.get_secret(name)
|
||||||
if not secret:
|
if not secret:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
@ -478,26 +591,20 @@ def create_router(dependencies: FrontendDependencies) -> APIRouter:
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# @app.delete("/secrets/{name}")
|
@app.delete("/secrets/{name}")
|
||||||
# async def delete_secret(
|
async def delete_secret(
|
||||||
# request: Request,
|
request: Request,
|
||||||
# name: str,
|
name: str,
|
||||||
# admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
admin: Annotated[AdminBackend, Depends(dependencies.get_admin_backend)],
|
||||||
# ):
|
):
|
||||||
# """Delete a secret."""
|
"""Delete a secret."""
|
||||||
# await admin.delete_secret(name)
|
await admin.delete_secret(name)
|
||||||
# clients = await admin.get_clients()
|
headers = {"Hx-Refresh": "true"}
|
||||||
# secrets = await admin.get_detailed_secrets()
|
|
||||||
# headers = {"Hx-Refresh": "true"}
|
|
||||||
|
|
||||||
# return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
# request,
|
request,
|
||||||
# "secrets/inner.html.j2",
|
"secrets/partials/default_detail.html.j2",
|
||||||
# {
|
headers=headers,
|
||||||
# "clients": clients,
|
)
|
||||||
# "secrets": secrets,
|
|
||||||
# },
|
|
||||||
# headers=headers,
|
|
||||||
# )
|
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|||||||
@ -12,13 +12,12 @@ from sshecret.backend import (
|
|||||||
AuditListResult,
|
AuditListResult,
|
||||||
Client,
|
Client,
|
||||||
ClientFilter,
|
ClientFilter,
|
||||||
Secret,
|
|
||||||
SshecretBackend,
|
SshecretBackend,
|
||||||
Operation,
|
Operation,
|
||||||
SubSystem,
|
SubSystem,
|
||||||
)
|
)
|
||||||
from sshecret.backend.models import DetailedSecrets
|
from sshecret.backend.models import DetailedSecrets
|
||||||
from sshecret.backend.api import AuditAPI
|
from sshecret.backend.api import AuditAPI, KeySpec
|
||||||
from sshecret.crypto import encrypt_string, load_public_key
|
from sshecret.crypto import encrypt_string, load_public_key
|
||||||
|
|
||||||
from .keepass import PasswordContext, load_password_manager
|
from .keepass import PasswordContext, load_password_manager
|
||||||
@ -27,6 +26,7 @@ from .models import (
|
|||||||
ClientSecretGroup,
|
ClientSecretGroup,
|
||||||
ClientSecretGroupList,
|
ClientSecretGroupList,
|
||||||
SecretClientMapping,
|
SecretClientMapping,
|
||||||
|
SecretListView,
|
||||||
SecretGroup,
|
SecretGroup,
|
||||||
SecretView,
|
SecretView,
|
||||||
)
|
)
|
||||||
@ -113,13 +113,13 @@ class AdminBackend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def _get_client(self, name: str) -> Client | None:
|
async def _get_client(self, idname: KeySpec) -> Client | None:
|
||||||
"""Get a client from the backend."""
|
"""Get a client from the backend."""
|
||||||
return await self.backend.get_client(name)
|
return await self.backend.get_client(idname)
|
||||||
|
|
||||||
async def _verify_client_exists(self, name: str) -> None:
|
async def _verify_client_exists(self, idname: KeySpec) -> None:
|
||||||
"""Check that a client exists."""
|
"""Check that a client exists."""
|
||||||
client = await self.backend.get_client(name)
|
client = await self.backend.get_client(idname)
|
||||||
if not client:
|
if not client:
|
||||||
raise ClientNotFoundError()
|
raise ClientNotFoundError()
|
||||||
return None
|
return None
|
||||||
@ -133,7 +133,7 @@ class AdminBackend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def get_client(self, name: str) -> Client | None:
|
async def get_client(self, name: KeySpec) -> Client | None:
|
||||||
"""Get a client from the backend."""
|
"""Get a client from the backend."""
|
||||||
try:
|
try:
|
||||||
return await self._get_client(name)
|
return await self._get_client(name)
|
||||||
@ -176,7 +176,10 @@ class AdminBackend:
|
|||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def _update_client_public_key(
|
async def _update_client_public_key(
|
||||||
self, name: str, new_key: str, password_manager: PasswordContext
|
self,
|
||||||
|
name: KeySpec,
|
||||||
|
new_key: str,
|
||||||
|
password_manager: PasswordContext,
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
"""Update client public key."""
|
"""Update client public key."""
|
||||||
LOG.info(
|
LOG.info(
|
||||||
@ -203,7 +206,7 @@ class AdminBackend:
|
|||||||
|
|
||||||
return updated_secrets
|
return updated_secrets
|
||||||
|
|
||||||
async def update_client_public_key(self, name: str, new_key: str) -> list[str]:
|
async def update_client_public_key(self, name: KeySpec, new_key: str) -> list[str]:
|
||||||
"""Update client public key."""
|
"""Update client public key."""
|
||||||
try:
|
try:
|
||||||
with self.password_manager() as password_manager:
|
with self.password_manager() as password_manager:
|
||||||
@ -238,18 +241,18 @@ class AdminBackend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def update_client_sources(self, name: str, sources: list[str]) -> None:
|
async def update_client_sources(self, name: KeySpec, sources: list[str]) -> None:
|
||||||
"""Update client sources."""
|
"""Update client sources."""
|
||||||
try:
|
try:
|
||||||
await self.backend.update_client_sources(name, sources)
|
await self.backend.update_client_sources(name, sources)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def _delete_client(self, name: str) -> None:
|
async def _delete_client(self, name: KeySpec) -> None:
|
||||||
"""Delete client."""
|
"""Delete client."""
|
||||||
await self.backend.delete_client(name)
|
await self.backend.delete_client(name)
|
||||||
|
|
||||||
async def delete_client(self, name: str) -> None:
|
async def delete_client(self, name: KeySpec) -> None:
|
||||||
"""Delete client."""
|
"""Delete client."""
|
||||||
try:
|
try:
|
||||||
await self._delete_client(name)
|
await self._delete_client(name)
|
||||||
@ -258,30 +261,41 @@ class AdminBackend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def delete_client_secret(self, client_name: str, secret_name: str) -> None:
|
async def delete_client_secret(
|
||||||
|
self, client_name: KeySpec, secret_name: KeySpec
|
||||||
|
) -> None:
|
||||||
"""Delete a secret from a client."""
|
"""Delete a secret from a client."""
|
||||||
try:
|
try:
|
||||||
await self.backend.delete_client_secret(client_name, secret_name)
|
await self.backend.delete_client_secret(client_name, secret_name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def _get_secrets(self) -> list[Secret]:
|
async def _get_secrets(self) -> list[SecretListView]:
|
||||||
"""Get secrets.
|
"""Get secrets.
|
||||||
|
|
||||||
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
|
This fetches the secret to client mapping from backend, and adds secrets from the password manager.
|
||||||
"""
|
"""
|
||||||
|
backend_secrets = await self.backend.get_secrets()
|
||||||
with self.password_manager() as password_manager:
|
with self.password_manager() as password_manager:
|
||||||
all_secrets = password_manager.get_available_secrets()
|
admin_secrets = password_manager.get_available_secrets()
|
||||||
|
|
||||||
secrets = await self.backend.get_secrets()
|
secrets: dict[str, SecretListView] = {}
|
||||||
backend_secret_names = [secret.name for secret in secrets]
|
for secret in backend_secrets:
|
||||||
for secret in all_secrets:
|
secrets[secret.name] = SecretListView(
|
||||||
if secret not in backend_secret_names:
|
name=secret.name, unmanaged=True, clients=secret.clients
|
||||||
secrets.append(Secret(name=secret, clients=[]))
|
)
|
||||||
|
|
||||||
return secrets
|
for secret_name in admin_secrets:
|
||||||
|
if secret_name in secrets:
|
||||||
|
secrets[secret_name].unmanaged = False
|
||||||
|
continue
|
||||||
|
secrets[secret_name] = SecretListView(
|
||||||
|
name=secret_name, unmanaged=False, clients=[]
|
||||||
|
)
|
||||||
|
|
||||||
async def get_secrets(self) -> list[Secret]:
|
return list(secrets.values())
|
||||||
|
|
||||||
|
async def get_secrets(self) -> list[SecretListView]:
|
||||||
"""Get secrets from backend."""
|
"""Get secrets from backend."""
|
||||||
try:
|
try:
|
||||||
return await self._get_secrets()
|
return await self._get_secrets()
|
||||||
@ -381,6 +395,8 @@ class AdminBackend:
|
|||||||
)
|
)
|
||||||
ungrouped = password_manager.get_ungrouped_secrets()
|
ungrouped = password_manager.get_ungrouped_secrets()
|
||||||
|
|
||||||
|
all_admin_secrets = password_manager.get_available_secrets()
|
||||||
|
|
||||||
group_result: list[ClientSecretGroup] = []
|
group_result: list[ClientSecretGroup] = []
|
||||||
for group in all_groups:
|
for group in all_groups:
|
||||||
# We have to do this recursively.
|
# We have to do this recursively.
|
||||||
@ -397,7 +413,19 @@ class AdminBackend:
|
|||||||
mapping.clients = client_mapping.clients
|
mapping.clients = client_mapping.clients
|
||||||
ungrouped_clients.append(mapping)
|
ungrouped_clients.append(mapping)
|
||||||
|
|
||||||
|
# We need to process unmanaged secrets too.
|
||||||
|
unmanaged_secrets = [
|
||||||
|
secret for secret in all_secrets if secret.name not in all_admin_secrets
|
||||||
|
]
|
||||||
|
for secret in unmanaged_secrets:
|
||||||
|
ungrouped_clients.append(
|
||||||
|
SecretClientMapping(
|
||||||
|
name=secret.name, unmanaged=True, clients=secret.clients
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
result.ungrouped = ungrouped_clients
|
result.ungrouped = ungrouped_clients
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def get_secret_group(self, name: str) -> ClientSecretGroup | None:
|
async def get_secret_group(self, name: str) -> ClientSecretGroup | None:
|
||||||
@ -407,6 +435,18 @@ class AdminBackend:
|
|||||||
return matches.groups[0]
|
return matches.groups[0]
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def get_secret_group_by_path(self, path: str) -> ClientSecretGroup | None:
|
||||||
|
"""Get a group based on its path."""
|
||||||
|
with self.password_manager() as password_manager:
|
||||||
|
secret_group = password_manager.get_secret_group(path)
|
||||||
|
|
||||||
|
if not secret_group:
|
||||||
|
return None
|
||||||
|
|
||||||
|
all_secrets = await self.backend.get_detailed_secrets()
|
||||||
|
secrets_mapping = {secret.name: secret for secret in all_secrets}
|
||||||
|
return add_clients_to_secret_group(secret_group, secrets_mapping)
|
||||||
|
|
||||||
async def get_secret(self, name: str) -> SecretView | None:
|
async def get_secret(self, name: str) -> SecretView | None:
|
||||||
"""Get secrets from backend."""
|
"""Get secrets from backend."""
|
||||||
try:
|
try:
|
||||||
@ -416,18 +456,24 @@ class AdminBackend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def _get_secret(self, name: str) -> SecretView | None:
|
async def _get_secret(
|
||||||
|
self, name: str, secret_id: str | None = None
|
||||||
|
) -> SecretView | None:
|
||||||
"""Get a secret, including the actual unencrypted value and clients."""
|
"""Get a secret, including the actual unencrypted value and clients."""
|
||||||
|
secret: str | None = None
|
||||||
with self.password_manager() as password_manager:
|
with self.password_manager() as password_manager:
|
||||||
secret = password_manager.get_secret(name)
|
secret = password_manager.get_secret(name)
|
||||||
secret_group = password_manager.get_entry_group(name)
|
secret_group = password_manager.get_entry_group(name)
|
||||||
|
|
||||||
if not secret:
|
|
||||||
return None
|
|
||||||
secret_view = SecretView(name=name, secret=secret, group=secret_group)
|
secret_view = SecretView(name=name, secret=secret, group=secret_group)
|
||||||
secret_mapping = await self.backend.get_secret(name)
|
|
||||||
|
idname: KeySpec = name
|
||||||
|
if secret_id:
|
||||||
|
idname = ("id", secret_id)
|
||||||
|
|
||||||
|
secret_mapping = await self.backend.get_secret(idname)
|
||||||
if secret_mapping:
|
if secret_mapping:
|
||||||
secret_view.clients = secret_mapping.clients
|
secret_view.clients = [ref.name for ref in secret_mapping.clients]
|
||||||
|
|
||||||
return secret_view
|
return secret_view
|
||||||
|
|
||||||
@ -450,7 +496,7 @@ class AdminBackend:
|
|||||||
return
|
return
|
||||||
for client in secret_mapping.clients:
|
for client in secret_mapping.clients:
|
||||||
LOG.info("Deleting secret %s from client %s", name, client)
|
LOG.info("Deleting secret %s from client %s", name, client)
|
||||||
await self.backend.delete_client_secret(client, name)
|
await self.backend.delete_client_secret(("id", client.id), name)
|
||||||
|
|
||||||
async def _add_secret(
|
async def _add_secret(
|
||||||
self,
|
self,
|
||||||
@ -467,7 +513,7 @@ class AdminBackend:
|
|||||||
if update:
|
if update:
|
||||||
secret_map = await self.backend.get_secret(name)
|
secret_map = await self.backend.get_secret(name)
|
||||||
if secret_map:
|
if secret_map:
|
||||||
clients = secret_map.clients
|
clients = [ref.name for ref in secret_map.clients]
|
||||||
|
|
||||||
if not clients:
|
if not clients:
|
||||||
return
|
return
|
||||||
@ -507,11 +553,13 @@ class AdminBackend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackendUnavailableError() from e
|
raise BackendUnavailableError() from e
|
||||||
|
|
||||||
async def _create_client_secret(self, client_name: str, secret_name: str) -> None:
|
async def _create_client_secret(
|
||||||
|
self, client_idname: KeySpec, secret_name: str
|
||||||
|
) -> None:
|
||||||
"""Create client secret."""
|
"""Create client secret."""
|
||||||
client = await self.get_client(client_name)
|
client = await self.get_client(client_idname)
|
||||||
if not client:
|
if not client:
|
||||||
raise ClientNotFoundError()
|
raise ClientNotFoundError(client_idname)
|
||||||
|
|
||||||
with self.password_manager() as password_manager:
|
with self.password_manager() as password_manager:
|
||||||
secret = password_manager.get_secret(secret_name)
|
secret = password_manager.get_secret(secret_name)
|
||||||
@ -520,12 +568,14 @@ class AdminBackend:
|
|||||||
|
|
||||||
public_key = load_public_key(client.public_key.encode())
|
public_key = load_public_key(client.public_key.encode())
|
||||||
encrypted = encrypt_string(secret, public_key)
|
encrypted = encrypt_string(secret, public_key)
|
||||||
await self.backend.create_client_secret(client_name, secret_name, encrypted)
|
await self.backend.create_client_secret(client_idname, secret_name, encrypted)
|
||||||
|
|
||||||
async def create_client_secret(self, client_name: str, secret_name: str) -> None:
|
async def create_client_secret(
|
||||||
|
self, client_idname: KeySpec, secret_name: str
|
||||||
|
) -> None:
|
||||||
"""Create client secret."""
|
"""Create client secret."""
|
||||||
try:
|
try:
|
||||||
await self._create_client_secret(client_name, secret_name)
|
await self._create_client_secret(client_idname, secret_name)
|
||||||
except ClientManagementError:
|
except ClientManagementError:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@ -32,7 +32,9 @@ def create_password_db(location: Path, password: str) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def _kp_group_to_secret_group(
|
def _kp_group_to_secret_group(
|
||||||
kp_group: pykeepass.group.Group, parent: SecretGroup | None = None, depth: int | None = None
|
kp_group: pykeepass.group.Group,
|
||||||
|
parent: SecretGroup | None = None,
|
||||||
|
depth: int | None = None,
|
||||||
) -> SecretGroup:
|
) -> SecretGroup:
|
||||||
"""Convert keepass group to secret group dataclass."""
|
"""Convert keepass group to secret group dataclass."""
|
||||||
group_name = cast(str, kp_group.name)
|
group_name = cast(str, kp_group.name)
|
||||||
@ -143,8 +145,9 @@ class PasswordContext:
|
|||||||
return None
|
return None
|
||||||
return str(entry.group.name)
|
return str(entry.group.name)
|
||||||
|
|
||||||
|
def get_secret_groups(
|
||||||
def get_secret_groups(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]:
|
self, pattern: str | None = None, regex: bool = True
|
||||||
|
) -> list[SecretGroup]:
|
||||||
"""Get secret groups.
|
"""Get secret groups.
|
||||||
|
|
||||||
A regex pattern may be provided to filter groups.
|
A regex pattern may be provided to filter groups.
|
||||||
@ -160,15 +163,35 @@ class PasswordContext:
|
|||||||
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
||||||
return secret_groups
|
return secret_groups
|
||||||
|
|
||||||
def get_secret_group_list(self, pattern: str | None = None, regex: bool = True) -> list[SecretGroup]:
|
def get_secret_group_list(
|
||||||
|
self, pattern: str | None = None, regex: bool = True
|
||||||
|
) -> list[SecretGroup]:
|
||||||
"""Get a flat list of groups."""
|
"""Get a flat list of groups."""
|
||||||
if pattern:
|
if pattern:
|
||||||
return self.get_secret_groups(pattern, regex)
|
return self.get_secret_groups(pattern, regex)
|
||||||
|
|
||||||
groups = [ group for group in self.keepass.groups if not group.is_root_group ]
|
groups = [group for group in self.keepass.groups if not group.is_root_group]
|
||||||
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
secret_groups = [_kp_group_to_secret_group(group) for group in groups]
|
||||||
return secret_groups
|
return secret_groups
|
||||||
|
|
||||||
|
def get_secret_group(self, path: str) -> SecretGroup | None:
|
||||||
|
"""Get a secret group by path."""
|
||||||
|
elements = path.split("/")
|
||||||
|
final_element = elements[-1]
|
||||||
|
|
||||||
|
current = self._root_group
|
||||||
|
while elements:
|
||||||
|
groupname = elements.pop(0)
|
||||||
|
matches = [
|
||||||
|
subgroup for subgroup in current.subgroups if subgroup.name == groupname
|
||||||
|
]
|
||||||
|
if matches:
|
||||||
|
current = matches[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
if not current.is_root_group and current.name == final_element:
|
||||||
|
return _kp_group_to_secret_group(current)
|
||||||
|
return None
|
||||||
|
|
||||||
def get_ungrouped_secrets(self) -> list[str]:
|
def get_ungrouped_secrets(self) -> list[str]:
|
||||||
"""Get secrets without groups."""
|
"""Get secrets without groups."""
|
||||||
@ -193,7 +216,9 @@ class PasswordContext:
|
|||||||
f"Error: Cannot find a parent group named {parent_group}"
|
f"Error: Cannot find a parent group named {parent_group}"
|
||||||
)
|
)
|
||||||
kp_parent_group = query
|
kp_parent_group = query
|
||||||
self.keepass.add_group(destination_group=kp_parent_group, group_name=name, notes=description)
|
self.keepass.add_group(
|
||||||
|
destination_group=kp_parent_group, group_name=name, notes=description
|
||||||
|
)
|
||||||
self.keepass.save()
|
self.keepass.save()
|
||||||
|
|
||||||
def set_group_description(self, name: str, description: str) -> None:
|
def set_group_description(self, name: str, description: str) -> None:
|
||||||
|
|||||||
@ -25,6 +25,7 @@ class SecretListView(BaseModel):
|
|||||||
"""Model containing a list of all available secrets."""
|
"""Model containing a list of all available secrets."""
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
|
unmanaged: bool = False
|
||||||
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
||||||
|
|
||||||
|
|
||||||
@ -32,7 +33,7 @@ class SecretView(BaseModel):
|
|||||||
"""Model containing a secret, including its clear-text value."""
|
"""Model containing a secret, including its clear-text value."""
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
secret: str
|
secret: str | None
|
||||||
group: str | None = None
|
group: str | None = None
|
||||||
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
clients: list[str] = Field(default_factory=list) # Clients that have access to it.
|
||||||
|
|
||||||
@ -134,6 +135,7 @@ class SecretClientMapping(BaseModel):
|
|||||||
"""Secret name with list of clients."""
|
"""Secret name with list of clients."""
|
||||||
|
|
||||||
name: str # name of secret
|
name: str # name of secret
|
||||||
|
unmanaged: bool = False
|
||||||
clients: list[ClientReference] = Field(default_factory=list)
|
clients: list[ClientReference] = Field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -327,9 +327,6 @@
|
|||||||
.start-0 {
|
.start-0 {
|
||||||
inset-inline-start: calc(var(--spacing) * 0);
|
inset-inline-start: calc(var(--spacing) * 0);
|
||||||
}
|
}
|
||||||
.end-2\.5 {
|
|
||||||
inset-inline-end: calc(var(--spacing) * 2.5);
|
|
||||||
}
|
|
||||||
.top-0 {
|
.top-0 {
|
||||||
top: calc(var(--spacing) * 0);
|
top: calc(var(--spacing) * 0);
|
||||||
}
|
}
|
||||||
@ -417,18 +414,12 @@
|
|||||||
.m-361 {
|
.m-361 {
|
||||||
margin: calc(var(--spacing) * 361);
|
margin: calc(var(--spacing) * 361);
|
||||||
}
|
}
|
||||||
.mx-2\.5 {
|
|
||||||
margin-inline: calc(var(--spacing) * 2.5);
|
|
||||||
}
|
|
||||||
.mx-3 {
|
.mx-3 {
|
||||||
margin-inline: calc(var(--spacing) * 3);
|
margin-inline: calc(var(--spacing) * 3);
|
||||||
}
|
}
|
||||||
.mx-4 {
|
.mx-4 {
|
||||||
margin-inline: calc(var(--spacing) * 4);
|
margin-inline: calc(var(--spacing) * 4);
|
||||||
}
|
}
|
||||||
.mx-\[1rem\] {
|
|
||||||
margin-inline: 1rem;
|
|
||||||
}
|
|
||||||
.mx-auto {
|
.mx-auto {
|
||||||
margin-inline: auto;
|
margin-inline: auto;
|
||||||
}
|
}
|
||||||
@ -459,9 +450,6 @@
|
|||||||
.ms-3 {
|
.ms-3 {
|
||||||
margin-inline-start: calc(var(--spacing) * 3);
|
margin-inline-start: calc(var(--spacing) * 3);
|
||||||
}
|
}
|
||||||
.ms-auto {
|
|
||||||
margin-inline-start: auto;
|
|
||||||
}
|
|
||||||
.me-2 {
|
.me-2 {
|
||||||
margin-inline-end: calc(var(--spacing) * 2);
|
margin-inline-end: calc(var(--spacing) * 2);
|
||||||
}
|
}
|
||||||
@ -486,9 +474,6 @@
|
|||||||
.mt-2 {
|
.mt-2 {
|
||||||
margin-top: calc(var(--spacing) * 2);
|
margin-top: calc(var(--spacing) * 2);
|
||||||
}
|
}
|
||||||
.mt-2\.5 {
|
|
||||||
margin-top: calc(var(--spacing) * 2.5);
|
|
||||||
}
|
|
||||||
.mt-3 {
|
.mt-3 {
|
||||||
margin-top: calc(var(--spacing) * 3);
|
margin-top: calc(var(--spacing) * 3);
|
||||||
}
|
}
|
||||||
@ -597,9 +582,6 @@
|
|||||||
.ml-6 {
|
.ml-6 {
|
||||||
margin-left: calc(var(--spacing) * 6);
|
margin-left: calc(var(--spacing) * 6);
|
||||||
}
|
}
|
||||||
.ml-\[1rem\] {
|
|
||||||
margin-left: 1rem;
|
|
||||||
}
|
|
||||||
.ml-auto {
|
.ml-auto {
|
||||||
margin-left: auto;
|
margin-left: auto;
|
||||||
}
|
}
|
||||||
@ -684,9 +666,6 @@
|
|||||||
.h-32 {
|
.h-32 {
|
||||||
height: calc(var(--spacing) * 32);
|
height: calc(var(--spacing) * 32);
|
||||||
}
|
}
|
||||||
.h-48 {
|
|
||||||
height: calc(var(--spacing) * 48);
|
|
||||||
}
|
|
||||||
.h-\[0\.125rem\] {
|
.h-\[0\.125rem\] {
|
||||||
height: 0.125rem;
|
height: 0.125rem;
|
||||||
}
|
}
|
||||||
@ -696,9 +675,6 @@
|
|||||||
.h-\[36rem\] {
|
.h-\[36rem\] {
|
||||||
height: 36rem;
|
height: 36rem;
|
||||||
}
|
}
|
||||||
.h-\[calc\(100\%-1rem\)\] {
|
|
||||||
height: calc(100% - 1rem);
|
|
||||||
}
|
|
||||||
.h-full {
|
.h-full {
|
||||||
height: 100%;
|
height: 100%;
|
||||||
}
|
}
|
||||||
@ -708,9 +684,6 @@
|
|||||||
.max-h-64 {
|
.max-h-64 {
|
||||||
max-height: calc(var(--spacing) * 64);
|
max-height: calc(var(--spacing) * 64);
|
||||||
}
|
}
|
||||||
.max-h-full {
|
|
||||||
max-height: 100%;
|
|
||||||
}
|
|
||||||
.min-h-0 {
|
.min-h-0 {
|
||||||
min-height: calc(var(--spacing) * 0);
|
min-height: calc(var(--spacing) * 0);
|
||||||
}
|
}
|
||||||
@ -1228,9 +1201,6 @@
|
|||||||
--tw-border-style: solid;
|
--tw-border-style: solid;
|
||||||
border-style: solid;
|
border-style: solid;
|
||||||
}
|
}
|
||||||
.border-blue-700 {
|
|
||||||
border-color: var(--color-blue-700);
|
|
||||||
}
|
|
||||||
.border-gray-100 {
|
.border-gray-100 {
|
||||||
border-color: var(--color-gray-100);
|
border-color: var(--color-gray-100);
|
||||||
}
|
}
|
||||||
@ -1240,12 +1210,6 @@
|
|||||||
.border-gray-300 {
|
.border-gray-300 {
|
||||||
border-color: var(--color-gray-300);
|
border-color: var(--color-gray-300);
|
||||||
}
|
}
|
||||||
.border-gray-500 {
|
|
||||||
border-color: var(--color-gray-500);
|
|
||||||
}
|
|
||||||
.border-gray-900 {
|
|
||||||
border-color: var(--color-gray-900);
|
|
||||||
}
|
|
||||||
.border-green-100 {
|
.border-green-100 {
|
||||||
border-color: var(--color-green-100);
|
border-color: var(--color-green-100);
|
||||||
}
|
}
|
||||||
@ -1745,9 +1709,6 @@
|
|||||||
.text-blue-600 {
|
.text-blue-600 {
|
||||||
color: var(--color-blue-600);
|
color: var(--color-blue-600);
|
||||||
}
|
}
|
||||||
.text-blue-700 {
|
|
||||||
color: var(--color-blue-700);
|
|
||||||
}
|
|
||||||
.text-blue-800 {
|
.text-blue-800 {
|
||||||
color: var(--color-blue-800);
|
color: var(--color-blue-800);
|
||||||
}
|
}
|
||||||
@ -2143,20 +2104,6 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.hover\:bg-blue-700 {
|
|
||||||
&:hover {
|
|
||||||
@media (hover: hover) {
|
|
||||||
background-color: var(--color-blue-700);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.hover\:bg-blue-800 {
|
|
||||||
&:hover {
|
|
||||||
@media (hover: hover) {
|
|
||||||
background-color: var(--color-blue-800);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.hover\:bg-gray-50 {
|
.hover\:bg-gray-50 {
|
||||||
&:hover {
|
&:hover {
|
||||||
@media (hover: hover) {
|
@media (hover: hover) {
|
||||||
@ -2624,11 +2571,6 @@
|
|||||||
padding-inline: calc(var(--spacing) * 4);
|
padding-inline: calc(var(--spacing) * 4);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.sm\:px-16 {
|
|
||||||
@media (width >= 40rem) {
|
|
||||||
padding-inline: calc(var(--spacing) * 16);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.sm\:py-2 {
|
.sm\:py-2 {
|
||||||
@media (width >= 40rem) {
|
@media (width >= 40rem) {
|
||||||
padding-block: calc(var(--spacing) * 2);
|
padding-block: calc(var(--spacing) * 2);
|
||||||
@ -2870,11 +2812,6 @@
|
|||||||
padding: calc(var(--spacing) * 0);
|
padding: calc(var(--spacing) * 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.md\:p-5 {
|
|
||||||
@media (width >= 48rem) {
|
|
||||||
padding: calc(var(--spacing) * 5);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.md\:p-6 {
|
.md\:p-6 {
|
||||||
@media (width >= 48rem) {
|
@media (width >= 48rem) {
|
||||||
padding: calc(var(--spacing) * 6);
|
padding: calc(var(--spacing) * 6);
|
||||||
@ -3104,12 +3041,6 @@
|
|||||||
line-height: var(--tw-leading, var(--text-6xl--line-height));
|
line-height: var(--tw-leading, var(--text-6xl--line-height));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.lg\:text-xl {
|
|
||||||
@media (width >= 64rem) {
|
|
||||||
font-size: var(--text-xl);
|
|
||||||
line-height: var(--tw-leading, var(--text-xl--line-height));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.lg\:hover\:underline {
|
.lg\:hover\:underline {
|
||||||
@media (width >= 64rem) {
|
@media (width >= 64rem) {
|
||||||
&:hover {
|
&:hover {
|
||||||
@ -3237,11 +3168,6 @@
|
|||||||
padding-inline: calc(var(--spacing) * 0);
|
padding-inline: calc(var(--spacing) * 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.xl\:px-48 {
|
|
||||||
@media (width >= 80rem) {
|
|
||||||
padding-inline: calc(var(--spacing) * 48);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.xl\:py-24 {
|
.xl\:py-24 {
|
||||||
@media (width >= 80rem) {
|
@media (width >= 80rem) {
|
||||||
padding-block: calc(var(--spacing) * 24);
|
padding-block: calc(var(--spacing) * 24);
|
||||||
@ -3461,11 +3387,6 @@
|
|||||||
background-color: var(--color-red-700);
|
background-color: var(--color-red-700);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.dark\:bg-red-900 {
|
|
||||||
&:where(.dark, .dark *) {
|
|
||||||
background-color: var(--color-red-900);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.dark\:bg-teal-900 {
|
.dark\:bg-teal-900 {
|
||||||
&:where(.dark, .dark *) {
|
&:where(.dark, .dark *) {
|
||||||
background-color: var(--color-teal-900);
|
background-color: var(--color-teal-900);
|
||||||
@ -3516,11 +3437,6 @@
|
|||||||
color: var(--color-gray-600);
|
color: var(--color-gray-600);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.dark\:text-gray-700 {
|
|
||||||
&:where(.dark, .dark *) {
|
|
||||||
color: var(--color-gray-700);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.dark\:text-green-400 {
|
.dark\:text-green-400 {
|
||||||
&:where(.dark, .dark *) {
|
&:where(.dark, .dark *) {
|
||||||
color: var(--color-green-400);
|
color: var(--color-green-400);
|
||||||
@ -3561,11 +3477,6 @@
|
|||||||
color: var(--color-purple-500);
|
color: var(--color-purple-500);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.dark\:text-red-300 {
|
|
||||||
&:where(.dark, .dark *) {
|
|
||||||
color: var(--color-red-300);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.dark\:text-red-400 {
|
.dark\:text-red-400 {
|
||||||
&:where(.dark, .dark *) {
|
&:where(.dark, .dark *) {
|
||||||
color: var(--color-red-400);
|
color: var(--color-red-400);
|
||||||
|
|||||||
@ -5,7 +5,7 @@ import pykeepass
|
|||||||
import pykeepass.exceptions
|
import pykeepass.exceptions
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from sshecret_admin.services.keepass import PasswordContext, _password_context, PasswordCredentialsError
|
from sshecret_admin.services.keepass import _password_context, PasswordCredentialsError
|
||||||
|
|
||||||
def test_open_invalid_database() -> None:
|
def test_open_invalid_database() -> None:
|
||||||
"""Test opening a non-existing database."""
|
"""Test opening a non-existing database."""
|
||||||
|
|||||||
@ -128,8 +128,9 @@ class ClientOperations:
|
|||||||
"""Delete client."""
|
"""Delete client."""
|
||||||
db_client = await self._get_client(client)
|
db_client = await self._get_client(client)
|
||||||
if not db_client:
|
if not db_client:
|
||||||
return
|
raise HTTPException(status_code=404, detail="Client not found.")
|
||||||
if db_client.is_deleted:
|
if db_client.is_deleted:
|
||||||
|
LOG.warning("Client %r was already deleted!", client)
|
||||||
return
|
return
|
||||||
db_client.is_deleted = True
|
db_client.is_deleted = True
|
||||||
db_client.deleted_at = datetime.now(timezone.utc)
|
db_client.deleted_at = datetime.now(timezone.utc)
|
||||||
@ -271,7 +272,12 @@ async def get_clients(
|
|||||||
filter_query: ClientListParams,
|
filter_query: ClientListParams,
|
||||||
) -> ClientQueryResult:
|
) -> ClientQueryResult:
|
||||||
"""Get Clients."""
|
"""Get Clients."""
|
||||||
count_statement = select(func.count("*")).select_from(Client)
|
count_statement = (
|
||||||
|
select(func.count("*"))
|
||||||
|
.select_from(Client)
|
||||||
|
.where(Client.is_deleted.is_not(True))
|
||||||
|
.where(Client.is_active.is_not(False))
|
||||||
|
)
|
||||||
count_statement = cast(
|
count_statement = cast(
|
||||||
Select[tuple[int]],
|
Select[tuple[int]],
|
||||||
filter_client_statement(count_statement, filter_query, True),
|
filter_client_statement(count_statement, filter_query, True),
|
||||||
|
|||||||
@ -40,7 +40,9 @@ class ClientView(BaseModel):
|
|||||||
return responses
|
return responses
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_client(cls, client: models.Client) -> Self:
|
def from_client(
|
||||||
|
cls, client: models.Client, include_deleted_secrets: bool = False
|
||||||
|
) -> Self:
|
||||||
"""Instantiate from a client."""
|
"""Instantiate from a client."""
|
||||||
view = cls(
|
view = cls(
|
||||||
id=client.id,
|
id=client.id,
|
||||||
@ -54,7 +56,12 @@ class ClientView(BaseModel):
|
|||||||
is_deleted=client.is_deleted,
|
is_deleted=client.is_deleted,
|
||||||
)
|
)
|
||||||
if client.secrets:
|
if client.secrets:
|
||||||
view.secrets = [secret.name for secret in client.secrets]
|
if include_deleted_secrets:
|
||||||
|
view.secrets = [secret.name for secret in client.secrets]
|
||||||
|
else:
|
||||||
|
view.secrets = [
|
||||||
|
secret.name for secret in client.secrets if not secret.deleted
|
||||||
|
]
|
||||||
|
|
||||||
if client.policies:
|
if client.policies:
|
||||||
view.policies = [policy.source for policy in client.policies]
|
view.policies = [policy.source for policy in client.policies]
|
||||||
|
|||||||
@ -116,24 +116,18 @@ async def resolve_client_id(
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def get_client_by_id(session: AsyncSession, id: uuid.UUID) -> Client | None:
|
async def get_client_by_id(
|
||||||
|
session: AsyncSession, id: uuid.UUID, include_deleted: bool = False
|
||||||
|
) -> Client | None:
|
||||||
"""Get client by ID."""
|
"""Get client by ID."""
|
||||||
client_filter = client_with_relationships().where(Client.id == id)
|
if include_deleted:
|
||||||
|
client_filter = client_with_relationships().where(Client.id == id)
|
||||||
|
else:
|
||||||
|
client_filter = query_active_clients().where(Client.id == id)
|
||||||
client_results = await session.execute(client_filter)
|
client_results = await session.execute(client_filter)
|
||||||
return client_results.scalars().first()
|
return client_results.scalars().first()
|
||||||
|
|
||||||
|
|
||||||
async def get_client_by_id_or_name(
|
|
||||||
session: AsyncSession, id_or_name: str
|
|
||||||
) -> Client | None:
|
|
||||||
"""Get client either by id or name."""
|
|
||||||
if RE_UUID.match(id_or_name):
|
|
||||||
id = uuid.UUID(id_or_name)
|
|
||||||
return await get_client_by_id(session, id)
|
|
||||||
|
|
||||||
return await get_client_by_name(session, id_or_name)
|
|
||||||
|
|
||||||
|
|
||||||
def query_active_clients() -> Select[tuple[Client]]:
|
def query_active_clients() -> Select[tuple[Client]]:
|
||||||
"""Get all active clients."""
|
"""Get all active clients."""
|
||||||
client_filter = (
|
client_filter = (
|
||||||
@ -144,22 +138,6 @@ def query_active_clients() -> Select[tuple[Client]]:
|
|||||||
return client_filter
|
return client_filter
|
||||||
|
|
||||||
|
|
||||||
async def get_client_by_name(session: AsyncSession, name: str) -> Client | None:
|
|
||||||
"""Get client by name.
|
|
||||||
|
|
||||||
This will get the latest client version, unless it's deleted.
|
|
||||||
"""
|
|
||||||
client_filter = (
|
|
||||||
client_with_relationships()
|
|
||||||
.where(Client.is_active.is_(True))
|
|
||||||
.where(Client.is_deleted.is_not(True))
|
|
||||||
.where(Client.name == name)
|
|
||||||
.order_by(Client.version.desc())
|
|
||||||
)
|
|
||||||
client_result = await session.execute(client_filter)
|
|
||||||
return client_result.scalars().first()
|
|
||||||
|
|
||||||
|
|
||||||
async def refresh_client(session: AsyncSession, client: Client) -> None:
|
async def refresh_client(session: AsyncSession, client: Client) -> None:
|
||||||
"""Refresh the client and load in all relationships."""
|
"""Refresh the client and load in all relationships."""
|
||||||
await session.refresh(
|
await session.refresh(
|
||||||
|
|||||||
@ -182,8 +182,10 @@ class ClientSecretOperations:
|
|||||||
|
|
||||||
async def delete_client_secret(self, secret_identifier: FlexID) -> None:
|
async def delete_client_secret(self, secret_identifier: FlexID) -> None:
|
||||||
"""Delete a client secret."""
|
"""Delete a client secret."""
|
||||||
|
LOG.debug("delete_client_secret called with identifier %r", secret_identifier)
|
||||||
client_secret = await self._get_client_secret(secret_identifier)
|
client_secret = await self._get_client_secret(secret_identifier)
|
||||||
if not client_secret:
|
if not client_secret:
|
||||||
|
LOG.warning("Could not find any secret matching client secret.")
|
||||||
return
|
return
|
||||||
|
|
||||||
client_secret.deleted = True
|
client_secret.deleted = True
|
||||||
|
|||||||
@ -72,7 +72,6 @@ def create_client_secrets_router(get_db_session: AsyncDBSessionDep) -> APIRouter
|
|||||||
client_op = ClientSecretOperations(session, request, client)
|
client_op = ClientSecretOperations(session, request, client)
|
||||||
return await client_op.get_client_secret(secret)
|
return await client_op.get_client_secret(secret)
|
||||||
|
|
||||||
# TODO: delete_client_secret
|
|
||||||
@router.delete("/clients/{client_identifier}/secrets/{secret_identifier}")
|
@router.delete("/clients/{client_identifier}/secrets/{secret_identifier}")
|
||||||
async def delete_client_secret(
|
async def delete_client_secret(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
|||||||
@ -7,14 +7,19 @@ import logging
|
|||||||
from typing import cast, final, override
|
from typing import cast, final, override
|
||||||
|
|
||||||
import asyncssh
|
import asyncssh
|
||||||
|
from sshecret_sshd import constants, exceptions
|
||||||
from sshecret_sshd import exceptions, constants
|
|
||||||
|
|
||||||
from .base import CommandDispatcher
|
from .base import CommandDispatcher
|
||||||
from .get_secret import GetSecret
|
from .get_secret import GetSecret
|
||||||
from .register import Register
|
|
||||||
from .list_secrets import ListSecrets
|
from .list_secrets import ListSecrets
|
||||||
from .ping import PingCommand
|
from .ping import PingCommand
|
||||||
|
from .register import Register
|
||||||
|
from .shelldriver import (
|
||||||
|
ShellDeleteSecret,
|
||||||
|
ShellListSecrets,
|
||||||
|
ShellLookupSecret,
|
||||||
|
ShellStoreSecret,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
SYNOPSIS = """[bold]Sshecret SSH Server[/bold]
|
SYNOPSIS = """[bold]Sshecret SSH Server[/bold]
|
||||||
@ -29,9 +34,13 @@ encoded as base64.
|
|||||||
|
|
||||||
COMMANDS = [
|
COMMANDS = [
|
||||||
GetSecret,
|
GetSecret,
|
||||||
Register,
|
|
||||||
ListSecrets,
|
ListSecrets,
|
||||||
PingCommand,
|
PingCommand,
|
||||||
|
Register,
|
||||||
|
ShellDeleteSecret,
|
||||||
|
ShellListSecrets,
|
||||||
|
ShellLookupSecret,
|
||||||
|
ShellStoreSecret,
|
||||||
]
|
]
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|||||||
163
packages/sshecret-sshd/src/sshecret_sshd/commands/shelldriver.py
Normal file
163
packages/sshecret-sshd/src/sshecret_sshd/commands/shelldriver.py
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
"""Podman Shelldriver compatible commands."""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import final, override
|
||||||
|
import asyncssh
|
||||||
|
|
||||||
|
from sshecret.backend.models import Operation
|
||||||
|
from sshecret.crypto import encrypt_string, load_public_key
|
||||||
|
|
||||||
|
|
||||||
|
from .base import CommandDispatcher
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# These error messages are taken verbatim from podman, and while they don't seem
|
||||||
|
# to make complete sense, they will be used regardless.
|
||||||
|
ERR_SECRET_NOT_FOUND = "no such secret"
|
||||||
|
ERR_SECRET_EXISTS = "secret data with ID already exists"
|
||||||
|
ERR_INVALID_SECRET = "invalid key"
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class ShellListSecrets(CommandDispatcher):
|
||||||
|
"""List secrets.
|
||||||
|
|
||||||
|
This command lists secrets in a format compatible with podman's ShellDriver.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "list"
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def exec(self) -> None:
|
||||||
|
"""List secrets."""
|
||||||
|
LOG.debug("ShellListSecret called.")
|
||||||
|
await self.audit(Operation.READ, "Listed available secret names")
|
||||||
|
for secret_name in self.client.secrets:
|
||||||
|
self.print(secret_name)
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class ShellDeleteSecret(CommandDispatcher):
|
||||||
|
"""Delete a secret.
|
||||||
|
|
||||||
|
If the identifier for a secret does not exist, an error will be printed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "delete"
|
||||||
|
mandatory_argument = "KEY"
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def exec(self) -> None:
|
||||||
|
"""Delete a secret."""
|
||||||
|
secret_name = self.arguments[0]
|
||||||
|
LOG.debug("ShellDeleteSecret called withg arguments %r.", self.arguments)
|
||||||
|
await self.audit(
|
||||||
|
operation=Operation.DELETE,
|
||||||
|
message="ClientSecret deleted",
|
||||||
|
secret=secret_name,
|
||||||
|
)
|
||||||
|
await self.backend.delete_client_secret(
|
||||||
|
("id", str(self.client.id)), ("name", secret_name)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class ShellLookupSecret(CommandDispatcher):
|
||||||
|
"""Look up a secret.
|
||||||
|
|
||||||
|
The identifier for the secret must be provided as the argument.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "lookup"
|
||||||
|
mandatory_argument = "KEY"
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def exec(self) -> None:
|
||||||
|
"""Lookup secret."""
|
||||||
|
LOG.debug("ShellLookupSecret called with arguments %r", self.arguments)
|
||||||
|
secret_name = self.arguments[0]
|
||||||
|
|
||||||
|
secret = await self.backend.get_client_secret(
|
||||||
|
("id", str(self.client.id)), secret_name
|
||||||
|
)
|
||||||
|
if not secret:
|
||||||
|
LOG.debug(
|
||||||
|
"Secret %s not found for client %s (%s)",
|
||||||
|
secret_name,
|
||||||
|
self.client.id,
|
||||||
|
self.client.name,
|
||||||
|
)
|
||||||
|
self.print(ERR_SECRET_NOT_FOUND, stderr=True)
|
||||||
|
return
|
||||||
|
await self.audit(
|
||||||
|
Operation.READ, message="Client requested secret", secret=secret_name
|
||||||
|
)
|
||||||
|
|
||||||
|
self.print(secret)
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class ShellStoreSecret(CommandDispatcher):
|
||||||
|
"""Store a secret.
|
||||||
|
|
||||||
|
Secret will be read from command argument, or via STDIN.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "store"
|
||||||
|
mandatory_argument = "KEY"
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def exec(self) -> None:
|
||||||
|
"""Store a secret."""
|
||||||
|
LOG.debug("ShellStoreSecret called with arguments %r", self.arguments)
|
||||||
|
secret_name = self.arguments[0]
|
||||||
|
if secret_name in self.client.secrets:
|
||||||
|
self.print(ERR_SECRET_EXISTS, stderr=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
secret_data: str | None = None
|
||||||
|
if len(self.arguments) == 2:
|
||||||
|
secret_data = self.arguments[1]
|
||||||
|
|
||||||
|
if not secret_data:
|
||||||
|
LOG.debug("No secret set as input, trying stdin.")
|
||||||
|
secret_data = await self.get_secret_on_stdin()
|
||||||
|
|
||||||
|
if not secret_data:
|
||||||
|
self.print(ERR_INVALID_SECRET, stderr=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Encrypt secret
|
||||||
|
encrypted = self.encrypt_secret(secret_data)
|
||||||
|
|
||||||
|
await self.backend.create_client_secret(
|
||||||
|
("id", str(self.client.id)), secret_name, encrypted
|
||||||
|
)
|
||||||
|
await self.audit(
|
||||||
|
operation=Operation.CREATE,
|
||||||
|
message="Secret created from 'store' command",
|
||||||
|
secret=secret_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
def encrypt_secret(self, value: str) -> str:
|
||||||
|
"""Encrypt a secret."""
|
||||||
|
public_key = load_public_key(self.client.public_key.encode())
|
||||||
|
return encrypt_string(value, public_key)
|
||||||
|
|
||||||
|
async def get_secret_on_stdin(self) -> str | None:
|
||||||
|
"""Get secret from stdin."""
|
||||||
|
secret_data = ""
|
||||||
|
try:
|
||||||
|
async for line in self.process.stdin:
|
||||||
|
if self.process.stdin.at_eof():
|
||||||
|
break
|
||||||
|
if not line:
|
||||||
|
break
|
||||||
|
secret_data += line.rstrip()
|
||||||
|
except asyncssh.BreakReceived:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not secret_data:
|
||||||
|
return None
|
||||||
|
return secret_data
|
||||||
@ -5,7 +5,7 @@ admin and sshd library do not need to implement the same
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Self, override
|
from typing import Any, Literal, Self, override
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from pydantic import TypeAdapter
|
from pydantic import TypeAdapter
|
||||||
@ -29,6 +29,17 @@ from .utils import validate_public_key
|
|||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
KeyType = Literal["id", "name"]
|
||||||
|
KeySpec = str | tuple[KeyType, str]
|
||||||
|
|
||||||
|
|
||||||
|
def _key(id_or_name: KeySpec) -> str:
|
||||||
|
"""Get the correct key string."""
|
||||||
|
if isinstance(id_or_name, str):
|
||||||
|
return id_or_name
|
||||||
|
prefix, suffix = id_or_name
|
||||||
|
return f"{prefix}:{suffix}"
|
||||||
|
|
||||||
|
|
||||||
class ClientQueryIterator:
|
class ClientQueryIterator:
|
||||||
"""Asynchronous query iterator."""
|
"""Asynchronous query iterator."""
|
||||||
@ -313,56 +324,54 @@ class SshecretBackend(BaseBackend):
|
|||||||
|
|
||||||
return clients
|
return clients
|
||||||
|
|
||||||
async def get_client(self, name: str) -> Client | None:
|
async def get_client(self, id_or_name: KeySpec) -> Client | None:
|
||||||
"""Lookup a client on username."""
|
"""Lookup a client on username."""
|
||||||
path = f"/api/v1/clients/{name}"
|
key = _key(id_or_name)
|
||||||
|
path = f"/api/v1/clients/{key}"
|
||||||
response = await self._get(path)
|
response = await self._get(path)
|
||||||
if response.status_code == 404:
|
if response.status_code == 404:
|
||||||
return None
|
return None
|
||||||
client = Client.model_validate(response.json())
|
client = Client.model_validate(response.json())
|
||||||
return client
|
return client
|
||||||
|
|
||||||
async def get_client_by_id(self, id: str) -> Client | None:
|
async def delete_client(self, id_or_name: KeySpec) -> None:
|
||||||
"""Lookup a client on username."""
|
|
||||||
path = f"/api/v1/clients/id/{id}"
|
|
||||||
response = await self._get(path)
|
|
||||||
if response.status_code == 404:
|
|
||||||
return None
|
|
||||||
client = Client.model_validate(response.json())
|
|
||||||
return client
|
|
||||||
|
|
||||||
async def delete_client(self, client_name: str) -> None:
|
|
||||||
"""Delete a client."""
|
"""Delete a client."""
|
||||||
path = f"/api/v1/clients/{client_name}"
|
key = _key(id_or_name)
|
||||||
response = await self._delete(path)
|
path = f"/api/v1/clients/{key}"
|
||||||
|
|
||||||
async def delete_client_by_id(self, id: str) -> None:
|
|
||||||
"""Delete a client."""
|
|
||||||
path = f"/api/v1/clients/id/{id}"
|
|
||||||
response = await self._delete(path)
|
response = await self._delete(path)
|
||||||
|
LOG.debug("response: %s", response.text)
|
||||||
|
|
||||||
async def create_client_secret(
|
async def create_client_secret(
|
||||||
self, client_name: str, secret_name: str, encrypted_secret: str
|
self, client_idname: KeySpec, secret_name: str, encrypted_secret: str
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Create a secret.
|
"""Create a secret.
|
||||||
|
|
||||||
This will overwrite any existing secret with that name.
|
This will overwrite any existing secret with that name.
|
||||||
"""
|
"""
|
||||||
path = f"api/v1/clients/{client_name}/secrets/{secret_name}"
|
client_key = _key(client_idname)
|
||||||
|
path = f"api/v1/clients/{client_key}/secrets/{secret_name}"
|
||||||
response = await self._put(path, json={"value": encrypted_secret})
|
response = await self._put(path, json={"value": encrypted_secret})
|
||||||
|
|
||||||
async def get_client_secret(self, name: str, secret_name: str) -> str | None:
|
async def get_client_secret(
|
||||||
|
self, client_idname: KeySpec, secret_idname: KeySpec
|
||||||
|
) -> str | None:
|
||||||
"""Fetch a secret."""
|
"""Fetch a secret."""
|
||||||
path = f"/api/v1/clients/{name}/secrets/{secret_name}"
|
client_key = _key(client_idname)
|
||||||
|
secret_key = _key(secret_idname)
|
||||||
|
path = f"/api/v1/clients/{client_key}/secrets/{secret_key}"
|
||||||
response = await self._get(path)
|
response = await self._get(path)
|
||||||
if response.status_code == 404:
|
if response.status_code == 404:
|
||||||
return None
|
return None
|
||||||
secret = ClientSecret.model_validate(response.json())
|
secret = ClientSecret.model_validate(response.json())
|
||||||
return secret.secret
|
return secret.secret
|
||||||
|
|
||||||
async def delete_client_secret(self, client_name: str, secret_name: str) -> None:
|
async def delete_client_secret(
|
||||||
|
self, client_idname: KeySpec, secret_idname: KeySpec
|
||||||
|
) -> None:
|
||||||
"""Delete a secret from a client."""
|
"""Delete a secret from a client."""
|
||||||
path = f"api/v1/clients/{client_name}/secrets/{secret_name}"
|
client_key = _key(client_idname)
|
||||||
|
secret_key = _key(secret_idname)
|
||||||
|
path = f"api/v1/clients/{client_key}/secrets/{secret_key}"
|
||||||
await self._delete(path)
|
await self._delete(path)
|
||||||
|
|
||||||
async def update_client(self, client: Client) -> Client:
|
async def update_client(self, client: Client) -> Client:
|
||||||
@ -382,13 +391,14 @@ class SshecretBackend(BaseBackend):
|
|||||||
)
|
)
|
||||||
return client
|
return client
|
||||||
|
|
||||||
async def update_client_key(self, client_name: str, public_key: str) -> None:
|
async def update_client_key(self, client_idname: KeySpec, public_key: str) -> None:
|
||||||
"""Update the client key."""
|
"""Update the client key."""
|
||||||
path = f"/api/v1/clients/{client_name}/public-key"
|
client_key = _key(client_idname)
|
||||||
|
path = f"/api/v1/clients/{client_key}/public-key"
|
||||||
await self._post(path, json={"public_key": public_key})
|
await self._post(path, json={"public_key": public_key})
|
||||||
|
|
||||||
async def update_client_sources(
|
async def update_client_sources(
|
||||||
self, client_name: str, addresses: list[str] | None
|
self, client_idname: KeySpec, addresses: list[str] | None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Update client source addresses.
|
"""Update client source addresses.
|
||||||
|
|
||||||
@ -397,36 +407,32 @@ class SshecretBackend(BaseBackend):
|
|||||||
if not addresses:
|
if not addresses:
|
||||||
addresses = []
|
addresses = []
|
||||||
|
|
||||||
path = f"/api/v1/clients/{client_name}/policies/"
|
client_key = _key(client_idname)
|
||||||
|
path = f"/api/v1/clients/{client_key}/policies/"
|
||||||
await self._put(path, json={"sources": addresses})
|
await self._put(path, json={"sources": addresses})
|
||||||
|
|
||||||
async def get_detailed_secrets(self) -> list[DetailedSecrets]:
|
async def get_detailed_secrets(self) -> list[DetailedSecrets]:
|
||||||
"""Get detailed list of secrets."""
|
"""Get detailed list of secrets."""
|
||||||
path = "/api/v1/secrets/detailed/"
|
path = "/api/v1/secrets/"
|
||||||
response = await self._get(path)
|
response = await self._get(path)
|
||||||
|
|
||||||
secret_list = TypeAdapter(list[DetailedSecrets])
|
secret_list = TypeAdapter(list[DetailedSecrets])
|
||||||
return secret_list.validate_python(response.json())
|
return secret_list.validate_python(response.json())
|
||||||
|
|
||||||
async def get_secrets(self) -> list[Secret]:
|
async def get_secrets(self) -> list[Secret]:
|
||||||
"""Get Secrets.
|
"""Get detailed list of secrets."""
|
||||||
|
|
||||||
This provides a list of secret names and which clients have them.
|
|
||||||
"""
|
|
||||||
path = "/api/v1/secrets/"
|
path = "/api/v1/secrets/"
|
||||||
response = await self._get(path)
|
response = await self._get(path)
|
||||||
|
|
||||||
secret_list = TypeAdapter(list[Secret])
|
detailed_secret_list = TypeAdapter(list[DetailedSecrets])
|
||||||
return secret_list.validate_python(response.json())
|
detailed_secrets = detailed_secret_list.validate_python(response.json())
|
||||||
|
# Convert to list of secrets
|
||||||
|
secrets: list[Secret] = []
|
||||||
|
for detail in detailed_secrets:
|
||||||
|
clients = [ref.name for ref in detail.clients]
|
||||||
|
secrets.append(Secret(name=detail.name, clients=clients))
|
||||||
|
|
||||||
async def get_secret(self, name: str) -> Secret | None:
|
return secrets
|
||||||
"""Get clients mapped to a single secret."""
|
|
||||||
path = f"/api/v1/secrets/{name}"
|
|
||||||
response = await self._get(path)
|
|
||||||
if response.status_code == 404:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return Secret.model_validate(response.json())
|
|
||||||
|
|
||||||
async def get_detailed_secret(self, name: str) -> DetailedSecrets | None:
|
async def get_detailed_secret(self, name: str) -> DetailedSecrets | None:
|
||||||
"""Get clients mapped to a single secret."""
|
"""Get clients mapped to a single secret."""
|
||||||
@ -437,6 +443,16 @@ class SshecretBackend(BaseBackend):
|
|||||||
|
|
||||||
return DetailedSecrets.model_validate(response.json())
|
return DetailedSecrets.model_validate(response.json())
|
||||||
|
|
||||||
|
async def get_secret(self, idname: KeySpec) -> DetailedSecrets | None:
|
||||||
|
"""Get clients mapped to a single secret."""
|
||||||
|
secret_key = _key(idname)
|
||||||
|
path = f"/api/v1/secrets/{secret_key}"
|
||||||
|
response = await self._get(path)
|
||||||
|
if response.status_code == 404:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return DetailedSecrets.model_validate(response.json())
|
||||||
|
|
||||||
def audit(self, subsystem: SubSystem) -> AuditAPI:
|
def audit(self, subsystem: SubSystem) -> AuditAPI:
|
||||||
"""Create the audit API."""
|
"""Create the audit API."""
|
||||||
audit = AuditAPI(self._backend_url, self._api_token, subsystem)
|
audit = AuditAPI(self._backend_url, self._api_token, subsystem)
|
||||||
|
|||||||
@ -51,7 +51,7 @@ async def test_create_secret(backend_api: SshecretBackend) -> None:
|
|||||||
assert secret_to_client is not None
|
assert secret_to_client is not None
|
||||||
|
|
||||||
assert secret_to_client.name == "mysecret"
|
assert secret_to_client.name == "mysecret"
|
||||||
assert "test" in secret_to_client.clients
|
assert secret_to_client.clients[0].name == "test"
|
||||||
|
|
||||||
secret = await backend_api.get_client_secret("test", "mysecret")
|
secret = await backend_api.get_client_secret("test", "mysecret")
|
||||||
|
|
||||||
|
|||||||
@ -3,8 +3,9 @@
|
|||||||
This essentially also tests parts of the admin API.
|
This essentially also tests parts of the admin API.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import AsyncIterator
|
from collections.abc import AsyncIterator
|
||||||
import os
|
import os
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
@ -17,7 +18,33 @@ from .clients import create_test_client, ClientData
|
|||||||
from .types import CommandRunner, ProcessRunner
|
from .types import CommandRunner, ProcessRunner
|
||||||
|
|
||||||
|
|
||||||
class TestSshd:
|
class BaseSSHTests:
|
||||||
|
"""Base test class."""
|
||||||
|
|
||||||
|
async def register_client(
|
||||||
|
self, name: str, ssh_session: ProcessRunner
|
||||||
|
) -> ClientData:
|
||||||
|
"""Register client."""
|
||||||
|
test_client = create_test_client(name)
|
||||||
|
async with ssh_session(test_client, "register") as session:
|
||||||
|
maxlines = 10
|
||||||
|
linenum = 0
|
||||||
|
found = False
|
||||||
|
while linenum < maxlines:
|
||||||
|
line = await session.stdout.readline()
|
||||||
|
if "Enter public key" in line:
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
assert found is True
|
||||||
|
session.stdin.write(test_client.public_key + "\n")
|
||||||
|
|
||||||
|
result = await session.stdout.read()
|
||||||
|
assert "Key is valid. Registering client." in result
|
||||||
|
await session.wait()
|
||||||
|
return test_client
|
||||||
|
|
||||||
|
|
||||||
|
class TestSshd(BaseSSHTests):
|
||||||
"""Class based tests.
|
"""Class based tests.
|
||||||
|
|
||||||
This allows us to create small helpers.
|
This allows us to create small helpers.
|
||||||
@ -52,30 +79,9 @@ class TestSshd:
|
|||||||
client = clients[0]
|
client = clients[0]
|
||||||
assert client.name == "new_client"
|
assert client.name == "new_client"
|
||||||
|
|
||||||
async def register_client(
|
|
||||||
self, name: str, ssh_session: ProcessRunner
|
|
||||||
) -> ClientData:
|
|
||||||
"""Register client."""
|
|
||||||
test_client = create_test_client(name)
|
|
||||||
async with ssh_session(test_client, "register") as session:
|
|
||||||
maxlines = 10
|
|
||||||
linenum = 0
|
|
||||||
found = False
|
|
||||||
while linenum < maxlines:
|
|
||||||
line = await session.stdout.readline()
|
|
||||||
if "Enter public key" in line:
|
|
||||||
found = True
|
|
||||||
break
|
|
||||||
assert found is True
|
|
||||||
session.stdin.write(test_client.public_key + "\n")
|
|
||||||
|
|
||||||
result = await session.stdout.read()
|
|
||||||
assert "Key is valid. Registering client." in result
|
|
||||||
await session.wait()
|
|
||||||
return test_client
|
|
||||||
|
|
||||||
|
|
||||||
class TestSshdIntegration(TestSshd):
|
class TestSshdIntegration(BaseSSHTests):
|
||||||
"""Integration tests."""
|
"""Integration tests."""
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -137,3 +143,185 @@ class TestSshdIntegration(TestSshd):
|
|||||||
base_url=url, headers={"Authorization": f"Bearer {token}"}
|
base_url=url, headers={"Authorization": f"Bearer {token}"}
|
||||||
) as client:
|
) as client:
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
|
class TestShelldriverCommands(BaseSSHTests):
|
||||||
|
"""Shelldriver command tests."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_store_and_lookup(
|
||||||
|
self,
|
||||||
|
backend_server: tuple[str, tuple[str, str]],
|
||||||
|
ssh_session: ProcessRunner,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Log in."""
|
||||||
|
test_client = await self.register_client("myclient", ssh_session)
|
||||||
|
ssh_output = await ssh_command_runner(test_client, "store mysecret secretvalue")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
|
||||||
|
# wait half a second
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
ssh_output = await ssh_command_runner(test_client, "lookup mysecret")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
assert ssh_output.stdout is not None
|
||||||
|
assert isinstance(ssh_output.stdout, str)
|
||||||
|
encrypted = ssh_output.stdout.rstrip()
|
||||||
|
decrypted = decode_string(encrypted, test_client.private_key)
|
||||||
|
assert decrypted == "secretvalue"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_store_and_lookup_stdin(
|
||||||
|
self,
|
||||||
|
backend_server: tuple[str, tuple[str, str]],
|
||||||
|
ssh_session: ProcessRunner,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Test store and lookup, with password specification in stdin."""
|
||||||
|
test_client = await self.register_client("myclient", ssh_session)
|
||||||
|
async with ssh_session(test_client, "store insecret") as session:
|
||||||
|
session.stdin.write("testinput\n")
|
||||||
|
session.stdin.write_eof()
|
||||||
|
|
||||||
|
await session.stdin.wait_closed()
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
ssh_output = await ssh_command_runner(test_client, "lookup insecret")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
assert ssh_output.stdout is not None
|
||||||
|
assert isinstance(ssh_output.stdout, str)
|
||||||
|
encrypted = ssh_output.stdout.rstrip()
|
||||||
|
decrypted = decode_string(encrypted, test_client.private_key)
|
||||||
|
assert decrypted == "testinput"
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("secret_name",["nonexistant", "blåbærgrød", "../../../etc/shadow"])
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_invalid_lookup(
|
||||||
|
self,
|
||||||
|
secret_name: str,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
ssh_session: ProcessRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Test lookup with invalid secret."""
|
||||||
|
test_client = await self.register_client("myclient", ssh_session)
|
||||||
|
ssh_output = await ssh_command_runner(test_client, f"lookup {secret_name}")
|
||||||
|
assert isinstance(ssh_output.stderr, str)
|
||||||
|
assert ssh_output.stderr.rstrip() == "no such secret"
|
||||||
|
|
||||||
|
|
||||||
|
class TestShelldriverListCommand(BaseSSHTests):
|
||||||
|
"""Tests for the list command."""
|
||||||
|
|
||||||
|
@pytest.fixture(name="secret_names")
|
||||||
|
def get_secret_names(self) -> list[str]:
|
||||||
|
"""Get secret names.
|
||||||
|
|
||||||
|
Sort of like a parametrize function.
|
||||||
|
"""
|
||||||
|
return ["foo", "bar", "abc123", "blåbærgrød"]
|
||||||
|
|
||||||
|
@pytest.fixture(name="test_client")
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def create_test_client(
|
||||||
|
self,
|
||||||
|
ssh_session: ProcessRunner,
|
||||||
|
) -> ClientData:
|
||||||
|
"""Register a test client."""
|
||||||
|
return await self.register_client("listclient", ssh_session)
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def create_data(
|
||||||
|
self,
|
||||||
|
secret_names: list[str],
|
||||||
|
test_client: ClientData,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Create data for the test."""
|
||||||
|
for name in secret_names:
|
||||||
|
ssh_output = await ssh_command_runner(test_client, f"store {name} secretvalue")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
assert ssh_output.stdout is not None
|
||||||
|
assert isinstance(ssh_output.stdout, str)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_list_secrets(
|
||||||
|
self,
|
||||||
|
secret_names: list[str],
|
||||||
|
test_client: ClientData,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Test list command."""
|
||||||
|
ssh_output = await ssh_command_runner(test_client, "list")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
assert ssh_output.stdout is not None
|
||||||
|
assert isinstance(ssh_output.stdout, str)
|
||||||
|
|
||||||
|
list_output = ssh_output.stdout
|
||||||
|
print(list_output)
|
||||||
|
input_secret_names = list_output.splitlines()
|
||||||
|
assert len(input_secret_names) == len(secret_names)
|
||||||
|
|
||||||
|
assert sorted(secret_names) == sorted(input_secret_names)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("secret_name", ["simple", "blåbærgrød"])
|
||||||
|
class TestShelldriverDeleteCommand(BaseSSHTests):
|
||||||
|
"""Tests for the list command."""
|
||||||
|
|
||||||
|
@pytest.fixture(name="test_client")
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def create_test_client(
|
||||||
|
self,
|
||||||
|
ssh_session: ProcessRunner,
|
||||||
|
) -> ClientData:
|
||||||
|
"""Register a test client."""
|
||||||
|
return await self.register_client("listclient", ssh_session)
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def create_data(
|
||||||
|
self,
|
||||||
|
secret_name: str,
|
||||||
|
test_client: ClientData,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Create data for the test."""
|
||||||
|
ssh_output = await ssh_command_runner(test_client, f"store {secret_name} secretvalue")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
assert ssh_output.stdout is not None
|
||||||
|
assert isinstance(ssh_output.stdout, str)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_stored_secrets(
|
||||||
|
self,
|
||||||
|
test_client: ClientData,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Get stored secrets."""
|
||||||
|
ssh_output = await ssh_command_runner(test_client, "list")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
assert ssh_output.stdout is not None
|
||||||
|
assert isinstance(ssh_output.stdout, str)
|
||||||
|
|
||||||
|
list_output = ssh_output.stdout
|
||||||
|
print(list_output)
|
||||||
|
input_secret_names = list_output.splitlines()
|
||||||
|
return sorted(input_secret_names)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_delete_secret(
|
||||||
|
self,
|
||||||
|
secret_name: str,
|
||||||
|
test_client: ClientData,
|
||||||
|
ssh_command_runner: CommandRunner,
|
||||||
|
) -> None:
|
||||||
|
"""Delete secret."""
|
||||||
|
current_secrets = await self.get_stored_secrets(test_client, ssh_command_runner)
|
||||||
|
assert secret_name in current_secrets
|
||||||
|
ssh_output = await ssh_command_runner(test_client, f"delete {secret_name}")
|
||||||
|
assert bool(ssh_output.stderr) is False
|
||||||
|
current_secrets = await self.get_stored_secrets(test_client, ssh_command_runner)
|
||||||
|
assert secret_name not in current_secrets
|
||||||
|
|||||||
@ -119,6 +119,9 @@ def test_delete_client(test_client: TestClient) -> None:
|
|||||||
resp = test_client.get("/api/v1/clients/test")
|
resp = test_client.get("/api/v1/clients/test")
|
||||||
assert resp.status_code == 404
|
assert resp.status_code == 404
|
||||||
|
|
||||||
|
resp = test_client.get("/api/v1/clients/")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
def test_add_secret(test_client: TestClient) -> None:
|
def test_add_secret(test_client: TestClient) -> None:
|
||||||
"""Test adding a secret to a client."""
|
"""Test adding a secret to a client."""
|
||||||
|
|||||||
Reference in New Issue
Block a user