|
|
|
|
@ -1,106 +0,0 @@
|
|
|
|
|
"""SFTP Server."""
|
|
|
|
|
|
|
|
|
|
from collections.abc import AsyncIterator
|
|
|
|
|
from typing import cast, override
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import asyncssh
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from sshecret.backend.api import SshecretBackend
|
|
|
|
|
from sshecret.backend.models import Client
|
|
|
|
|
|
|
|
|
|
READ_ONLY = asyncssh.FXF_READ
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_connection(channel: asyncssh.SSHServerChannel[bytes]) -> asyncssh.SSHServerConnection:
|
|
|
|
|
"""Get the connection."""
|
|
|
|
|
conn = channel.get_extra_info("connection")
|
|
|
|
|
assert conn is not None
|
|
|
|
|
return cast(asyncssh.SSHServerConnection, conn)
|
|
|
|
|
|
|
|
|
|
def get_backend(connection: asyncssh.SSHServerConnection) -> SshecretBackend:
|
|
|
|
|
"""Get backend from connection."""
|
|
|
|
|
backend = connection.get_extra_info("backend")
|
|
|
|
|
assert backend is not None
|
|
|
|
|
return cast(SshecretBackend, backend)
|
|
|
|
|
|
|
|
|
|
def get_client(connection: asyncssh.SSHServerConnection) -> Client:
|
|
|
|
|
"""Get client."""
|
|
|
|
|
client = connection.get_extra_info("client")
|
|
|
|
|
assert client is not None
|
|
|
|
|
return cast(Client, client)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SshecretFileServer(asyncssh.SFTPServer):
|
|
|
|
|
"""File server."""
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
def __init__(self, chan: asyncssh.SSHServerChannel[bytes], chroot: bytes | None = None):
|
|
|
|
|
"""Initialize sftp server."""
|
|
|
|
|
super().__init__(chan, "/".encode())
|
|
|
|
|
self._conn: asyncssh.SSHServerConnection = get_connection(chan)
|
|
|
|
|
self.backend: SshecretBackend = get_backend(self._conn)
|
|
|
|
|
|
|
|
|
|
self.logger.info("Connected")
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def client(self) -> Client:
|
|
|
|
|
"""Get client."""
|
|
|
|
|
return get_client(self._conn)
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
async def scandir(self, path: bytes) -> AsyncIterator[asyncssh.SFTPName]:
|
|
|
|
|
"""List secrets as files."""
|
|
|
|
|
for secret in self.client.secrets:
|
|
|
|
|
sftp_name = asyncssh.SFTPName(secret.encode(), READ_ONLY)
|
|
|
|
|
self.format_longname(sftp_name)
|
|
|
|
|
yield sftp_name
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
def format_longname(self, name: asyncssh.SFTPName) -> None:
|
|
|
|
|
mode = "-r--------"
|
|
|
|
|
user = self.client.name
|
|
|
|
|
group = "sshecret_clients"
|
|
|
|
|
size = ''
|
|
|
|
|
modtime = ''
|
|
|
|
|
nlink = ''
|
|
|
|
|
|
|
|
|
|
detail = (
|
|
|
|
|
f'{mode:10s} {nlink:>4s} {user:8s} {group:8s} '
|
|
|
|
|
f'{size:>8s} {modtime:12s} '
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
name.longname = detail.encode('utf-8') + cast(bytes, name.filename)
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
def format_user(self, uid: int | None) -> str:
|
|
|
|
|
return self.client.name
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
def format_group(self, gid: int | None) -> str:
|
|
|
|
|
return "clients"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
async def open(self, path: bytes, pflags: int, attrs: asyncssh.SFTPAttrs) -> object:
|
|
|
|
|
"""Open file.
|
|
|
|
|
|
|
|
|
|
This is the tricky bit. We need to disallow writes, and we need to serve
|
|
|
|
|
an encrypted secret as a file.
|
|
|
|
|
"""
|
|
|
|
|
if pflags != READ_ONLY:
|
|
|
|
|
raise asyncssh.SFTPError(asyncssh.FX_WRITE_PROTECT, "Read write not supported")
|
|
|
|
|
filepath = path.decode()
|
|
|
|
|
secret_name = Path(filepath).name
|
|
|
|
|
if secret_name not in self.client.secrets:
|
|
|
|
|
raise asyncssh.SFTPError(asyncssh.FX_NO_SUCH_FILE, "No such secret")
|
|
|
|
|
secret = await self.backend.get_client_secret(("id", str(self.client.id)), secret_name)
|
|
|
|
|
if not secret:
|
|
|
|
|
raise asyncssh.SFTPError(asyncssh.FX_NO_SUCH_FILE, "No such secret")
|
|
|
|
|
# io.BytesIO
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@override
|
|
|
|
|
async def close(self, file_obj: object) -> None:
|
|
|
|
|
"""Close the file object."""
|