Compare commits
5 Commits
f312edabd7
...
sftp-suppo
| Author | SHA1 | Date | |
|---|---|---|---|
| 49cd23b21b | |||
| 25dfefccb0 | |||
| d6357c8a88 | |||
| 08b85ab3bb | |||
| e346949953 |
@ -188,10 +188,9 @@ def create_router(dependencies: AdminDependencies) -> APIRouter:
|
|||||||
dependencies.settings, data=token_data, provider=claims.provider
|
dependencies.settings, data=token_data, provider=claims.provider
|
||||||
)
|
)
|
||||||
|
|
||||||
path = f"/auth_cb#access_token={access_token}&refresh_token={refresh_token}"
|
callback_url = f"/auth_cb#access_token={access_token}&refresh_token={refresh_token}"
|
||||||
callback_url = os.path.join("admin", path)
|
|
||||||
if dependencies.settings.frontend_test_url:
|
if dependencies.settings.frontend_test_url:
|
||||||
callback_url = os.path.join(dependencies.settings.frontend_test_url, path)
|
callback_url = os.path.join(dependencies.settings.frontend_test_url, callback_url)
|
||||||
origin = "UNKNOWN"
|
origin = "UNKNOWN"
|
||||||
if request.client:
|
if request.client:
|
||||||
origin = request.client.host
|
origin = request.client.host
|
||||||
|
|||||||
@ -120,7 +120,6 @@ def create_admin_app(
|
|||||||
@app.get("/")
|
@app.get("/")
|
||||||
def serve_frontend(request: Request) -> FileResponse:
|
def serve_frontend(request: Request) -> FileResponse:
|
||||||
"""Serve the frontend SPA index."""
|
"""Serve the frontend SPA index."""
|
||||||
LOG.info("Got this request: %r", request.url)
|
|
||||||
if not settings.frontend_dir:
|
if not settings.frontend_dir:
|
||||||
raise HTTPException(status_code=404, detail="Not found.")
|
raise HTTPException(status_code=404, detail="Not found.")
|
||||||
return FileResponse(settings.frontend_dir / "index.html")
|
return FileResponse(settings.frontend_dir / "index.html")
|
||||||
@ -128,7 +127,6 @@ def create_admin_app(
|
|||||||
@app.get("/{frontend_path:path}")
|
@app.get("/{frontend_path:path}")
|
||||||
def serve_frontend_path(frontend_path: str) -> FileResponse:
|
def serve_frontend_path(frontend_path: str) -> FileResponse:
|
||||||
"""Serve the frontend SPA.."""
|
"""Serve the frontend SPA.."""
|
||||||
LOG.info("Got request for %s", frontend_path)
|
|
||||||
if not settings.frontend_dir:
|
if not settings.frontend_dir:
|
||||||
raise HTTPException(status_code=404, detail="Not found.")
|
raise HTTPException(status_code=404, detail="Not found.")
|
||||||
static_file = settings.frontend_dir / frontend_path
|
static_file = settings.frontend_dir / frontend_path
|
||||||
|
|||||||
@ -1,285 +0,0 @@
|
|||||||
"""Unit tests for the password context class.
|
|
||||||
|
|
||||||
We are primarily testing whether the actions of the PasswordContext matches
|
|
||||||
those in the low level API.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import random
|
|
||||||
import string
|
|
||||||
from typing import cast
|
|
||||||
import pykeepass
|
|
||||||
|
|
||||||
from sshecret_admin.services.keepass import PasswordContext
|
|
||||||
|
|
||||||
|
|
||||||
def random_string(length: int = 5) -> str:
|
|
||||||
"""Generate random string."""
|
|
||||||
chars = string.ascii_lowercase
|
|
||||||
return "".join(random.choice(chars) for _ in range(length))
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_entries(
|
|
||||||
password_database: pykeepass.PyKeePass,
|
|
||||||
amount: int = 10,
|
|
||||||
prefix: str = "secret",
|
|
||||||
group: pykeepass.group.Group | None = None,
|
|
||||||
) -> None:
|
|
||||||
"""Create some random entries."""
|
|
||||||
if not group:
|
|
||||||
group = cast(pykeepass.group.Group, password_database.root_group)
|
|
||||||
for n in range(amount):
|
|
||||||
name = f"{prefix}-{n}"
|
|
||||||
username = "NONE"
|
|
||||||
password = random_string(12)
|
|
||||||
password_database.add_entry(
|
|
||||||
destination_group=group,
|
|
||||||
title=name,
|
|
||||||
username=username,
|
|
||||||
password=password,
|
|
||||||
)
|
|
||||||
password_database.save()
|
|
||||||
|
|
||||||
|
|
||||||
def test_add_entry(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test add entry."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_entry("testentry", "testsecret")
|
|
||||||
|
|
||||||
entry = password_database.find_entries(title="testentry", first=True)
|
|
||||||
assert entry is not None
|
|
||||||
assert isinstance(entry, pykeepass.entry.Entry)
|
|
||||||
assert entry.password == "testsecret"
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_secret(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test get secret."""
|
|
||||||
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_entry("testentry", "testsecret")
|
|
||||||
|
|
||||||
secret = context.get_secret("testentry")
|
|
||||||
assert secret is not None
|
|
||||||
assert secret == "testsecret"
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_available_secrets(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test get_available_secrets."""
|
|
||||||
create_random_entries(password_database, 10)
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
available_secrets = context.get_available_secrets()
|
|
||||||
assert len(available_secrets) == 10
|
|
||||||
for n in range(10):
|
|
||||||
assert f"secret-{n}" in available_secrets
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_entry(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test deletion of entry."""
|
|
||||||
create_random_entries(password_database, 3)
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
available_secrets = context.get_available_secrets()
|
|
||||||
assert len(available_secrets) == 3
|
|
||||||
context.delete_entry("secret-2")
|
|
||||||
entry = password_database.find_entries(title="secret-2", first=True)
|
|
||||||
assert entry is None
|
|
||||||
available_secrets = context.get_available_secrets()
|
|
||||||
assert len(available_secrets) == 2
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_secret_groups(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test get secret groups."""
|
|
||||||
# We create a hierarchy of groups to test how that parses.
|
|
||||||
root_group = password_database.root_group
|
|
||||||
first_group = password_database.add_group(
|
|
||||||
root_group, "level_one", notes="A group in the root"
|
|
||||||
)
|
|
||||||
password_database.add_group(
|
|
||||||
first_group, "level_two", notes="A group one level down"
|
|
||||||
)
|
|
||||||
# Another group at the root, without a note
|
|
||||||
password_database.add_group(root_group, "free_group")
|
|
||||||
password_database.save()
|
|
||||||
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
groups = context.get_secret_groups()
|
|
||||||
assert len(groups) == 3
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_secret_groups_regex(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Get secret groups matching a regex."""
|
|
||||||
# create some groups matching a pattern
|
|
||||||
for n in range(4):
|
|
||||||
password_database.add_group(password_database.root_group, f"foo-{n}")
|
|
||||||
|
|
||||||
for n in range(3):
|
|
||||||
parent_group = password_database.find_groups(name="foo-1", first=True)
|
|
||||||
password_database.add_group(parent_group, f"bar-{n}")
|
|
||||||
|
|
||||||
password_database.save()
|
|
||||||
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
foo_groups = context.get_secret_groups("foo-.*")
|
|
||||||
assert len(foo_groups) == 4
|
|
||||||
bar_groups = context.get_secret_groups("bar-.*")
|
|
||||||
assert len(bar_groups) == 3
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_secret_groups_with_entries(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Get secret groups with entries."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "Test Group")
|
|
||||||
context.add_group("parent_group", "Test Group")
|
|
||||||
context.add_group("nested_group", "Test Group", "parent_group")
|
|
||||||
context.add_entry("free_entry", "test", group_name="test_group")
|
|
||||||
context.add_entry("middle_entry", "test", group_name="parent_group")
|
|
||||||
context.add_entry("lower_entry", "test", group_name="nested_group")
|
|
||||||
|
|
||||||
groups = context.get_secret_groups()
|
|
||||||
assert len(groups) == 3
|
|
||||||
for group in groups:
|
|
||||||
assert len(group.entries) == 1
|
|
||||||
if group.name == "test_group":
|
|
||||||
assert "free_entry" in group.entries
|
|
||||||
elif group.name == "parent_group":
|
|
||||||
assert "middle_entry" in group.entries
|
|
||||||
elif group.name == "nested_group":
|
|
||||||
assert "lower_entry" in group.entries
|
|
||||||
|
|
||||||
|
|
||||||
def test_add_group(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test add_group."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "Test Group")
|
|
||||||
assert password_database.find_groups(name="test_group", first=True) is not None
|
|
||||||
|
|
||||||
# add a nested group below the first one
|
|
||||||
context.add_group("nested_group", "Nested test group", "test_group")
|
|
||||||
group = password_database.find_groups(name="nested_group", first=True)
|
|
||||||
assert group is not None
|
|
||||||
assert isinstance(group, pykeepass.group.Group)
|
|
||||||
parent_group = group.parentgroup
|
|
||||||
assert parent_group.name == "test_group"
|
|
||||||
|
|
||||||
|
|
||||||
def test_set_group_description(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test setting the group description."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "Test Group")
|
|
||||||
|
|
||||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
|
||||||
assert isinstance(kp_group, pykeepass.group.Group)
|
|
||||||
assert kp_group.notes == "Test Group"
|
|
||||||
|
|
||||||
context.set_group_description("test_group", "New Description")
|
|
||||||
|
|
||||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
|
||||||
assert isinstance(kp_group, pykeepass.group.Group)
|
|
||||||
assert kp_group.notes == "New Description"
|
|
||||||
|
|
||||||
|
|
||||||
def test_add_entry_with_group(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test adding an entry with a group."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "A test group")
|
|
||||||
context.add_entry("test_entry", "test_secret", group_name="test_group")
|
|
||||||
|
|
||||||
entry = password_database.find_entries(title="test_entry", first=True)
|
|
||||||
assert entry is not None
|
|
||||||
|
|
||||||
assert isinstance(entry, pykeepass.entry.Entry)
|
|
||||||
assert entry.group.name == "test_group"
|
|
||||||
|
|
||||||
|
|
||||||
def test_move_entry(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test moving entries between groups."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
|
|
||||||
context.add_group("test_group", "A test group")
|
|
||||||
context.add_group("test_group_2", "Another test group")
|
|
||||||
|
|
||||||
context.add_entry("test_entry", "test_secret")
|
|
||||||
entry = password_database.find_entries(title="test_entry", first=True)
|
|
||||||
assert isinstance(entry, pykeepass.entry.Entry)
|
|
||||||
assert entry.group.is_root_group is True
|
|
||||||
|
|
||||||
context.set_secret_group("test_entry", "test_group")
|
|
||||||
|
|
||||||
entry = password_database.find_entries(title="test_entry", first=True)
|
|
||||||
assert isinstance(entry, pykeepass.entry.Entry)
|
|
||||||
|
|
||||||
assert entry.group.is_root_group is False
|
|
||||||
assert entry.group.name == "test_group"
|
|
||||||
|
|
||||||
context.set_secret_group("test_entry", "test_group_2")
|
|
||||||
|
|
||||||
entry = password_database.find_entries(title="test_entry", first=True)
|
|
||||||
assert isinstance(entry, pykeepass.entry.Entry)
|
|
||||||
|
|
||||||
assert entry.group.is_root_group is False
|
|
||||||
assert entry.group.name == "test_group_2"
|
|
||||||
|
|
||||||
context.set_secret_group("test_entry", None)
|
|
||||||
|
|
||||||
entry = password_database.find_entries(title="test_entry", first=True)
|
|
||||||
assert isinstance(entry, pykeepass.entry.Entry)
|
|
||||||
assert entry.group.is_root_group is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_move_group(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test moving a group."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "A test group")
|
|
||||||
context.add_group("parent_group", "A parent group")
|
|
||||||
context.move_group("test_group", "parent_group")
|
|
||||||
|
|
||||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
|
||||||
assert isinstance(kp_group, pykeepass.group.Group)
|
|
||||||
assert kp_group.parentgroup.name == "parent_group"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_group(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test group deletion."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "A test group")
|
|
||||||
# Add some entries to this group.
|
|
||||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
|
||||||
assert isinstance(kp_group, pykeepass.group.Group)
|
|
||||||
create_random_entries(password_database, amount=10, group=kp_group)
|
|
||||||
|
|
||||||
context.delete_group("test_group")
|
|
||||||
kp_group = password_database.find_groups(name="test_group", first=True)
|
|
||||||
assert kp_group is None
|
|
||||||
|
|
||||||
# Check if the secrets are still there.
|
|
||||||
secrets = context.get_available_secrets()
|
|
||||||
assert len(secrets) == 10
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_specific_group(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test fetching a specific group."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("parent", "A parent group")
|
|
||||||
context.add_group("test_group", "A test group", "parent")
|
|
||||||
context.add_group("test_group_2", "A test group")
|
|
||||||
context.add_group("test_group_3", "A test group")
|
|
||||||
context.add_group("Other Group", "A test group")
|
|
||||||
results = context.get_secret_groups("test_group", False)
|
|
||||||
assert len(results) == 1
|
|
||||||
# Check if the parent reference is available.
|
|
||||||
assert results[0].parent_group is not None
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_ungrouped_secrets(password_database: pykeepass.PyKeePass) -> None:
|
|
||||||
"""Test fetching secrets without groups."""
|
|
||||||
context = PasswordContext(password_database)
|
|
||||||
context.add_group("test_group", "A test group")
|
|
||||||
for n in range(7):
|
|
||||||
context.add_entry(f"grouped-{n}", "foo", group_name="test_group")
|
|
||||||
|
|
||||||
for n in range(5):
|
|
||||||
context.add_entry(f"ungrouped-{n}", "bar")
|
|
||||||
|
|
||||||
ungrouped = context.get_ungrouped_secrets()
|
|
||||||
assert len(ungrouped) == 5
|
|
||||||
for entry in ungrouped:
|
|
||||||
assert entry.startswith("ungrouped-")
|
|
||||||
@ -1,26 +0,0 @@
|
|||||||
"""Tests various error types."""
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
import pykeepass
|
|
||||||
import pykeepass.exceptions
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from sshecret_admin.services.keepass import _password_context, PasswordCredentialsError
|
|
||||||
|
|
||||||
def test_open_invalid_database() -> None:
|
|
||||||
"""Test opening a non-existing database."""
|
|
||||||
bogus_path = Path("/tmp/non/existing/password/database.kdbx")
|
|
||||||
|
|
||||||
with pytest.raises(FileNotFoundError):
|
|
||||||
with _password_context(bogus_path, "foobar") as context:
|
|
||||||
assert context is not None
|
|
||||||
|
|
||||||
|
|
||||||
def test_incorrect_password(tmp_path: Path) -> None:
|
|
||||||
"""Test opening database with incorrect password."""
|
|
||||||
filename = tmp_path / "db.kdbx"
|
|
||||||
pykeepass.create_database(str(filename), password="correct")
|
|
||||||
|
|
||||||
with pytest.raises(PasswordCredentialsError):
|
|
||||||
with _password_context(filename, "incorrect") as context:
|
|
||||||
assert context is not None
|
|
||||||
@ -3,6 +3,8 @@
|
|||||||
<head>
|
<head>
|
||||||
<meta charset="UTF-8" />
|
<meta charset="UTF-8" />
|
||||||
<link rel="icon" href="/favicon.ico" />
|
<link rel="icon" href="/favicon.ico" />
|
||||||
|
<link rel="icon" href="/favicon.svg" sizes="any" type="image/svg+xml" />
|
||||||
|
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||||
<title>Sshecret Admin</title>
|
<title>Sshecret Admin</title>
|
||||||
</head>
|
</head>
|
||||||
|
|||||||
Binary file not shown.
|
Before Width: | Height: | Size: 4.2 KiB After Width: | Height: | Size: 18 KiB |
26
packages/sshecret-frontend/public/favicon.svg
Normal file
26
packages/sshecret-frontend/public/favicon.svg
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||||
|
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
|
||||||
|
<svg width="100%" height="100%" viewBox="0 0 64 64" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xml:space="preserve" xmlns:serif="http://www.serif.com/" style="fill-rule:evenodd;clip-rule:evenodd;stroke-linejoin:round;stroke-miterlimit:2;">
|
||||||
|
<rect id="Artboard1" x="0" y="0" width="64" height="64" style="fill:none;"/>
|
||||||
|
<g id="Artboard11" serif:id="Artboard1">
|
||||||
|
<g id="grid">
|
||||||
|
<rect x="0" y="0" width="64" height="64" style="fill:none;"/>
|
||||||
|
<g transform="matrix(0.17843,0,0,0.17843,-64.901,-47.058)">
|
||||||
|
<g transform="matrix(1,0,0,1,8.07567,42.4671)">
|
||||||
|
<path d="M535,250L561.806,335L613.462,308.734L605.179,367.467L691.924,367.467L621.746,420L661.954,462.5L605.179,472.533L631.985,557.533L561.806,505L535,557.533L508.194,505L438.015,557.533L464.821,472.533L408.046,462.5L448.254,420L378.076,367.467L464.821,367.467L456.538,308.734L508.194,335L535,250Z"/>
|
||||||
|
</g>
|
||||||
|
<g transform="matrix(0.8125,0,0,0.8125,331.826,19.4716)">
|
||||||
|
<path d="M246.833,633.448C236.413,655.429 209.771,670 180,670L180,557.533C209.771,557.533 236.413,572.104 246.833,594.085L275.336,594.085C285.418,572.104 311.195,557.533 340,557.533L340,670C311.195,670 285.418,655.429 275.336,633.448L246.833,633.448Z" style="fill:rgb(241,90,36);"/>
|
||||||
|
</g>
|
||||||
|
<g transform="matrix(1,0,0,1,258.076,2.46711)">
|
||||||
|
<ellipse cx="250" cy="415" rx="20" ry="15" style="fill:white;"/>
|
||||||
|
</g>
|
||||||
|
<g transform="matrix(1,0,0,1,328.076,2.46711)">
|
||||||
|
<ellipse cx="250" cy="415" rx="20" ry="15" style="fill:white;"/>
|
||||||
|
</g>
|
||||||
|
</g>
|
||||||
|
</g>
|
||||||
|
<g id="icon">
|
||||||
|
</g>
|
||||||
|
</g>
|
||||||
|
</svg>
|
||||||
|
After Width: | Height: | Size: 1.9 KiB |
@ -1 +0,0 @@
|
|||||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 261.76 226.69"><path d="M161.096.001l-30.225 52.351L100.647.001H-.005l130.877 226.688L261.749.001z" fill="#41b883"/><path d="M161.096.001l-30.225 52.351L100.647.001H52.346l78.526 136.01L209.398.001z" fill="#34495e"/></svg>
|
|
||||||
|
Before Width: | Height: | Size: 276 B |
106
packages/sshecret-sshd/src/sshecret_sshd/sftp_server.py
Normal file
106
packages/sshecret-sshd/src/sshecret_sshd/sftp_server.py
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
"""SFTP Server."""
|
||||||
|
|
||||||
|
from collections.abc import AsyncIterator
|
||||||
|
from typing import cast, override
|
||||||
|
from pathlib import Path
|
||||||
|
import asyncssh
|
||||||
|
|
||||||
|
|
||||||
|
from sshecret.backend.api import SshecretBackend
|
||||||
|
from sshecret.backend.models import Client
|
||||||
|
|
||||||
|
READ_ONLY = asyncssh.FXF_READ
|
||||||
|
|
||||||
|
|
||||||
|
def get_connection(channel: asyncssh.SSHServerChannel[bytes]) -> asyncssh.SSHServerConnection:
|
||||||
|
"""Get the connection."""
|
||||||
|
conn = channel.get_extra_info("connection")
|
||||||
|
assert conn is not None
|
||||||
|
return cast(asyncssh.SSHServerConnection, conn)
|
||||||
|
|
||||||
|
def get_backend(connection: asyncssh.SSHServerConnection) -> SshecretBackend:
|
||||||
|
"""Get backend from connection."""
|
||||||
|
backend = connection.get_extra_info("backend")
|
||||||
|
assert backend is not None
|
||||||
|
return cast(SshecretBackend, backend)
|
||||||
|
|
||||||
|
def get_client(connection: asyncssh.SSHServerConnection) -> Client:
|
||||||
|
"""Get client."""
|
||||||
|
client = connection.get_extra_info("client")
|
||||||
|
assert client is not None
|
||||||
|
return cast(Client, client)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class SshecretFileServer(asyncssh.SFTPServer):
|
||||||
|
"""File server."""
|
||||||
|
|
||||||
|
@override
|
||||||
|
def __init__(self, chan: asyncssh.SSHServerChannel[bytes], chroot: bytes | None = None):
|
||||||
|
"""Initialize sftp server."""
|
||||||
|
super().__init__(chan, "/".encode())
|
||||||
|
self._conn: asyncssh.SSHServerConnection = get_connection(chan)
|
||||||
|
self.backend: SshecretBackend = get_backend(self._conn)
|
||||||
|
|
||||||
|
self.logger.info("Connected")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def client(self) -> Client:
|
||||||
|
"""Get client."""
|
||||||
|
return get_client(self._conn)
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def scandir(self, path: bytes) -> AsyncIterator[asyncssh.SFTPName]:
|
||||||
|
"""List secrets as files."""
|
||||||
|
for secret in self.client.secrets:
|
||||||
|
sftp_name = asyncssh.SFTPName(secret.encode(), READ_ONLY)
|
||||||
|
self.format_longname(sftp_name)
|
||||||
|
yield sftp_name
|
||||||
|
|
||||||
|
@override
|
||||||
|
def format_longname(self, name: asyncssh.SFTPName) -> None:
|
||||||
|
mode = "-r--------"
|
||||||
|
user = self.client.name
|
||||||
|
group = "sshecret_clients"
|
||||||
|
size = ''
|
||||||
|
modtime = ''
|
||||||
|
nlink = ''
|
||||||
|
|
||||||
|
detail = (
|
||||||
|
f'{mode:10s} {nlink:>4s} {user:8s} {group:8s} '
|
||||||
|
f'{size:>8s} {modtime:12s} '
|
||||||
|
)
|
||||||
|
|
||||||
|
name.longname = detail.encode('utf-8') + cast(bytes, name.filename)
|
||||||
|
|
||||||
|
@override
|
||||||
|
def format_user(self, uid: int | None) -> str:
|
||||||
|
return self.client.name
|
||||||
|
|
||||||
|
@override
|
||||||
|
def format_group(self, gid: int | None) -> str:
|
||||||
|
return "clients"
|
||||||
|
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def open(self, path: bytes, pflags: int, attrs: asyncssh.SFTPAttrs) -> object:
|
||||||
|
"""Open file.
|
||||||
|
|
||||||
|
This is the tricky bit. We need to disallow writes, and we need to serve
|
||||||
|
an encrypted secret as a file.
|
||||||
|
"""
|
||||||
|
if pflags != READ_ONLY:
|
||||||
|
raise asyncssh.SFTPError(asyncssh.FX_WRITE_PROTECT, "Read write not supported")
|
||||||
|
filepath = path.decode()
|
||||||
|
secret_name = Path(filepath).name
|
||||||
|
if secret_name not in self.client.secrets:
|
||||||
|
raise asyncssh.SFTPError(asyncssh.FX_NO_SUCH_FILE, "No such secret")
|
||||||
|
secret = await self.backend.get_client_secret(("id", str(self.client.id)), secret_name)
|
||||||
|
if not secret:
|
||||||
|
raise asyncssh.SFTPError(asyncssh.FX_NO_SUCH_FILE, "No such secret")
|
||||||
|
# io.BytesIO
|
||||||
|
|
||||||
|
|
||||||
|
@override
|
||||||
|
async def close(self, file_obj: object) -> None:
|
||||||
|
"""Close the file object."""
|
||||||
@ -16,6 +16,7 @@ from sshecret_sshd.commands import dispatch_command
|
|||||||
|
|
||||||
from sshecret.backend import SshecretBackend, Client, Operation, SubSystem
|
from sshecret.backend import SshecretBackend, Client, Operation, SubSystem
|
||||||
from .settings import ServerSettings, ClientRegistrationSettings
|
from .settings import ServerSettings, ClientRegistrationSettings
|
||||||
|
from .sftp_server import SshecretFileServer
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -222,6 +223,7 @@ async def run_ssh_server(
|
|||||||
port,
|
port,
|
||||||
server_host_keys=keys,
|
server_host_keys=keys,
|
||||||
process_factory=dispatch_command,
|
process_factory=dispatch_command,
|
||||||
|
sftp_factory=SshecretFileServer,
|
||||||
)
|
)
|
||||||
return server
|
return server
|
||||||
|
|
||||||
|
|||||||
@ -19,8 +19,8 @@ async def test_help_command(ssh_command_runner: CommandRunner, client_registry:
|
|||||||
|
|
||||||
re_cmd = re.compile(r"^\s{2}([^\s]+)\s.*", re.MULTILINE)
|
re_cmd = re.compile(r"^\s{2}([^\s]+)\s.*", re.MULTILINE)
|
||||||
commands = re_cmd.findall(result.stdout)
|
commands = re_cmd.findall(result.stdout)
|
||||||
expected = ['get_secret', 'ls', 'ping']
|
expected = ["get_secret", "ls", "ping", "list", "store", "lookup", "delete"]
|
||||||
assert commands == expected
|
assert sorted(commands) == sorted(expected)
|
||||||
|
|
||||||
print(result.stdout)
|
print(result.stdout)
|
||||||
|
|
||||||
@ -39,7 +39,7 @@ async def test_help_registration_command(ssh_command_runner: CommandRunner, clie
|
|||||||
|
|
||||||
re_cmd = re.compile(r"^\s{2}([^\s]+)\s.*", re.MULTILINE)
|
re_cmd = re.compile(r"^\s{2}([^\s]+)\s.*", re.MULTILINE)
|
||||||
commands = re_cmd.findall(result.stdout)
|
commands = re_cmd.findall(result.stdout)
|
||||||
expected = ["get_secret", "ls", "register", "ping"]
|
expected = ["get_secret", "ls", "register", "ping", "list", "store", "lookup", "delete"]
|
||||||
assert sorted(commands) == sorted(expected)
|
assert sorted(commands) == sorted(expected)
|
||||||
|
|
||||||
print(result.stdout)
|
print(result.stdout)
|
||||||
|
|||||||
Reference in New Issue
Block a user