From 046f65b1709dc776276753b32f6d8aad32005664 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Mon, 29 Sep 2025 14:59:29 -0300 Subject: [PATCH 1/2] QUIC file server This adds a QUIC-based implementation of file upload and download that transfers files on streams, allowing both parallel uploads/downloads and more efficient handling, since streamed data can be sent piece-by-piece as it loads from disk rather than requiring large all-at-once reads and writes. This implementation makes heavy use of io_uring for efficiency, and as such will never work outside Linux. Each upload or download uses exactly one stream (skipping the first stream): an upload is prefixed with a bt-encoded file metadata block (containing the "PUT" request type, size, and optional ttl), while a download request consists of the same sort of block but containing "GET" and the file id. For PUT, all data on the stream after the initial block is the file data, terminated by a FIN on the stream. For GET, the FIN is sent immediately after the request block. For responses, PUT consists of a metadata block containing the uploaded id, expiry, and upload metadata. GET consists of a metadata block (containing size+expiry+upload info), which is immediately followed by the stream data and FIN. (Thus PUT/GET responses are analogous to the GET/PUT request data in the opposite direction.) This initial commit supports read, write, and automatic removal of expired files from disk; the following commits will add support for the non-transfer endpoints (such as updating expiries, querying info, and session version retrieval), which will all happen via the initial (id=0) bt request stream. --- .gitignore | 3 + .gitmodules | 9 + quic/CLI11 | 1 + quic/CMakeLists.txt | 71 ++++ quic/cleanup.cpp | 139 ++++++++ quic/cleanup.hpp | 15 + quic/cmake/submodule_check.cmake | 31 ++ quic/cmake/system_or_submodule.cmake | 23 ++ quic/common.hpp | 118 +++++++ quic/get.cpp | 245 ++++++++++++++ quic/libpqxx | 1 + quic/oxen-libquic | 1 + quic/put.cpp | 462 +++++++++++++++++++++++++++ quic/quic-files.cpp | 202 ++++++++++++ quic/requests.cpp | 386 ++++++++++++++++++++++ quic/requests.hpp | 356 +++++++++++++++++++++ quic/test/test-download.cpp | 232 ++++++++++++ quic/test/test-upload.cpp | 209 ++++++++++++ 18 files changed, 2504 insertions(+) create mode 160000 quic/CLI11 create mode 100644 quic/CMakeLists.txt create mode 100644 quic/cleanup.cpp create mode 100644 quic/cleanup.hpp create mode 100644 quic/cmake/submodule_check.cmake create mode 100644 quic/cmake/system_or_submodule.cmake create mode 100644 quic/common.hpp create mode 100644 quic/get.cpp create mode 160000 quic/libpqxx create mode 160000 quic/oxen-libquic create mode 100644 quic/put.cpp create mode 100644 quic/quic-files.cpp create mode 100644 quic/requests.cpp create mode 100644 quic/requests.hpp create mode 100644 quic/test/test-download.cpp create mode 100644 quic/test/test-upload.cpp diff --git a/.gitignore b/.gitignore index de1895d..a2464a6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ __pycache__ /key_x25519 .vscode .venv +/quic/build* +/quic/compile_commands.json +/quic/.cache diff --git a/.gitmodules b/.gitmodules index e69de29..28810ca 100644 --- a/.gitmodules +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "quic/oxen-libquic"] + path = quic/oxen-libquic + url = https://github.com/oxen-io/oxen-libquic.git +[submodule "quic/libpqxx"] + path = quic/libpqxx + url = https://github.com/jtv/libpqxx.git +[submodule "quic/CLI11"] + path = quic/CLI11 + url = 
https://github.com/CLIUtils/CLI11.git diff --git a/quic/CLI11 b/quic/CLI11 new file mode 160000 index 0000000..5248d5b --- /dev/null +++ b/quic/CLI11 @@ -0,0 +1 @@ +Subproject commit 5248d5b35271a76abf9b27639fc3ae32458e9dff diff --git a/quic/CMakeLists.txt b/quic/CMakeLists.txt new file mode 100644 index 0000000..43afd32 --- /dev/null +++ b/quic/CMakeLists.txt @@ -0,0 +1,71 @@ +cmake_minimum_required(VERSION 3.20...3.31) + +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +find_program(CCACHE_PROGRAM ccache) +if(CCACHE_PROGRAM) + foreach(lang C CXX) + if(NOT DEFINED CMAKE_${lang}_COMPILER_LAUNCHER AND NOT CMAKE_${lang}_COMPILER MATCHES ".*/ccache") + message(STATUS "Enabling ccache for ${lang}") + set(CMAKE_${lang}_COMPILER_LAUNCHER ${CCACHE_PROGRAM} CACHE STRING "") + endif() + endforeach() +endif() + +project(quic-files + VERSION 0.0.1 + LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED TRUE) +set(CMAKE_CXX_EXTENSIONS FALSE) + +include(cmake/submodule_check.cmake) +include(cmake/system_or_submodule.cmake) + +find_package(PkgConfig REQUIRED) + +system_or_submodule(OXENC oxenc oxenc::oxenc liboxenc>=1.4.0 oxen-libquic/external/oxen-encoding) +system_or_submodule(OXENLOGGING oxen-logging oxen::logging liboxen-logging>=1.2.0 oxen-libquic/external/oxen-logging) +system_or_submodule(FMT fmt fmt::fmt fmt>=9 oxen-libquic/external/oxen-logging/fmt) +system_or_submodule(LIBPQXX libpqxx libpqxx::pqxx libpqxx>=7.10.0 libpqxx) +system_or_submodule(CLI11 CLI11 CLI11::CLI11 CLI11>=2.3.0 CLI11) +system_or_submodule(OXENQUIC quic oxen::quic liboxenquic>=1.6 oxen-libquic) + +pkg_check_modules(NLOHMANN_JSON IMPORTED_TARGET REQUIRED nlohmann_json>=3.7.0) +pkg_check_modules(SODIUM IMPORTED_TARGET REQUIRED libsodium>=1.0.17) +pkg_check_modules(LIBURING IMPORTED_TARGET REQUIRED liburing>=2.3) + +add_executable(quic-files + quic-files.cpp + requests.cpp + put.cpp + get.cpp + cleanup.cpp +) + +add_library(quic-files-common INTERFACE) +target_link_libraries(quic-files-common INTERFACE + oxenc::oxenc + oxen::logging + oxen::quic + fmt::fmt +) + +target_link_libraries(quic-files + PRIVATE + quic-files-common + PkgConfig::NLOHMANN_JSON + PkgConfig::SODIUM + libpqxx::pqxx + CLI11::CLI11 + PkgConfig::LIBURING +) + +option(BUILD_TESTS "Build upload/download test programs" ON) +if(BUILD_TESTS) + add_executable(quic-upload test/test-upload.cpp) + target_link_libraries(quic-upload PRIVATE quic-files-common) + add_executable(quic-download test/test-download.cpp) + target_link_libraries(quic-download PRIVATE quic-files-common) +endif() diff --git a/quic/cleanup.cpp b/quic/cleanup.cpp new file mode 100644 index 0000000..a0e9024 --- /dev/null +++ b/quic/cleanup.cpp @@ -0,0 +1,139 @@ +#include "cleanup.hpp" + +#include + +#include +#include +#include + +#include "common.hpp" // IWYU pragma: keep + +namespace sfs { + +auto logcat = log::Cat("files.cleanup"); + +namespace { + class Cleaner { + io_uring iou; + int files_dir_fd; + pqxx::connection conn; + std::future stop; + + public: + Cleaner(const std::filesystem::path& files_dir, + std::string pgsql_uri, + std::future stop) : + conn{pgsql_uri}, stop{std::move(stop)} { + std::filesystem::create_directories(files_dir); + files_dir_fd = open(files_dir.c_str(), O_PATH | O_DIRECTORY); + if (files_dir_fd < 0) + throw std::runtime_error{ + "Unable to open files path {}: {}"_format(files_dir, strerror(errno))}; + + if (int err = io_uring_queue_init(128, &iou, IORING_SETUP_SINGLE_ISSUER); err != 0) { + close(files_dir_fd); + throw std::runtime_error{ + "Failed to 
initialize io_uring queue: {}"_format(strerror(-err))}; + } + } + + void cleanup() { + pg_retryable([this] { + pqxx::work tx{conn}; + + auto started = std::chrono::steady_clock::now(); + std::vector removed; + unsigned submitted = 0; + for (auto r : tx.exec("DELETE FROM files WHERE expiry <= NOW() RETURNING id")) { + removed.push_back(id_to_path(r[0].as())); + auto& filepath = removed.back(); + + auto* sqe = io_uring_get_sqe(&iou); + if (!sqe) { + io_uring_submit(&iou); + sqe = io_uring_get_sqe(&iou); + } + + io_uring_sqe_set_flags(sqe, 0); + io_uring_sqe_set_data64(sqe, removed.size() - 1); + io_uring_prep_unlinkat(sqe, files_dir_fd, filepath.c_str(), 0); + submitted++; + } + io_uring_submit(&iou); + + bool success = true; + + while (submitted > 0) { + unsigned nr = std::min(iou.cq.ring_entries, submitted); + struct io_uring_cqe* cqe; + io_uring_wait_cqe_nr(&iou, &cqe, nr); + + unsigned head; + io_uring_for_each_cqe(&iou, head, cqe) { + const auto& id = removed[io_uring_cqe_get_data64(cqe)]; + if (cqe->res < 0) { + if (cqe->res == -ENOENT) + log::debug(logcat, "Failed to remove {}: file already gone", id); + else { + log::warning( + logcat, "Failed to remove {}: {}", id, strerror(-cqe->res)); + success = false; + } + } else { + log::debug(logcat, "Removed expired file {}", id); + } + } + + io_uring_cq_advance(&iou, nr); + + submitted -= nr; + } + + if (success) { + log::info( + logcat, + "Deleted {} expired files in {}", + removed.size(), + std::chrono::steady_clock::now() - started); + tx.commit(); + } else + tx.abort(); + }); + } + + bool wait() { return stop.wait_for(30s) == std::future_status::timeout; } + }; +} // namespace + +std::thread start_cleanup_thread( + const std::string& pgsql_uri, + const std::filesystem::path& files_dir, + std::future stop) { + + std::promise started_prom; + auto started = started_prom.get_future(); + std::thread th{[&started_prom, &files_dir, &pgsql_uri, &stop] { + std::optional cleaner; + try { + cleaner.emplace(files_dir, std::move(pgsql_uri), std::move(stop)); + } catch (...) 
{ + started_prom.set_exception(std::current_exception()); + return; + } + + started_prom.set_value(); + + do { + try { + cleaner->cleanup(); + } catch (const std::exception& e) { + log::error(logcat, "Cleanup failed: {}", e.what()); + } + } while (cleaner->wait()); + }}; + + started.get(); + + return th; +} + +} // namespace sfs diff --git a/quic/cleanup.hpp b/quic/cleanup.hpp new file mode 100644 index 0000000..91831db --- /dev/null +++ b/quic/cleanup.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include <filesystem> +#include <future> +#include <string> +#include <thread> + +namespace sfs { + +std::thread start_cleanup_thread( + const std::string& pgsql_uri, + const std::filesystem::path& files_dir, + std::future<void> stop); + +} diff --git a/quic/cmake/submodule_check.cmake b/quic/cmake/submodule_check.cmake new file mode 100644 index 0000000..422f892 --- /dev/null +++ b/quic/cmake/submodule_check.cmake @@ -0,0 +1,31 @@ +option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON) +if(SUBMODULE_CHECK) + find_package(Git) + if(GIT_FOUND) + function(check_submodule relative_path) + execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE localHead) + execute_process(COMMAND git rev-parse "HEAD:quic/${relative_path}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE checkedHead) + string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate) + if (upToDate) + message(STATUS "Submodule 'quic/${relative_path}' is up-to-date") + else() + message(FATAL_ERROR "Submodule 'quic/${relative_path}' is not up-to-date. Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF") + endif() + + # Extra arguments check nested submodules + foreach(submod ${ARGN}) + execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path}/${submod} OUTPUT_VARIABLE localHead) + execute_process(COMMAND git rev-parse "HEAD:${submod}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE checkedHead) + string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate) + if (NOT upToDate) + message(FATAL_ERROR "Nested submodule '${relative_path}/${submod}' is not up-to-date. 
Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF") + endif() + endforeach() + endfunction () + + message(STATUS "Checking submodules") + check_submodule(libpqxx) + check_submodule(oxen-libquic external/oxen-encoding external/oxen-logging) + endif() +endif() + diff --git a/quic/cmake/system_or_submodule.cmake b/quic/cmake/system_or_submodule.cmake new file mode 100644 index 0000000..d87b889 --- /dev/null +++ b/quic/cmake/system_or_submodule.cmake @@ -0,0 +1,23 @@ +macro(system_or_submodule BIGNAME smallname target pkgconf subdir) + if(NOT TARGET ${target}) + option(FORCE_${BIGNAME}_SUBMODULE "force using ${smallname} submodule" OFF) + if(NOT BUILD_STATIC_DEPS AND NOT FORCE_${BIGNAME}_SUBMODULE AND NOT FORCE_ALL_SUBMODULES) + pkg_check_modules(${BIGNAME} ${pkgconf} IMPORTED_TARGET) + endif() + if(${BIGNAME}_FOUND) + add_library(${smallname} INTERFACE) + if(NOT TARGET PkgConfig::${BIGNAME} AND CMAKE_VERSION VERSION_LESS "3.21") + # Work around cmake bug 22180 (PkgConfig::THING not set if no flags needed) + else() + target_link_libraries(${smallname} INTERFACE PkgConfig::${BIGNAME}) + endif() + message(STATUS "Found system ${smallname} ${${BIGNAME}_VERSION}") + else() + message(STATUS "using ${smallname} submodule ${subdir}") + add_subdirectory(${subdir} EXCLUDE_FROM_ALL) + endif() + if(NOT TARGET ${target}) + add_library(${target} ALIAS ${smallname}) + endif() + endif() +endmacro() diff --git a/quic/common.hpp b/quic/common.hpp new file mode 100644 index 0000000..fb9a04b --- /dev/null +++ b/quic/common.hpp @@ -0,0 +1,118 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace sfs { + +// NOLINTBEGIN(misc-unused-alias-decls) + +namespace log = oxen::log; + +using namespace std::literals; +using namespace log::literals; + +static constexpr auto b64_url_chars = + "-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz"sv; + +// Executes a call `c()` (which is expected to open and commit its own pqxx transaction). If we +// get a broken connection exception then the call is retried up to n_retries times before giving +// up and propagating the exception. +template <typename Call> +static void pg_retryable(Call c, const int n_retries = 10) { + int retries = n_retries; + while (true) { + try { + c(); + break; + } catch (const pqxx::broken_connection& e) { + if (retries-- > 0) + log::warning(log::Cat("files.db"), "Lost postgresql connection; retrying query..."); + else { + log::error( + log::Cat("files.db"), + "Postgresql connection still failing after {} retries, giving up", + n_retries); + throw; + } + } + } +}
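 + +// For example (a sketch of the usage pattern in cleanup.cpp and the request handlers below; the +// callback opens and commits its own transaction, so a retry re-runs the whole unit of work): +// +// pg_retryable([&] { +// pqxx::work tx{conn}; +// tx.exec("DELETE FROM files WHERE expiry <= NOW()"); +// tx.commit(); +// }); +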
+inline std::filesystem::path id_to_path(std::string_view fileid) { + std::filesystem::path p; + if (fileid.size() == 44) + p = std::filesystem::path{fileid.substr(0, 2)}; + else + // back compat numeric identifier, goes into 000, 001, ..., 999 based on % 1000 id value + p = std::filesystem::path{ + "{:0>3s}"_format(fileid.substr(fileid.size() < 3 ? 0 : fileid.size() - 3))}; + + p /= std::filesystem::path{fileid}; + return p; +} + +// Simple class that "explodes" (by calling a callback) if not "disarmed" before being +// destructed. Used to queue cleanup during partial construction. +class bomb { + public: + std::function<void()> explode; + + bomb(std::function<void()> explode_ = nullptr) : explode{std::move(explode_)} {} + + void disarm() { explode = nullptr; } + + ~bomb() { + if (explode) + explode(); + } +}; + +inline std::string friendly_duration(std::chrono::nanoseconds dur) { + std::string friendly; + auto append = std::back_inserter(friendly); + bool some = false; + if (dur >= 24h) { + fmt::format_to(append, "{}d", dur / 24h); + dur %= 24h; + some = true; + } + if (dur >= 1h || some) { + fmt::format_to(append, "{}h", dur / 1h); + dur %= 1h; + some = true; + } + if (dur >= 1min || some) { + fmt::format_to(append, "{}m", dur / 1min); + dur %= 1min; + some = true; + } + if (some || dur % 1s == 0ns) { + // If we have >= minutes or it's an integer number of seconds then don't bother with + // fractional seconds + fmt::format_to(append, "{}s", dur / 1s); + } else { + double seconds = std::chrono::duration(dur).count(); + if (dur >= 1s) + fmt::format_to(append, "{:.3f}s", seconds); + else if (dur >= 1ms) + fmt::format_to(append, "{:.3f}ms", seconds * 1000); + else if (dur >= 1us) + fmt::format_to(append, "{:.3f}µs", seconds * 1'000'000); + else + fmt::format_to(append, "{:.0f}ns", seconds * 1'000'000'000); + } + return friendly; +} + +// NOLINTEND(misc-unused-alias-decls) + +} // namespace sfs diff --git a/quic/get.cpp b/quic/get.cpp new file mode 100644 index 0000000..4de281e --- /dev/null +++ b/quic/get.cpp @@ -0,0 +1,245 @@ +#include +#include + +#include "common.hpp" // IWYU pragma: keep +#include "requests.hpp" + +namespace sfs { + +static auto accesslog = log::Cat("access"); + +static auto logcat = log::Cat("files.get"); + +void FileStream::parse_get(oxenc::bt_dict_consumer&& d) { + auto id = d.require<std::string>("#"); + if (!((id.size() == 44 && id.find_first_not_of(b64_url_chars) == std::string::npos) || + (id.size() >= 1 && id.size() <= 16 && + id.find_first_not_of("0123456789") == std::string::npos))) + throw std::runtime_error{"invalid id: expected 44 b64url or 1-16 decimal digits"}; + d.finish(); + + request.emplace<get_req>(*this, std::move(id)); +} + +FileStream::get_req::get_req(FileStream& str, std::string id) : file_req{str} { + fileid = std::move(id); + + double upl, exp; + bool found = false; + try { + pg_retryable([&] { + pqxx::work tx{str.handler.pg_conn}; + auto row = tx.exec(R"( +SELECT EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry) +FROM files WHERE id = $1)", + pqxx::params{fileid}) + .opt_row(); + + if (row) { + found = true; + std::tie(upl, exp) = row->as<double, double>(); + } + + tx.commit(); + }); + } catch (const pqxx::failure& e) { + log::error(logcat, "Failed to query files table: {}", e.what()); + } + + if (found) { + expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(exp)}}; + uploaded = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(upl)}}; + + if (expiry < std::chrono::system_clock::now()) + found = false; + } + + if (!found) { + if (log::get_level(accesslog) >= log::Level::info) { + if (auto conn = str.get_conn()) + log::info(accesslog, "GET {} NOT FOUND ({})", fileid, conn->remote()); + else + log::info(accesslog, "GET {} NOT FOUND ()", fileid); + } + str.close(STREAM_ERROR::not_found); + return; + } + + filepath = id_to_path(fileid); +}
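 + +// Illustrative only (a sketch assuming a client-side libquic stream `s` and a `file_id` +// string; neither is part of this file): the request served by the ctor above would be +// framed by a client roughly as +// +// oxenc::bt_dict_producer req; +// req.append("!", "GET"); +// req.append("#", file_id); // 44-char b64url id, or legacy numeric id as a string +// s->send("{}:{}"_format(req.view().size(), req.view())); +// s->send_fin(); // for GET, FIN follows immediately after the command block +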
+void FileStream::get_req::close() { + if (io_state != IO_STATE::none) { + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, 0); + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_prep_cancel64(sqe, str.fsid, 0); + io_uring_submit(&str.handler.iou); + } + + if (fd >= 0) { + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, 0); + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_prep_close_direct(sqe, fd); + io_uring_submit(&str.handler.iou); + } + + chunks.clear(); + io_state = IO_STATE::none; + fd = -1; +} + +void FileStream::get_req::handle_cqe(io_uring_cqe* cqe) { + assert(io_state != IO_STATE::none); + + if (str.is_closing()) { + log::debug(logcat, "Ignoring CQE on closing stream"); + chunks.clear(); + return; + } + + if (io_state == IO_STATE::statx) { + if (cqe->res < 0) { + log::error( + logcat, + "Failed to stat {}: {}; closing stream with I/O error code", + filepath, + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + return; + } + size = statxbuf.stx_size; + + io_state = IO_STATE::opening; // The open was chained immediately after the statx, so wait + // for it next. + + } else if (io_state == IO_STATE::opening) { + if (cqe->res < 0) { + log::error( + logcat, + "Failed to open {}: {}; closing stream with I/O error code", + filepath, + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + return; + } + fd = cqe->res; + + std::string buf{"XX:"}; // dummy value; we'll edit it to the correct size below + buf.resize(63); + + oxenc::bt_dict_producer meta{buf.data() + 3, buf.data() + buf.size()}; + meta.append("s", size); + meta.append("u", uploaded.time_since_epoch().count()); + meta.append("x", expiry.time_since_epoch().count()); + auto final_size = "{}:"_format(meta.view().size()); + assert(final_size.size() == 3); + std::memcpy(buf.data(), final_size.data(), final_size.size()); + buf.resize(3 + meta.view().size()); + str.send(std::move(buf)); + + io_state = IO_STATE::none; + + } else if (io_state == IO_STATE::reading) { // A read has finished and delivered the data to us + + if (chunks.size() == 1) // If 1 then this was the last outstanding read + io_state = IO_STATE::none; + // Otherwise there are more reads outstanding so leave the state as is + auto buf = std::move(chunks.front()); + chunks.pop_front(); + + if (cqe->res < 0) { + log::error( + logcat, + "Failed to read {}: {}; closing stream with I/O error code", + filepath, + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + close(); + return; + } + if (cqe->res > 0) { + if (cqe->res < static_cast<int64_t>(buf.size())) + buf.resize(cqe->res); + bytes_read += buf.size(); + str.send(std::move(buf)); + if (bytes_read >= size) { + str.send_fin(); + close(); + str.handler.overall.get(size); + } + } else { + eof = true; + close(); + if (bytes_read < size) { + log::error( + logcat, + "Hit EOF reading {} too early (read {}, expected {})", + filepath, + bytes_read, + size); + str.close(STREAM_ERROR::io_error); + return; + } + str.send_fin(); + } + + } else { + assert(!"Unknown state!"); + } + + queue_reads(); +} + +void FileStream::get_req::finalize() { + io_state = IO_STATE::statx; + + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, str.fsid); + io_uring_sqe_set_flags(sqe, IOSQE_IO_LINK); + io_uring_prep_statx(sqe, str.handler.files_dir_fd, filepath.c_str(), 0, STATX_SIZE, &statxbuf); + + sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, str.fsid); + io_uring_sqe_set_flags(sqe, 0); + io_uring_prep_openat_direct( + sqe, + str.handler.files_dir_fd, + filepath.c_str(), + O_RDONLY, + 0644, + IORING_FILE_INDEX_ALLOC); + + io_uring_submit(&str.handler.iou); +}
+void FileStream::get_req::queue_reads() { + if (io_state != IO_STATE::none || eof || bytes_read >= size) + return; + assert(chunks.empty()); + int64_t unsent = str.unsent(); + if (unsent >= READAHEAD) + return; + + // Submit a chain of smaller sequential reads; we do this rather than one big read so that we + // can hand earlier data off to our quic stream without having to wait for all the data to be + // available. + int chunks_to_read = + (std::min(READAHEAD - unsent, size - bytes_read) + CHUNK_SIZE - 1) / + CHUNK_SIZE; + chunks.resize(chunks_to_read); + for (int i = 0; i < chunks_to_read; i++) { + auto& c = chunks[i]; + c.resize(CHUNK_SIZE); + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, str.fsid); + io_uring_sqe_set_flags( + sqe, IOSQE_FIXED_FILE | (i == chunks_to_read - 1 ? 0 : IOSQE_IO_LINK)); + io_uring_prep_read(sqe, fd, c.data(), CHUNK_SIZE, -1); + } + io_uring_submit(&str.handler.iou); + io_state = IO_STATE::reading; +} + +} // namespace sfs diff --git a/quic/libpqxx b/quic/libpqxx new file mode 160000 index 0000000..8e6fab3 --- /dev/null +++ b/quic/libpqxx @@ -0,0 +1 @@ +Subproject commit 8e6fab35069eee92ce6729f8a4f775df31342b75 diff --git a/quic/oxen-libquic b/quic/oxen-libquic new file mode 160000 index 0000000..afeaadd --- /dev/null +++ b/quic/oxen-libquic @@ -0,0 +1 @@ +Subproject commit afeaadd8c1ace940bd03a7d7c4d37c9e5db5dc6c diff --git a/quic/put.cpp b/quic/put.cpp new file mode 100644 index 0000000..e4c6bfe --- /dev/null +++ b/quic/put.cpp @@ -0,0 +1,462 @@ +#include <sodium.h> + +#include "common.hpp" // IWYU pragma: keep +#include "requests.hpp" + +namespace sfs { + +static auto logcat = log::Cat("files.put"); +static auto accesslog = log::Cat("access"); + +constexpr auto FILE_ID_HASH_KEY = "SessionFileSvr\0\0"sv; + +FileStream::put_req::put_req(FileStream& s, size_t size_, std::optional<int64_t> ttl_) : + file_req{s}, ttl{std::move(ttl_)} { + + size = size_; + + crypto_generichash_blake2b_init( + &b2b, + reinterpret_cast<const unsigned char*>(FILE_ID_HASH_KEY.data()), + FILE_ID_HASH_KEY.size(), + 33); +} + +void FileStream::put_req::append(std::span data) { + + if (got_all) { + log::critical(logcat, "Internal error: append called *after* finalize()!"); + str.close(STREAM_ERROR::too_much_data); + return; + } + + if (received + static_cast<int64_t>(data.size()) > size) { + auto conn = str.get_conn(); + log::warning( + logcat, + "PUT request from {} exceeded declared size (declared {}, received {}); closing " + "stream with error", + conn ? 
conn->remote().to_string() : "", + size, + received + data.size()); + str.close(STREAM_ERROR::too_much_data); + return; + } + + if (fd == -1 && io_state == IO_STATE::none) { + auto now = std::chrono::system_clock::now().time_since_epoch().count(); + uint64_t random; + randombytes_buf(&random, sizeof(random)); + tmp_upload = std::filesystem::path{"upload-{}-{:016x}-{}B"_format(now, random, size)}; + + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, str.fsid); + io_state = IO_STATE::opening; + io_uring_prep_openat_direct( + sqe, + str.handler.upload_dir_fd, + tmp_upload.c_str(), + O_CREAT | O_EXCL | O_WRONLY, + 0644, + IORING_FILE_INDEX_ALLOC); + io_uring_submit(&str.handler.iou); + } + + if (!data.empty()) + crypto_generichash_blake2b_update( + &b2b, reinterpret_cast(data.data()), data.size()); + + while (!data.empty()) { + if (chunks.empty()) + chunks.emplace_back().reserve(CHUNK_SIZE); + + auto& c = chunks.back(); + auto s = c.size(); + if (s + data.size() <= CHUNK_SIZE) { + c.resize(s + data.size()); + std::memcpy(c.data() + s, data.data(), data.size()); + received += data.size(); + data = {}; + } else if (s < CHUNK_SIZE) { + c.resize(CHUNK_SIZE); + std::memcpy(c.data() + s, data.data(), CHUNK_SIZE - s); + received += CHUNK_SIZE - s; + data = data.subspan(CHUNK_SIZE - s); + } else { + chunks.emplace_back(); + } + } + + send_chunks(); +} + +void FileStream::put_req::send_chunks(std::optional _retry_offset) { + if (io_state != IO_STATE::none) { + // We're already waiting on an open or write, so nothing else to do just yet; we'll queue + // all the writes when it finishes. + assert(!_retry_offset); // An offset should only be given in response to a complete write + return; + } + + // If we get here then the file creation should have happened already: + assert(fd >= 0); + + if (chunks.empty()) { + if (got_all) + // No more chunks and we received the FIN, so time to rename it into place + initiate_rename(); + return; + } + + bool send_last = chunks.back().size() >= CHUNK_SIZE || got_all; + if (chunks.size() == 1 && !send_last && !_retry_offset) + // Nothing to write right now + return; + + int num_chunks = chunks.size() - !send_last; + std::vector iovecs; + auto it = chunks.begin(); + iovecs.resize(num_chunks); + io_write_size = 0; + for (auto& v : iovecs) { + v.iov_base = it->data(); + v.iov_len = it->size(); + io_write_size += v.iov_len; + ++it; + } + if (_retry_offset) { + assert(_retry_offset < chunks[0].size()); + iovecs[0].iov_base = chunks[0].data() + *_retry_offset; + iovecs[0].iov_len -= *_retry_offset; + io_write_size -= *_retry_offset; + io_write_first_offset = *_retry_offset; + } else { + io_write_first_offset = 0; + } + + auto* sqe = io_uring_get_sqe(&str.handler.iou); + assert(sqe); + io_uring_sqe_set_data64(sqe, str.fsid); + io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); + if (iovecs.size() == 1) + io_uring_prep_write(sqe, fd, iovecs[0].iov_base, iovecs[0].iov_len, -1); + else + io_uring_prep_writev(sqe, fd, iovecs.data(), iovecs.size(), -1); + io_uring_submit(&str.handler.iou); + io_state = static_cast(num_chunks); +} + +void FileStream::put_req::handle_cqe(io_uring_cqe* cqe) { + assert(io_state != IO_STATE::none); + + auto state = io_state; + if (state != IO_STATE::closing_done) // closing_done is a "final" state we don't want to reset + io_state = IO_STATE::none; + + if (str.is_closing()) { + log::debug(logcat, "Ignoring CQE on closing stream"); + chunks.clear(); + return; + } + + if (state == IO_STATE::opening) { + if (cqe->res < 0) { + 
log::error( + logcat, + "Failed to open temp file: {}; closing stream with I/O error code", + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + chunks.clear(); + return; + } + fd = cqe->res; + // Immediately queue any completed chunks we might have accumulated: + send_chunks(); + } else if (state >= IO_STATE::writing) { // writev response for N=`state` buffers + auto n_bufs = static_cast(state); + assert(n_bufs <= chunks.size()); + + auto written = cqe->res; + if (written < 0) { + log::error( + logcat, + "Failed to write upload data: {}; closing stream with I/O error code", + strerror(-written)); + str.close(STREAM_ERROR::io_error); + chunks.clear(); + return; + } + + // writev can return less than requested, in which case we need to retry from wherever the + // write left off. If it's something like being out of disk space, the subsequent write + // will fail with an i/o error. + if (written != io_write_size) { + if (written > io_write_size) + log::critical( + logcat, + "Internal error: write request returned *more* ({}) than we asked to write " + "({})", + written, + io_write_size); + else + log::debug( + logcat, + "Wrote less ({}) than expected ({}); requeuing write", + written, + io_write_size); + + written += io_write_first_offset; + + // For a partial write, head-drop any chunks that have been completely written: + while (written >= static_cast(chunks.front().size())) { + written -= chunks.front().size(); + chunks.pop_front(); + } + + // written is now the offset into the first chunk where we want to retry; calling + // send_chunks with a >= 0 offset will force it to always send at least the first chunk + // (starting at the required offset), plus anything else that might have queued up in + // the meantime. + send_chunks(written); + return; + } + + log::debug(logcat, "Successfully wrote {}B to tempfile #{}", written, fd); + chunks.erase(chunks.begin(), chunks.begin() + n_bufs); + send_chunks(); + } else if (state == IO_STATE::renaming) { + if (cqe->res == -EEXIST || cqe->res == 0) { + if (cqe->res == -EEXIST) + log::debug(logcat, "Rename failed: upload file already exists! Deleting tempfile"); + else + log::debug(logcat, "Tempfile successfully renamed to final location {}", filepath); + } else { + log::warning( + logcat, + "Failed to rename tempfile {} to final location {}: {}", + tmp_upload, + filepath, + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + } + + unlink_and_close(str.fsid); + io_state = IO_STATE::closing_done; + } else if (state == IO_STATE::closing_done) { + if (cqe->res < 0) { + log::warning(logcat, "Failed to close tempfile: {}", strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + } + // Respond anyway because the rename succeeded and such a close failure is probably + // something weird or spurious? 
+ insert_and_respond(); + } else { + assert(!"Unknown state!"); + } +} + +void FileStream::put_req::finalize() { + assert(!got_all); + log::debug(logcat, "Stream FIN received"); + if (str.is_closing()) + return; + + assert(received <= size); // otherwise we should already have failed in append() + + if (received < size) { + log::warning( + logcat, + "Stream FIN received without receiving all data (received {} of {}); closing " + "stream with error", + received, + size); + str.close(STREAM_ERROR::not_enough_data); + return; + } + got_all = true; + + std::array hash; + crypto_generichash_blake2b_final(&b2b, hash.data(), hash.size()); + + fileid = oxenc::to_base64(hash); + // Convert to url-safe b64: + for (auto& c : fileid) { + if (c == '+') + c = '-'; + else if (c == '/') + c = '_'; + } + filepath = "{}/{}"_format(fileid.substr(0, 2), fileid); + + if (str.handler.back_compat_ids) { + std::string try_ttl = "{} seconds"_format( + ttl && *ttl > 0 && *ttl <= str.handler.max_ttl.count() + ? *ttl + : str.handler.max_ttl.count()); + bool success = false; + for (int i = 0; !success && i < 25; i++) { + uint64_t bcid; + randombytes_buf(&bcid, sizeof(bcid)); + // We can't go over 53 bits because nodejs doesn't have integers: + bcid &= 0x1f'ffff'ffff'ffff; + auto try_id = "{}"_format(bcid); + double upl, exp; + try { + pg_retryable([&] { + pqxx::work tx{str.handler.pg_conn}; + std::tie(upl, exp) = tx.exec(R"( +INSERT INTO files (id, expiry) VALUES ($1, NOW() + $2) +RETURNING EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry))", + pqxx::params{try_id, try_ttl}) + .one_row() + .as<double, double>(); + tx.commit(); + }); + } catch (const pqxx::unique_violation& e) { + continue; + } + success = true; + fileid = std::move(try_id); + filepath = "{:03d}/{}"_format(bcid % 1000, fileid); + expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(exp)}}; + uploaded = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(upl)}}; + } + + if (!success) { + log::error( + logcat, + "Tried 25 random backcompat IDs and got constraint failures on every one; " + "something is going wrong!"); + str.close(STREAM_ERROR::io_error); + return; + } + } + + if (io_state == IO_STATE::none) + send_chunks(); +} + +void FileStream::put_req::initiate_rename() { + assert(!fileid.empty()); + assert(!filepath.empty()); + assert(chunks.empty()); + + log::debug( + logcat, + "All data received; renaming tempfile #{} ({}) to final location {}", + fd, + tmp_upload, + filepath); + + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, str.fsid); + io_uring_prep_renameat( + sqe, + str.handler.upload_dir_fd, + tmp_upload.c_str(), + str.handler.files_dir_fd, + filepath.c_str(), + RENAME_NOREPLACE); + io_uring_submit(&str.handler.iou); + io_state = IO_STATE::renaming; +}
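 + +// For reference (a sketch, not used by the server code): because ids are a keyed blake2b-33 +// hash of the content, an uploader can compute its file's id ahead of time with libsodium: +// +// unsigned char hash[33]; +// crypto_generichash_blake2b( +// hash, sizeof(hash), +// data, size, // the complete file contents, as unsigned char* +// reinterpret_cast<const unsigned char*>(FILE_ID_HASH_KEY.data()), +// FILE_ID_HASH_KEY.size()); +// // ... then base64-encode and swap '+'/'/' for '-'/'_', as in finalize() above. +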
+void FileStream::put_req::insert_and_respond() { + if (str.handler.back_compat_ids) { + // In back-compat mode, the insert already happened in finalize() because we had to do it to + // get the location to link the file into. + } else { + int max_ttl = str.handler.max_ttl.count(); + std::string db_ttl = "{} seconds"_format(std::clamp(ttl.value_or(max_ttl), 1, max_ttl)); + try { + pg_retryable([&] { + pqxx::work tx{str.handler.pg_conn}; + + auto [upl, exp] = tx.exec(R"( +INSERT INTO files (id, expiry) VALUES ($1, NOW() + $2) +ON CONFLICT(id) DO UPDATE SET expiry = GREATEST(files.expiry, EXCLUDED.expiry) +RETURNING EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry))", + pqxx::params{fileid, db_ttl}) + .one_row() + .as<double, double>(); + uploaded = + std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(upl)}}; + expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(exp)}}; + tx.commit(); + }); + } catch (const pqxx::failure& e) { + log::error(logcat, "Failed to insert DB record for file {}: {}", filepath, e.what()); + str.close(STREAM_ERROR::io_error); + return; + } + } + + oxenc::bt_dict_producer resp; + resp.append("#", fileid); + resp.append("u", uploaded.time_since_epoch().count()); + resp.append("x", expiry.time_since_epoch().count()); + + auto resp_str = std::move(resp).str(); + str.send("{}:{}"_format(resp_str.size(), resp_str)); + str.send_fin(); + + str.handler.overall.put(size); +} + +void FileStream::put_req::unlink_and_close(uint64_t close_fsid) { + auto* sqe = io_uring_get_sqe(&str.handler.iou); + + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_sqe_set_data64(sqe, 0); // we can't do anything if this fails so 0 will just be + // ignored by the cqe handling + io_uring_prep_unlinkat(sqe, str.handler.upload_dir_fd, tmp_upload.c_str(), 0); + + sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, close_fsid); + if (close_fsid == 0) + // Without an fsid we don't care about the outcome, so can skip handling entirely. If we + // *do* have an fsid then we want it so that it triggers whatever happens after closing. + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_prep_close_direct(sqe, fd); + + io_uring_submit(&str.handler.iou); + + fd = -1; +} + +FileStream::put_req::~put_req() { + if (io_state != IO_STATE::none) { + // Some request is already in progress, so cancel it before we send the close. This + // particular io_uring submission is *synchronous* so that we aren't at risk of the close we + // submit just after this actually getting queued before the cancellation (and thus itself + // getting cancelled). + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, 0); + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_prep_cancel64(sqe, str.fsid, 0); + io_uring_submit(&str.handler.iou); + } + + if (fd >= 0) + unlink_and_close(0 /* don't care about results, since we're destructing */); +} + +void FileStream::parse_put(oxenc::bt_dict_consumer&& d) { + auto size = d.require<int64_t>("s"); + auto ttl = d.maybe<int64_t>("t"); + + if (size <= 0 || size > handler.max_size) { + log::error(logcat, "Invalid PUT size: {}, closing stream with error", size); + close(STREAM_ERROR::invalid_size); + return; + } + + auto& put = request.emplace<put_req>(*this, std::move(size), std::move(ttl)); + if (log::get_level(accesslog) >= log::Level::info) { + auto ttl = put.ttl ? " (ttl={})"_format(*put.ttl) : ""; + if (auto conn = get_conn()) + log::info(accesslog, "PUT: {}B upload{} from {}", put.size, ttl, conn->remote()); + else + log::info(accesslog, "PUT: {}B upload{} from ", put.size, ttl); + } +}
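 + +// Illustrative client-side framing for the PUT handled above (a sketch assuming a client +// stream `s` and hypothetical file_size/ttl_seconds/file_bytes values; note that +// '!' < 's' < 't' keeps the bt-dict keys properly sorted): +// +// oxenc::bt_dict_producer req; +// req.append("!", "PUT"); +// req.append("s", file_size); +// req.append("t", ttl_seconds); // optional +// s->send("{}:{}"_format(req.view().size(), req.view())); +// s->send(file_bytes); // streamed in pieces as convenient +// s->send_fin(); // FIN marks the end of the upload +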
" (ttl={})"_format(*put.ttl) : ""; + if (auto conn = get_conn()) + log::info(accesslog, "PUT: {}B upload{} from {}", put.size, ttl, conn->remote()); + else + log::info(accesslog, "PUT: {}B upload{} from ", put.size, ttl); + } +} + +} // namespace sfs diff --git a/quic/quic-files.cpp b/quic/quic-files.cpp new file mode 100644 index 0000000..c1e4e55 --- /dev/null +++ b/quic/quic-files.cpp @@ -0,0 +1,202 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cleanup.hpp" +#include "requests.hpp" + +std::atomic signalled = 0; +void handle_signal(int sig) { + signalled = sig; +} + +int main(int argc, char* argv[]) { + + using namespace oxen; + using namespace log::literals; + using namespace std::literals; + + std::signal(SIGINT, handle_signal); + std::signal(SIGTERM, handle_signal); + + auto logcat = log::Cat("files"); + + CLI::App cli{"Session QUIC file server"}; + + std::string pgsql_uri; + cli.add_option( + "--pgsql,-P", + pgsql_uri, + "Postgresql database or URI, e.g. 'mydb' or some more complicated " + "'postgresql:///...'") + ->type_name("URI") + ->required(); + + std::filesystem::path file_path; + cli.add_option( + "--files,-f", + file_path, + "Directory where files referenced in the database are stored on disk") + ->type_name("DIRECTORY") + ->required(); + + bool back_compat_ids = false; + cli.add_flag( + "--compat-ids,-c", + back_compat_ids, + "Run in backwards-compatible, numeric ID mode. New uploads will get random integer " + "IDs instead of file hash IDs."); + + uint32_t max_ttl = 14 * 24 * 60 * 60; + cli.add_option( + "--ttl,-t", + max_ttl, + "Maximum TTL for uploads, in seconds. This is the default lifetime for upload " + "files (if not renewed). Clients may request shorter TTLs during upload or renew " + "requests, but requesting a longer TTL will be truncated to this value") + ->capture_default_str() + ->check(CLI::Range(60, 60 * 24 * 60 * 60)); + + uint32_t max_size = 10'223'616; + cli.add_option("--max-size,-S", max_size, "Maximum upload size, in bytes.") + ->capture_default_str(); + + bool no_delete_expired = false; + cli.add_flag( + "--no-delete-expired", + no_delete_expired, + "Disable expired file deletion. Should only be used if something else is taking care " + "of expired file deletions."); + + std::filesystem::path key_file; + cli.add_option( + "--key,-K", + key_file, + "Path to an Ed25519 secret key. 
Must be either 64 bytes (raw), or 128 hex " + "characters") + ->type_name("FILENAME") + ->required(); + + std::string addr{":11235"}; + cli.add_option("--bind,-b", addr, "Bind address for incoming oxen-libQUIC connections") + ->type_name("IP:PORT") + ->capture_default_str() + ->check([](const std::string& a) { + try { + quic::Address::parse(a); + } catch (const std::exception& e) { + return "Invalid bind address: {}"_format(e.what()); + } + return ""s; + }); + + bool disable_0rtt = false; + cli.add_flag("--disable-0rtt,-0", disable_0rtt, "Disables 0-RTT support"); + + std::string log_level = "info,quic=warning"; + cli.add_option( + "--log-level", + log_level, + "Log verbosity level, see Log Levels below for accepted values") + ->type_name("LEVEL") + ->capture_default_str(); + + try { + cli.parse(argc, argv); + } catch (const CLI::ParseError& e) { + return cli.exit(e); + } + + log::apply_categories(log_level); + log::add_sink(log::Type::Print, "stdout"); + + std::string ed_keys; + try { + std::ifstream in; + in.exceptions(std::ios::badbit | std::ios::failbit); + in.open(key_file, std::ios::binary | std::ios::in); + in.seekg(0, in.end); + auto size = in.tellg(); + in.seekg(0, in.beg); + + if (size == 64) { + ed_keys.resize(64); + in.read(reinterpret_cast(ed_keys.data()), 64); + } else if (size >= 128 && size <= 130) { + ed_keys.resize(size); + in.read(ed_keys.data(), size); + if ((size == 130 && ed_keys[128] == '\r' && ed_keys[129] == '\n') || + (size == 129 && ed_keys[128] == '\n')) + ed_keys.resize(128); + if (ed_keys.size() == 128 && oxenc::is_hex(ed_keys)) + ed_keys = oxenc::from_hex(ed_keys); + else + ed_keys.clear(); + } + + if (ed_keys.size() != 64) + throw std::invalid_argument{ + "Invalid --key file: expected 64 raw byte, or 128 hex character key file"}; + } catch (const std::exception& e) { + fmt::print(stderr, "\n\n\x1b[31;1mInvalid --key file: {}\x1b[0m\n\n", e.what()); + return 1; + } + + log::info( + logcat, + "Server Ed25519 pubkey: {}", + oxenc::to_hex(ed_keys.begin() + 32, ed_keys.end())); + auto address = quic::Address::parse(addr); + log::info( + logcat, + "Starting quic listener @ {}{}", + address, + disable_0rtt ? 
" WITHOUT 0rtt support" : ""); + + std::thread cleanup_thread; + std::promise stop_cleanup; + if (!no_delete_expired) + cleanup_thread = sfs::start_cleanup_thread(pgsql_uri, file_path, stop_cleanup.get_future()); + + try { + sfs::ReqHandler handler{ + address, + std::move(ed_keys), + !disable_0rtt, + std::move(pgsql_uri), + back_compat_ids, + std::chrono::seconds{max_ttl}, + file_path, + max_size}; + + log::info(logcat, "Server started."); + + signalled.wait(0); + log::warning(logcat, "Received signal {}, stopping server", signalled.load()); + } catch (const std::exception& e) { + log::error(logcat, "An exception occured while running the server: {}", e.what()); + } + + if (cleanup_thread.joinable()) { + log::info(logcat, "Stopping cleanup thread"); + stop_cleanup.set_value(); + cleanup_thread.join(); + } + + log::info(logcat, "Exiting"); +} diff --git a/quic/requests.cpp b/quic/requests.cpp new file mode 100644 index 0000000..790ba8f --- /dev/null +++ b/quic/requests.cpp @@ -0,0 +1,386 @@ +#include "requests.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.hpp" // IWYU pragma: keep + +namespace sfs { + +bool back_compat_ids = false; +int max_ttl = 14 * 24 * 60 * 60; + +using namespace oxen; +using namespace log::literals; +using namespace std::literals; + +void handle_file_info(quic::message m) {} +void handle_file_extend(quic::message m) {} +void handle_session_version(quic::message m) {} +void handle_token_info(quic::message m) {} + +static auto logcat = log::Cat("files"); +static auto accesslog = log::Cat("access"); + +void FileStream::on_fin() { + Stream::on_fin(); + std::visit( + [](R& req) { + if constexpr (!std::same_as) + req.finalize(); + }, + request); +} + +void FileStream::receive(std::span data) { + if (is_closing()) + return; + if (std::holds_alternative(request)) { + if (command_block_size < 0) { + try { + // The first thing we received on the stream is the size of the command block, + // e.g. `123:`. This call accumulates these digits until we hit a :, mutating data + // to omit the prefix once found. + if (auto size = quic::prefix_accumulator(partial_size, data)) { + if (*size == 0) + throw std::runtime_error{"Command block cannot be 0 bytes"}; + command_block_size = *size; + } else + return; // Not enough data received yet + } catch (const std::exception& e) { + log::warning( + logcat, + "Invalid file stream command block on stream {}: {}; closing stream", + stream_id(), + e.what()); + close(STREAM_ERROR::bad_request); + return; + } + command.reserve(command_block_size); + } + + try { + if (!quic::data_accumulator(command, data, command_block_size)) + return; + } catch (const std::exception& e) { + log::error( + logcat, + "Invalid file stream command block on stream {}: {}; closing stream", + stream_id(), + e.what()); + close(STREAM_ERROR::bad_request); + return; + } + + // We've completely read the request body, so now we can parse it: + try { + oxenc::bt_dict_consumer d{command}; + if (auto cmd = d.maybe("!")) { + if (*cmd == "GET") + parse_get(std::move(d)); + else if (*cmd == "PUT") + parse_put(std::move(d)); + else { + log::error( + logcat, + "Invalid file stream request '{}' on stream {}", + *cmd, + stream_id()); + close(STREAM_ERROR::bad_endpoint); + return; + } + } else { + log::error( + logcat, + "File stream request missing command field '!' 
on stream {}", + stream_id()); + close(STREAM_ERROR::bad_request); + return; + } + } catch (const std::exception& e) { + log::error(logcat, "Failure during command block parsing: {}", e.what()); + close(STREAM_ERROR::bad_request); + return; + } + } + + if (is_closing()) + return; + + if (auto* put = std::get_if(&request)) { + // If this is a PUT request then everything after the command block is file data: + put->append(data); + } else { + auto* get = std::get_if(&request); + assert(get); + if (!data.empty()) { + log::warning(logcat, "Error: {}B after GET command block", data.size()); + close(STREAM_ERROR::bad_request); + } + } +} + +void FileStream::wrote(size_t size) { + Stream::wrote(size); + + if (auto* get = std::get_if(&request)) + get->queue_reads(); +} + +void FileStream::close(STREAM_ERROR e) { + Stream::close(static_cast(e)); + + // If there is a request in progress, destroying whatever is in `request` will cancel any + // ongoing I/O. (This has to be deferred via call_soon because this can be called from *inside* + // a live instance of `request`). + loop.call_soon( + [this, + wself = std::weak_ptr{std::static_pointer_cast(shared_from_this())}] { + if (auto self = wself.lock()) + self->request.emplace(); + }); +} + +FileStream::FileStream(quic::Connection& c, quic::Endpoint& e, ReqHandler& h) : + quic::Stream{c, e}, handler{h}, fsid{handler._next_fsid++} { + handler.streams[fsid] = this; +} + +FileStream::~FileStream() { + handler.streams.erase(fsid); +} + +quic::opt::static_secret make_static_secret(std::string_view ed_keys) { + constexpr auto STATIC_SEC_DOMAIN = "session-file-server-quic-files"sv; + crypto_generichash_blake2b_state st; + crypto_generichash_blake2b_init( + &st, + reinterpret_cast(STATIC_SEC_DOMAIN.data()), + STATIC_SEC_DOMAIN.size(), + 32); + crypto_generichash_blake2b_update( + &st, reinterpret_cast(ed_keys.data()), ed_keys.size()); + std::vector out; + out.resize(32); + crypto_generichash_blake2b_final(&st, out.data(), out.size()); + return quic::opt::static_secret{std::move(out)}; +} + +ReqHandler::ReqHandler( + quic::Address listen, + std::string ed_keys, + bool enable_0rtt, + std::string pgsql_uri, + bool back_compat_ids, + std::chrono::seconds max_ttl, + std::filesystem::path files_path_, + int64_t max_size) : + back_compat_ids{back_compat_ids}, + max_ttl{max_ttl}, + max_size{max_size}, + files_path{std::move(files_path_)} { + + if (sodium_init() == -1) + throw std::runtime_error{"Failed to initialize libsodium!"}; + + loop.call_get([&] { + pg_conn = pqxx::connection{pgsql_uri}; + + std::list cleanup; + + std::filesystem::create_directories(files_path); + + if (back_compat_ids) + for (int i = 0; i < 1000; i++) + std::filesystem::create_directories(files_path / "{:03d}"_format(i)); + else { + for (auto a : b64_url_chars) + for (auto b : b64_url_chars) + std::filesystem::create_directories(files_path / "{}{}"_format(a, b)); + } + upload_path = files_path / "uploads"; + std::filesystem::create_directories(upload_path); + + upload_dir_fd = open(upload_path.c_str(), O_PATH | O_DIRECTORY); + if (upload_dir_fd < 0) + throw std::runtime_error{ + "Unable to open upload path {}: {}"_format(upload_path, strerror(errno))}; + cleanup.emplace_back([this] { close(upload_dir_fd); }); + + if (int err = io_uring_queue_init(128, &iou, IORING_SETUP_SINGLE_ISSUER); err != 0) + throw std::runtime_error{ + "Failed to initialize io_uring queue: {}"_format(strerror(-err))}; + cleanup.emplace_front([this] { io_uring_queue_exit(&iou); }); + + if (int err = 
io_uring_register_files_sparse(&iou, MAX_OPEN_FILES); err != 0) + throw std::runtime_error{ + "Failed to initialize io_uring file descriptors: {}"_format(strerror(-err))}; + cleanup.emplace_front([this] { io_uring_unregister_files(&iou); }); + + files_dir_fd = open(files_path.c_str(), O_PATH | O_DIRECTORY); + if (files_dir_fd < 0) + throw std::runtime_error{ + "Unable to open files path {}: {}"_format(files_path, strerror(errno))}; + cleanup.emplace_back([this] { close(files_dir_fd); }); + + iou_evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (iou_evfd < 0) + throw std::runtime_error{"Failed to set up eventfd: {}"_format(strerror(errno))}; + cleanup.emplace_front([this] { close(iou_evfd); }); + + if (int err = io_uring_register_eventfd(&iou, iou_evfd); err != 0) + throw std::runtime_error{"Failed to register eventfd: {}"_format(strerror(-err))}; + cleanup.emplace_front([this] { io_uring_unregister_eventfd(&iou); }); + + ep = quic::Endpoint::endpoint( + loop, listen, make_static_secret(ed_keys), quic::opt::inbound_alpn(sfs::ALPN)); + + auto creds = quic::GNUTLSCreds::make_from_ed_seckey(ed_keys); + if (enable_0rtt) + creds->enable_inbound_0rtt(0s, 48h); + + ep->listen( + std::move(creds), + [this](quic::Connection& conn, + quic::Endpoint& e, + std::optional stream_id) { + assert(stream_id); // Should always be set for incoming streams + return make_stream(conn, e, *stream_id); + }); + + iou_ev = event_new( + loop.get_event_base(), + iou_evfd, + EV_READ | EV_PERSIST, + [](evutil_socket_t fd, short, void* me) { + uint64_t dummy; + read(fd, &dummy, 8); + static_cast(me)->process_cqes(); + }, + this); + if (!iou_ev) + throw std::runtime_error{"Failed to initialize io_uring fd event via libevent"}; + cleanup.emplace_front([this] { event_free(iou_ev); }); + if (0 != event_add(iou_ev, nullptr)) + throw std::runtime_error{"Failed to initialize io_uring event monitoring via libevent"}; + + for (auto& x : cleanup) + x.disarm(); + + stats_timer = loop.call_every(30s, [this] { + auto now = std::chrono::steady_clock::now(); + log::info( + logcat, + "{} uploads ({:.1f}GB), {} downloads ({:.1f}GB), {} other requests in {} since " + "startup.", + overall.puts, + overall.put_data / 1e9, + overall.gets, + overall.get_data / 1e9, + overall.others, + friendly_duration(now - overall.since)); + if (now - recent.since >= 10min) { + recent_old = recent; + recent = {}; + } + if (recent_old.since > overall.since + 1s) { + log::info( + logcat, + "{} uploads ({:.1f}GB), {} downloads ({:.1f}GB), {} other requests in last " + "{}", + recent.puts + recent_old.puts, + (recent.put_data + recent_old.put_data) / 1e9, + recent.gets + recent_old.gets, + (recent.get_data + recent_old.get_data) / 1e9, + recent.others + recent_old.others, + friendly_duration(now - recent_old.since)); + } + }); + }); +} + +ReqHandler::~ReqHandler() { + assert(ep.use_count() == 1); + ep.reset(); + event_free(iou_ev); + loop.call_get([this] { + io_uring_unregister_files(&iou); + io_uring_unregister_eventfd(&iou); + close(iou_evfd); + io_uring_queue_exit(&iou); + }); +} + +void ReqHandler::process_cqes() { + unsigned head; + unsigned count = 0; + io_uring_cqe* cqe; + io_uring_for_each_cqe(&iou, head, cqe) { + ++count; + auto fsid = io_uring_cqe_get_data64(cqe); + FileStream* sptr = nullptr; + if (auto it = streams.find(fsid); it != streams.end()) + sptr = it->second; + if (!sptr) { + log::debug(logcat, "Ignoring CQE on dead stream (fsid={})", fsid); + continue; + } + auto& str = *sptr; + try { + std::visit( + [cqe](R& req) { + if constexpr 
(!std::same_as<R, std::monostate>) + req.handle_cqe(cqe); + }, + str.request); + } catch (const std::exception& e) { + log::warning( + logcat, + "Exception during I/O response processing ({}); closing stream", + e.what()); + str.close(STREAM_ERROR::io_error); + } + } + io_uring_cq_advance(&iou, count); +} + +std::shared_ptr<quic::Stream> ReqHandler::make_stream( + quic::Connection& conn, quic::Endpoint& e, int64_t stream_id) { + + // Stream 0 is the bt request stream on which you can make simple requests such as getting + // current versions, querying file metadata, or touching an uploaded file to renew its + // expiry. + // + // It does *not*, however, handle uploads and downloads: each upload/download happens on a + // new stream. + if (stream_id == 0) { + auto s = e.loop.make_shared<quic::BTRequestStream>(conn, e); + s->register_handler( + "file_info", [this](quic::message m) { handle_file_info(std::move(m)); }); + s->register_handler( + "file_extend", [this](quic::message m) { handle_file_extend(std::move(m)); }); + s->register_handler("session_version", [this](quic::message m) { + handle_session_version(std::move(m)); + }); + s->register_handler( + "token_info", [this](quic::message m) { handle_token_info(std::move(m)); }); + return std::move(s); + } + + return e.loop.make_shared<FileStream>(conn, e, *this); +}
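 + +// CQE dispatch convention used throughout: every SQE a stream submits carries its fsid in +// user_data (io_uring_sqe_set_data64), so process_cqes() above can route each completion back +// to the right FileStream via the streams map; user_data 0 is reserved for fire-and-forget +// submissions (cancellations, unlinks, closes) whose results nothing needs to observe. +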
 + +} // namespace sfs diff --git a/quic/requests.hpp b/quic/requests.hpp new file mode 100644 index 0000000..90df488 --- /dev/null +++ b/quic/requests.hpp @@ -0,0 +1,356 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sfs { + +namespace quic = oxen::quic; +using namespace std::literals; + +inline constexpr auto ALPN = "quic-files"sv; + +enum class STREAM_ERROR : uint64_t { + bad_request = 400, + + not_found = 404, + + bad_endpoint = 405, + + size_missing = 450, + invalid_size = 451, + not_enough_data = 452, + too_much_data = 453, + io_error = 480, +}; + +class FileStream; + +struct req_stats { + int64_t puts = 0, gets = 0, others = 0; + int64_t put_data = 0, get_data = 0; + std::chrono::steady_clock::time_point since = std::chrono::steady_clock::now(); + + void put(int64_t size) { + puts++; + put_data += size; + } + void get(int64_t size) { + gets++; + get_data += size; + } + void other() { others++; } +}; + +class ReqHandler { + + quic::Loop loop; + std::shared_ptr<quic::Endpoint> ep; + bool back_compat_ids; + std::chrono::seconds max_ttl; + int64_t max_size; + + req_stats overall{}; + req_stats recent{}, recent_old{}; + std::shared_ptr stats_timer; + + std::filesystem::path files_path; + std::filesystem::path upload_path; + int files_dir_fd; + int upload_dir_fd; + + io_uring iou; + int iou_evfd; + event* iou_ev; + + pqxx::connection pg_conn; + + static constexpr int MAX_OPEN_FILES = 200000; + + // The u64 that we use to associate io_uring responses with the streams that made the request. + // 0 is a special value (e.g. used for cancellation on destruction) that never maps to an actual + // stream. + uint64_t _next_fsid = 1; + std::unordered_map<uint64_t, FileStream*> streams; + + void process_cqes(); + + friend class FileStream; + + public: + ReqHandler( + quic::Address listen, + std::string ed_keys, + bool enable_0rtt, + std::string pgsql_uri, + bool back_compat_ids, + std::chrono::seconds max_ttl, + std::filesystem::path files_path, + int64_t max_size); + + ~ReqHandler(); + + std::shared_ptr<quic::Stream> make_stream( + quic::Connection& conn, quic::Endpoint& e, int64_t stream_id); +}; + +/// Stream subclass used for handling all incoming streams beyond the first (i.e. ids > 0; the first +/// stream, with id 0, is always a standard quic bt request stream for non-transfer responses). +/// +/// Each of these streams is issued with an initial command block, optionally followed by stream +/// data. The response is an initial response block, optionally followed by stream data. +/// +/// Each command or response block is a bt-encoded string containing a bt-encoded dict of request +/// data. The inner encoded dict may be no larger than 9999 bytes. E.g. +/// +/// 19:d1:!3:PUT1:si8192ee.................. +/// +/// is a single upload command. The reply (after successfully accepting the upload) could be: +/// +/// 67:d1:#44:abc…xyz1:xi1760388294ee +/// +/// indicating the status of a successful upload. +/// +/// Currently supported commands (which are in the "!" key): +/// +/// PUT +/// --- +/// Command block is a dict containing: +/// - ! = PUT +/// - s = upload size, in bytes. The upload will fail immediately if the size is not acceptable. +/// If the stream is closed (or FIN sent) before this size has been received, or if more than this +/// size is sent, then the stream is closed with an error (the file is not stored). +/// - t = file ttl, in seconds. Optional; if omitted defaults to max ttl. +/// +/// All data following the command block are the file contents being uploaded. +/// +/// The successful response is a dict block (i.e. bt-encoded string encoding of a bt-dict) +/// containing keys: +/// - # = the accepted file id as a string. This is a base64 (url variation) encoded value, unless +/// backwards compatible ids are enabled in which case it will be a stringified integer value. +/// - u = the file upload timestamp; typically now, but earlier if the upload was deduplicated. +/// - x = the file expiry, in unix epoch seconds, after the insert or update caused by this upload. +/// +/// On error, the stream is closed (without sending a response) with one of the following error +/// codes: +/// - 450 -- no upload size given +/// - 451 -- invalid upload size (i.e. size is too large). This will be triggered immediately if +/// the size given in the initial command is not allowed, or if too little or too much data is +/// sent. +/// - 452 -- stream FIN received before all data received +/// - 453 -- too much file data received (i.e. more than "s" bytes) +/// - 480 -- the file could not be stored because of a server-side I/O error. +/// - other non-0 codes indicate some other internal server side error (e.g. server restarting). +/// +/// +/// GET +/// --- +/// Command block is a dict containing: +/// - ! = GET +/// - # = file ID to retrieve, as a string value. Should be either a 44-character ID, or a shorter +/// numeric ID (but still passed as a string). +/// +/// The successful response is a dict block followed by the file bytes. The dict contains keys: +/// - s = the size of the file being sent, in bytes. (Note that this is the file size, not the +/// stream size, i.e. it does not include the size of the response block). +/// - u = the original file creation date, in unix epoch seconds. If the file was renewed (or +/// re-uploaded and de-duplicated), this is the original upload date, not the renewal date. +/// - x = the file expiry, in unix epoch seconds +/// +/// The file immediately follows this double-encoded block (e.g. `123:d.....e`), in its entirety. +/// +/// If the file is not found then the stream is closed, with no response sent, with a stream error +/// code of 404. 
+/// Error 480 indicates a server-side I/O error while serving the file; other non-0 codes are
+/// possible, e.g. if the server is restarted before handling the request.
+///
+///
+/// If some other command is sent on an auxiliary stream then the stream is closed with error 405.
+/// If the command block is not parseable then it is closed with code 400.
+///
+class FileStream : public quic::Stream {
+
+    class file_req {
+      protected:
+        // We queue reads and writes in blocks of this size
+        static constexpr int64_t CHUNK_SIZE = 64 * 1024;
+
+        file_req(FileStream& str) : str{str} {}
+
+        FileStream& str;
+
+        int fd = -1;
+
+        std::filesystem::path filepath;
+        std::chrono::sys_seconds uploaded;
+        std::chrono::sys_seconds expiry;
+
+        virtual ~file_req() = default;
+
+      public:
+        std::string fileid;
+        int64_t size = -1;
+
+        // Called to process a CQE response to a queued event that this object submitted.
+        virtual void handle_cqe(io_uring_cqe* cqe) = 0;
+    };
+
+    class get_req : public file_req {
+        // Per-stream readahead: if the unsent data on the stream drops below this value then we
+        // queue additional reads until the unsent data reaches it again (or we hit EOF).
+        static constexpr int64_t READAHEAD = 512*1024;
+
+        // current io_uring state:
+        // - IO_STATE::none means there is no pending io_uring request
+        // - IO_STATE::reading means we have a chain of `chunks.size()` reads queued
+        // - IO_STATE::statx means we are waiting on the statx that happens first
+        // - IO_STATE::opening means we are waiting on the open (which happens chained with the
+        //   statx).
+        // - IO_STATE::closing_done means we are closing the file handle.
+        enum class IO_STATE : int {
+            none = -100,
+            statx = -1,
+            opening = 0,
+            closing_done = -2,
+            reading = 1,  // including all higher underlying values
+        };
+
+        struct statx statxbuf{};
+
+        IO_STATE io_state = IO_STATE::none;
+
+        bool eof = false;
+        int64_t bytes_read = 0;
+
+        std::deque> chunks;
+
+      public:
+        get_req(FileStream& str, std::string id);
+
+        void finalize();
+
+        void handle_cqe(io_uring_cqe* cqe) override;
+
+        void queue_reads();
+
+        // Cancels operations (if something is queued) and closes the fd
+        void close();
+    };
+    class put_req : public file_req {
+
+        crypto_generichash_blake2b_state b2b;
+
+        // current io_uring state:
+        // - IO_STATE::none means there is no pending io_uring request
+        // - >= IO_STATE::writing means we are writing the first N buffers of `chunks`, where N
+        //   is the underlying value.
+        // - IO_STATE::opening means we are creating the tempfile
+        // - IO_STATE::renaming means we are renaming the tempfile into its final location
+        // - IO_STATE::closing_done means we are closing the file handle after success.
+        // - there is no dedicated error state: aborting with an error closes and unlinks the
+        //   tempfile via unlink_and_close().
+        // Tracking the currently scheduled I/O task:
+        enum class IO_STATE : int {
+            none = -100,
+            closing_done = -2,
+            renaming = -1,
+            opening = 0,
+            writing = 1,  // >= writing means a writev of the underlying number of chunks
+        };
+
+        IO_STATE io_state = IO_STATE::none;
+
+        // When waiting on a write, this is how many bytes we tried writing
+        int io_write_size;
+        // In some cases (such as a partial write) we end up starting a write with an offset into
+        // the first chunk; this is where we store it:
+        size_t io_write_first_offset;
+
+        std::deque> chunks;
+
+        int64_t received = 0;
+        bool got_all = false;
+        std::filesystem::path tmp_upload;
+
+        void initiate_rename();
+
+        void insert_and_respond();
+
+        void unlink_and_close(uint64_t close_fsid);
+
+        friend class ReqHandler;
+
+      public:
+        std::optional ttl;
+
+        put_req(FileStream& s, size_t size, std::optional ttl);
+
+        // Appends data; the first time this is called a temporary file is opened into which the
+        // data will be written.  Throws upon I/O error.
+        void append(std::span data);
+
+        // If there is at least 1 full chunk accumulated (or the write is complete) then this
+        // submits any full (or final) chunks to be written.  Does nothing if an I/O operation is
+        // already submitted, or if there isn't a full chunk yet.
+        //
+        // retry_offset is used after a failed partial write to retry the write, beginning the
+        // first chunk at the given offset.
+        void send_chunks(std::optional _retry_offset = std::nullopt);
+
+        // Called when the sender sends FIN on the stream (to indicate it is done with the file
+        // contents).  If not enough data has been sent, this closes the stream with an error
+        // code.  Otherwise this initiates linking the file to its final location on disk (based
+        // on the hash), inserting the record into the database, and then replying on the stream
+        // with the upload info.
+        void finalize();
+
+        // Called to process a CQE response to a queued event that this object submitted.
+        void handle_cqe(io_uring_cqe* cqe) override;
+
+        // Cancels any ongoing operations and closes the file descriptor.
+        ~put_req();
+    };
+
+    ReqHandler& handler;
+
+    // Unique identifier we use to reacquire the stream (if still alive) out of the ReqHandler
+    // when an io_uring completion occurs.  (We store this id in the queue entry).
+    // (NB: put_req destructor requires this, so this must be declared above `request`)
+    const uint64_t fsid;
+
+    std::variant request;
+    int command_block_size = -1;
+    std::string partial_size;
+    std::vector command;
+
+    void parse_get(oxenc::bt_dict_consumer&& d);
+    void parse_put(oxenc::bt_dict_consumer&& d);
+
+    void on_fin() override;
+
+    friend class ReqHandler;
+
+  public:
+    FileStream(quic::Connection& c, quic::Endpoint& e, ReqHandler& h);
+
+    ~FileStream() override;
+
+    void receive(std::span data) override;
+
+    void close(STREAM_ERROR e);
+
+    void wrote(size_t size) override;
+};
+
+}  // namespace sfs
diff --git a/quic/test/test-download.cpp b/quic/test/test-download.cpp
new file mode 100644
index 0000000..1782460
--- /dev/null
+++ b/quic/test/test-download.cpp
@@ -0,0 +1,232 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::literals;
+using namespace oxen;
+using namespace log::literals;
+
+static auto cat = log::Cat("download");
+
+int main(int argc, char* argv[]) {
+
+    log::add_sink(log::Type::Print, "-");
+    log::reset_level(log::Level::warn);
+    log::set_level(cat, log::Level::info);
+
+    auto usage = [&argv](std::string_view msg) {
+        if (!msg.empty())
+            log::error(cat, "{}", msg);
+        log::info(
+                cat,
+                "\n\nUsage: {} IP:PORT PUBKEY FILEID [FILEID2 ...]\n\nDownloads files from a "
+                "fileserver to ./FILEID ./FILEID2 etc.\n\n",
+                argv[0]);
+        return 1;
+    };
+
+    if (argc < 4)
+        return usage("Insufficient arguments");
+
+    quic::Address addr;
+    try {
+        addr = quic::Address::parse(argv[1]);
+    } catch (const std::exception& e) {
+        return usage("{} doesn't look like an IP:PORT address ({})"_format(argv[1], e.what()));
+    }
+
+    std::string_view pubkey{argv[2]};
+    if (pubkey.size() != 64 || !oxenc::is_hex(pubkey))
+        return usage("{} is not a valid pubkey"_format(pubkey));
+
+    int id_i = 3;
+
+    if (id_i >= argc)
+        return usage("No file ids given!");
+
+    constexpr auto b64_url_chars =
+            "-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz"sv;
+    struct state {
+        std::string id;
+        std::filesystem::path f;
+        std::string partial;
+        std::vector meta;
+        int meta_size = -1;
+        int size = -1;
+        std::chrono::sys_seconds uploaded;
+        std::chrono::sys_seconds expiry;
+        int received = 0;
+        std::ofstream out;
+    };
+
+    std::vector downloads;
+    std::unordered_set dupes;
+    for (; id_i < argc; id_i++) {
+        auto& st = downloads.emplace_back();
+        st.id = argv[id_i];
+        if (!((st.id.size() == 44 && st.id.find_first_not_of(b64_url_chars) == std::string::npos) ||
+              (st.id.size() >= 1 && st.id.size() <= 16 &&
+               st.id.find_first_not_of("0123456789") == std::string::npos)))
+            throw std::runtime_error{
+                    "invalid id '{}': expected 44 b64url or 1-16 decimal digits"_format(st.id)};
+
+        st.f = st.id;
+        for (int i = 1; std::filesystem::exists(st.f) || !dupes.insert(st.f).second; i++)
+            st.f.replace_extension(".{}"_format(i));
+    }
+
+    quic::Loop loop;
+    auto ep = quic::Endpoint::endpoint(loop, quic::Address{});
+
+    auto conn = ep->connect(
+            quic::RemoteAddress{oxenc::from_hex(pubkey), addr},
+            quic::opt::outbound_alpn("quic-files"));
+
+    // We must always open this stream first (even if we aren't going to use it); this is the
+    // stream on which we can make simple requests; all later streams are for file transfers.
+    auto bt_str = conn->open_stream();
+
+    // stream -> downloads_index
+    std::unordered_map, size_t> dl_idx;
+
+    std::atomic remaining{downloads.size()};
+    auto on_stream_close = [&](quic::Stream& s, uint64_t err) {
+        --remaining;
+        remaining.notify_one();
+        auto& st = downloads[dl_idx.at(s.shared_from_this())];
+        auto sid = s.stream_id();
+        if (err != 0) {
+            log::error(cat, "Download {} stream (id={}) closed with error {}", st.id, sid, err);
+            return;
+        }
+
+        if (st.size < 0) {
+            log::error(
+                    cat, "Download {} stream (id={}) closed before metadata received!", st.id, sid);
+            return;
+        }
+        if (st.received < st.size) {
+            log::error(
+                    cat,
+                    "Download {} stream (id={}) closed before file content received (received {} "
+                    "of {}B)",
+                    st.id,
+                    sid,
+                    st.received,
+                    st.size);
+            return;
+        } else {
+            st.out.close();
+            log::info(
+                    cat,
+                    "Successfully downloaded {} (on stream {}):\nsaved to: {}\nuploaded: {}\n  "
+                    "expiry: {}",
+                    st.id,
+                    sid,
+                    st.f,
+                    st.uploaded,
+                    st.expiry);
+        }
+    };
+
+    auto on_stream_data = [&](quic::Stream& s, std::span data) {
+        auto sid = s.stream_id();
+        auto& st = downloads[dl_idx.at(s.shared_from_this())];
+
+        if (st.meta_size < 0) {
+            try {
+                if (auto size = quic::prefix_accumulator(st.partial, data)) {
+                    if (*size == 0)
+                        throw std::runtime_error{"Invalid 0-byte metadata block"};
+                    st.meta_size = *size;
+                } else
+                    return;
+            } catch (const std::exception& e) {
+                log::error(cat, "Invalid file stream metadata on stream {}: {}", sid, e.what());
+                s.close(1234);
+                return;
+            }
+
+            st.meta.reserve(st.meta_size);
+        }
+
+        if (st.size < 0) {
+            try {
+                if (!quic::data_accumulator(st.meta, data, st.meta_size))
+                    return;
+            } catch (const std::exception& e) {
+                log::error(cat, "Invalid file metadata received from server: {}", e.what());
+                s.close(1234);
+                return;
+            }
+
+            try {
+                oxenc::bt_dict_consumer d{st.meta};
+                auto size = d.require("s");
+                if (size <= 0)
+                    throw std::runtime_error{"Invalid non-positive file size {}"_format(size)};
+                auto upl = d.require("u");
+                auto exp = d.require("x");
+                d.finish();
+
+                st.size = size;
+                st.uploaded = std::chrono::sys_seconds{std::chrono::seconds{upl}};
+                st.expiry = std::chrono::sys_seconds{std::chrono::seconds{exp}};
+            } catch (const std::exception& e) {
+                log::error(
+                        cat,
+                        "Failed to parse file metadata for {} on stream {}: {}",
+                        st.id,
+                        sid,
+                        e.what());
+                s.close(444);
+                return;
+            }
+
+            try {
+                st.out.exceptions(std::ios::failbit | std::ios::badbit);
+                st.out.open(st.f, std::ios::binary | std::ios::out);
+            } catch (const std::exception& e) {
+                log::error(cat, "Failed to open {} for download: {}", st.f, e.what());
+                s.close(400);
+                return;
+            }
+        }
+
+        try {
+            if (st.received + data.size() > st.size)
+                throw std::runtime_error{
+                        "Invalid file data received from server: received {}, expected {}"_format(
+                                st.received + data.size(), st.size)};
+            st.out.write(reinterpret_cast(data.data()), data.size());
+            st.received += data.size();
+        } catch (const std::exception& e) {
+            log::error(cat, "Failed writing to {}: {}", st.f, e.what());
+            s.close(400);
+        }
+    };
+
+    loop.call([&] {
+        for (size_t i = 0; i < downloads.size(); i++) {
+            auto& st = downloads[i];
+            auto str = conn->open_stream(on_stream_data, on_stream_close);
+            dl_idx[str] = i;
+
+            oxenc::bt_dict_producer info;
+            info.append("!", "GET");
+            info.append("#", st.id);
+            str->send("{}:{}"_format(info.view().size(), info.view()));
+            str->send_fin();
+        }
+    });
+
+    size_t r = remaining.load();
+    while (r > 0) {
+        remaining.wait(r);
+        r = loop.call_get([&] { return remaining.load(); });
+    }
+}
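To make the block framing concrete, here is a worked example (hypothetical id and timestamps) of
a complete GET exchange for a short numeric id "12345" naming a 3-byte file, following the block
format documented in requests.hpp above:

    client -> server:  20:d1:!3:GET1:#5:12345e                    (followed by FIN)
    server -> client:  38:d1:si3e1:ui1760000000e1:xi1760086400ee  (followed by the 3 file
                                                                   bytes, then FIN)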
diff --git a/quic/test/test-upload.cpp b/quic/test/test-upload.cpp
new file mode 100644
index 0000000..94fa3fb
--- /dev/null
+++ b/quic/test/test-upload.cpp
@@ -0,0 +1,209 @@
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::literals;
+using namespace oxen;
+using namespace log::literals;
+
+static auto cat = log::Cat("upload");
+
+int main(int argc, char* argv[]) {
+
+    log::add_sink(log::Type::Print, "-");
+    log::reset_level(log::Level::warn);
+    log::set_level(cat, log::Level::info);
+
+    auto usage = [&argv](std::string_view msg) {
+        if (!msg.empty())
+            log::error(cat, "{}", msg);
+        log::info(
+                cat,
+                "\n\nUsage: {} IP:PORT PUBKEY [TTL=SECONDS] FILENAME [FILENAME ...]\n\n",
+                argv[0]);
+        return 1;
+    };
+
+    if (argc < 4)
+        return usage("Insufficient arguments");
+
+    quic::Address addr;
+    try {
+        addr = quic::Address::parse(argv[1]);
+    } catch (const std::exception& e) {
+        return usage("{} doesn't look like an IP:PORT address ({})"_format(argv[1], e.what()));
+    }
+
+    std::string_view pubkey{argv[2]};
+    if (pubkey.size() != 64 || !oxenc::is_hex(pubkey))
+        return usage("{} is not a valid pubkey"_format(pubkey));
+
+    int ttl = 0;
+    int file_i = 3;
+    if (std::string_view ttl_opt{argv[3]}; ttl_opt.starts_with("TTL=")) {
+        file_i++;
+        auto [ptr, ec] = std::from_chars(ttl_opt.data() + 4, ttl_opt.data() + ttl_opt.size(), ttl);
+        if (ptr != ttl_opt.data() + ttl_opt.size() || ec != std::errc{})
+            return usage("Invalid time-to-live option: {}"_format(ttl_opt));
+    }
+
+    if (file_i >= argc)
+        return usage("No upload files given!");
+
+    std::vector> uploads;
+    for (; file_i < argc; file_i++) {
+        std::filesystem::path f{argv[file_i]};
+        if (!std::filesystem::exists(f) || !std::filesystem::is_regular_file(f))
+            return usage("{} does not exist or is not a file"_format(f));
+        auto s = std::filesystem::file_size(f);
+        if (s == 0)
+            return usage("Cannot upload empty file {}"_format(f));
+        uploads.emplace_back(std::move(f), s, -1);
+    }
+
+    quic::Loop loop;
+    auto ep = quic::Endpoint::endpoint(loop, quic::Address{});
+
+    auto conn = ep->connect(
+            quic::RemoteAddress{oxenc::from_hex(pubkey), addr},
+            quic::opt::outbound_alpn("quic-files"));
+
+    // We must always open this stream first (even if we aren't going to use it); this is the
+    // stream on which we can make simple requests; all later streams are for file transfers.
+ auto bt_str = conn->open_stream(); + + // stream -> [data, uploads_index] + std::unordered_map, std::pair> streams; + + std::atomic remaining{uploads.size()}; + auto on_stream_close = [&](quic::Stream& s, uint64_t err) { + --remaining; + remaining.notify_one(); + auto str = s.shared_from_this(); + auto sid = str->stream_id(); + auto it = streams.find(str); + if (it == streams.end()) { + log::error(cat, "Unknown stream {} closed with error {}!", sid, err); + return; + } + const auto& [data, upload_id] = it->second; + auto& [f, size, fd] = uploads[upload_id]; + if (err != 0) { + log::error(cat, "Upload {} stream (id={}) closed with error {}", f, sid, err); + return; + } + + if (data.empty()) { + log::error( + cat, + "Upload {} stream (id={}) closed without error, but with no response data!", + f, + sid); + return; + } + + try { + oxenc::bt_dict_consumer resp{data}; + std::string id = resp.require("#"); + std::chrono::sys_seconds uploaded{std::chrono::seconds{resp.require("u")}}; + std::chrono::sys_seconds expiry{std::chrono::seconds{resp.require("x")}}; + + log::info( + cat, + "Uploaded {} successfully (on stream {}):\n id: {}\nuploaded: {}\n " + "expiry: {}", + f, + sid, + id, + uploaded, + expiry); + } catch (const std::exception& e) { + log::warning( + cat, "Failed to parse stream {} ({}) upload response: {}", sid, f, e.what()); + } + }; + auto on_stream_data = [&](quic::Stream& s, std::span data) { + auto sid = s.stream_id(); + auto str = s.shared_from_this(); + auto it = streams.find(str); + if (it == streams.end()) { + log::error(cat, "Received data on unknown stream {}!", sid); + return; + } + auto& [d, upload_id] = it->second; + d += std::string_view{reinterpret_cast(data.data()), data.size()}; + }; + + loop.call([&] { + for (size_t i = 0; i < uploads.size(); i++) { + auto& [f, size, fd] = uploads[i]; + fd = open(f.c_str(), O_RDONLY); + if (fd < 0) { + log::error(cat, "Failed to open {}: {}", f, strerror(errno)); + --remaining; + continue; + } + auto str = conn->open_stream(on_stream_data, on_stream_close); + auto next_chunk = [f, fd, str, f_remaining = size](const quic::Stream&) mutable { + std::vector chunk; + if (f_remaining == 0) + return chunk; + chunk.resize(std::min(f_remaining, 65'536)); + int offset = 0; + while (true) { + auto r = read(fd, chunk.data() + offset, chunk.size() - offset); + if (r == -1) { + log::error(cat, "Error while reading from {}: {}", f, strerror(errno)); + str->close(1480); + chunk.clear(); + return chunk; + } + assert(offset + r <= chunk.size()); + if (offset + r >= chunk.size()) + break; + if (r == 0) { + log::error( + cat, + "Error reading from {}: hit EOF too soon (file truncated while " + "reading?)", + f); + str->close(1480); + chunk.clear(); + return chunk; + } + + offset += r; + } + f_remaining -= chunk.size(); + return chunk; + }; + + auto& [s, upload_i] = streams[str]; + upload_i = i; + + oxenc::bt_dict_producer info; + info.append("!", "PUT"); + info.append("s", size); + if (ttl > 0) + info.append("t", ttl); + str->send("{}:{}"_format(info.view().size(), info.view())); + str->send_chunks( + next_chunk, [](quic::Stream& s) { s.send_fin(); }, 100); + } + }); + + size_t r = remaining.load(); + while (r > 0) { + remaining.wait(r); + r = loop.call_get([&] { return remaining.load(); }); + } +} From 2a603abf5435d3afd7beb615030731ddf76bd460 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 10 Oct 2025 23:11:02 -0300 Subject: [PATCH 2/2] Implement file_info and file_extend --- quic/CMakeLists.txt | 8 +- quic/common.hpp | 43 ++++++++ 
quic/get.cpp | 36 ++----- quic/requests.cpp | 217 ++++++++++++++++++++++++++++++++------- quic/requests.hpp | 30 +++++- quic/test/test-query.cpp | 76 ++++++++++++++ 6 files changed, 338 insertions(+), 72 deletions(-) create mode 100644 quic/test/test-query.cpp diff --git a/quic/CMakeLists.txt b/quic/CMakeLists.txt index 43afd32..1f2bb4c 100644 --- a/quic/CMakeLists.txt +++ b/quic/CMakeLists.txt @@ -64,8 +64,8 @@ target_link_libraries(quic-files option(BUILD_TESTS "Build upload/download test programs" ON) if(BUILD_TESTS) - add_executable(quic-upload test/test-upload.cpp) - target_link_libraries(quic-upload PRIVATE quic-files-common) - add_executable(quic-download test/test-download.cpp) - target_link_libraries(quic-download PRIVATE quic-files-common) + foreach(x IN ITEMS upload download query) + add_executable(quic-${x} test/test-${x}.cpp) + target_link_libraries(quic-${x} PRIVATE quic-files-common) + endforeach() endif() diff --git a/quic/common.hpp b/quic/common.hpp index fb9a04b..6af37a4 100644 --- a/quic/common.hpp +++ b/quic/common.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -62,6 +63,48 @@ inline std::filesystem::path id_to_path(std::string_view fileid) { return p; } +struct file_db_info { + std::chrono::sys_seconds uploaded; + std::chrono::sys_seconds expiry; + std::filesystem::path path; +}; + +inline std::optional db_lookup(pqxx::connection& conn, std::string_view fileid) { + double upl, exp; + bool found = false; + try { + pg_retryable([&] { + pqxx::work tx{conn}; + auto row = tx.exec(R"( +SELECT EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry) +FROM files WHERE id = $1)", + pqxx::params{fileid}) + .opt_row(); + + if (row) { + found = true; + std::tie(upl, exp) = row->as(); + } + + tx.commit(); + }); + } catch (const pqxx::failure& e) { + log::error(log::Cat("files.db"), "Failed to query files table: {}", e.what()); + } + + std::optional result; + if (found) { + std::chrono::sys_seconds expiry{std::chrono::seconds{static_cast(exp)}}; + if (expiry >= std::chrono::system_clock::now()) { + auto& r = result.emplace(); + r.expiry = expiry; + r.uploaded = std::chrono::sys_seconds{std::chrono::seconds{static_cast(upl)}}; + r.path = id_to_path(fileid); + } + } + return result; +} + // Simple class that "explodes" (by calling a callback) if not "disarmed" before being // destructed. Used to queue cleanup during partial construction. 
class bomb {
diff --git a/quic/get.cpp b/quic/get.cpp
index 4de281e..ede1ace 100644
--- a/quic/get.cpp
+++ b/quic/get.cpp
@@ -24,37 +24,15 @@ void FileStream::parse_get(oxenc::bt_dict_consumer&& d) {
 FileStream::get_req::get_req(FileStream& str, std::string id) : file_req{str} {
     fileid = std::move(id);
-    double upl, exp;
-    bool found = false;
-    try {
-        pg_retryable([&] {
-            pqxx::work tx{str.handler.pg_conn};
-            auto row = tx.exec(R"(
-SELECT EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry)
-FROM files WHERE id = $1)",
-                               pqxx::params{fileid})
-                           .opt_row();
-
-            if (row) {
-                found = true;
-                std::tie(upl, exp) = row->as();
-            }
+    auto info = db_lookup(str.handler.pg_conn, fileid);
-            tx.commit();
-        });
-    } catch (const pqxx::failure& e) {
-        log::error(logcat, "Failed to query files table: {}", e.what());
+    if (info) {
+        expiry = info->expiry;
+        uploaded = info->uploaded;
+        filepath = info->path;
     }
-    if (found) {
-        expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast(exp)}};
-        uploaded = std::chrono::sys_seconds{std::chrono::seconds{static_cast(upl)}};
-
-        if (expiry < std::chrono::system_clock::now())
-            found = false;
-    }
-
-    if (!found) {
+    if (!info) {
         if (log::get_level(accesslog) >= log::Level::info) {
             if (auto conn = str.get_conn())
                 log::info(accesslog, "GET {} NOT FOUND ({})", fileid, conn->remote());
@@ -64,8 +42,6 @@ FROM files WHERE id = $1)",
         str.close(STREAM_ERROR::not_found);
         return;
     }
-
-    filepath = id_to_path(fileid);
}
void FileStream::get_req::close() {
diff --git a/quic/requests.cpp b/quic/requests.cpp
index 790ba8f..a3e11d1 100644
--- a/quic/requests.cpp
+++ b/quic/requests.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -20,18 +21,10 @@
 namespace sfs {
-bool back_compat_ids = false;
-int max_ttl = 14 * 24 * 60 * 60;
 using namespace oxen;
 using namespace log::literals;
 using namespace std::literals;
-void handle_file_info(quic::message m) {}
-void handle_file_extend(quic::message m) {}
-void handle_session_version(quic::message m) {}
-void handle_token_info(quic::message m) {}
 static auto logcat = log::Cat("files");
 static auto accesslog = log::Cat("access");
@@ -330,28 +323,50 @@ void ReqHandler::process_cqes() {
     io_uring_cqe* cqe;
     io_uring_for_each_cqe(&iou, head, cqe) {
         ++count;
-        auto fsid = io_uring_cqe_get_data64(cqe);
-        FileStream* sptr = nullptr;
-        if (auto it = streams.find(fsid); it != streams.end())
-            sptr = it->second;
-        if (!sptr) {
-            log::debug(logcat, "Ignoring CQE on dead stream (fsid={})", fsid);
-            continue;
-        }
-        auto& str = *sptr;
-        try {
-            std::visit(
-                    [cqe](R& req) {
-                        if constexpr (!std::same_as)
-                            req.handle_cqe(cqe);
-                    },
-                    str.request);
-        } catch (const std::exception& e) {
-            log::warning(
-                    logcat,
-                    "Exception during I/O response processing ({}); closing stream",
-                    e.what());
-            str.close(STREAM_ERROR::io_error);
+        auto id = io_uring_cqe_get_data64(cqe);
+        if (id < REQ_CQE_BASE_ID) {
+            FileStream* sptr = nullptr;
+            if (auto it = streams.find(id); it != streams.end())
+                sptr = it->second;
+            if (!sptr) {
+                log::debug(logcat, "Ignoring CQE on dead stream (fsid={})", id);
+                continue;
+            }
+            auto& str = *sptr;
+            try {
+                std::visit(
+                        [cqe](R& req) {
+                            if constexpr (!std::same_as)
+                                req.handle_cqe(cqe);
+                        },
+                        str.request);
+            } catch (const std::exception& e) {
+                log::warning(
+                        logcat,
+                        "Exception during I/O response processing ({}); closing stream",
+                        e.what());
+                str.close(STREAM_ERROR::io_error);
+            }
+        } else {
+            if (auto it = _req_cqe_handlers.find(id); it != _req_cqe_handlers.end()) {
+                auto h = std::move(it->second);
+                _req_cqe_handlers.erase(it);
+                try {
+                    h(cqe);
+                } catch (const std::exception& e) {
+                    log::warning(
+                            logcat,
+                            "Uncaught exception during request CQE handling: {}; request will not "
+                            "be replied to!",
+                            e.what());
+                }
+            } else {
+                log::error(
+                        logcat,
+                        "Received request cqe with unknown or already handled ID {}, this "
+                        "shouldn't happen!",
+                        id);
+            }
         }
     }
     io_uring_cq_advance(&iou, count);
@@ -372,15 +387,147 @@ std::shared_ptr ReqHandler::make_stream(
                 "file_info", [this](quic::message m) { handle_file_info(std::move(m)); });
         s->register_handler(
                 "file_extend", [this](quic::message m) { handle_file_extend(std::move(m)); });
-        s->register_handler("session_version", [this](quic::message m) {
-            handle_session_version(std::move(m));
-        });
-        s->register_handler(
-                "token_info", [this](quic::message m) { handle_token_info(std::move(m)); });
+        // s->register_handler("session_version", [this](quic::message m) {
+        //     handle_session_version(std::move(m));
+        // });
+        // s->register_handler(
+        //         "token_info", [this](quic::message m) { handle_token_info(std::move(m));
+        // });
         return std::move(s);
     }
     return e.loop.make_shared(conn, e, *this);
}
+static void send_error(const quic::message& m, std::string_view err, bool json) {
+    m.respond(json ? R"({{"error":"{}"}})"_format(err) : err, true);
+}
+
+void ReqHandler::handle_file_info(quic::message m) {
+    std::string id;
+    bool json = false;
+    try {
+        auto body = m.body();
+        if (body.starts_with("d")) {
+            oxenc::bt_dict_consumer r{body};
+            id = r.require("#");
+            r.finish();
+        } else {
+            nlohmann::json::parse(body).at("id").get_to(id);
+            json = true;
+        }
+    } catch (const std::exception& e) {
+        m.respond(req_error::BAD_REQUEST, true);
+        return;
+    }
+
+    auto info = db_lookup(pg_conn, id);
+    if (!info) {
+        send_error(m, req_error::NOT_FOUND, json);
+        return;
+    }
+
+    auto statxbuf = std::make_shared();
+    auto* sqe = io_uring_get_sqe(&iou);
+    auto rid = _next_req_cqeid++;
+    io_uring_sqe_set_data64(sqe, rid);
+    io_uring_sqe_set_flags(sqe, 0);
+    io_uring_prep_statx(sqe, files_dir_fd, info->path.c_str(), 0, STATX_SIZE, statxbuf.get());
+    io_uring_submit(&iou);
+
+    _req_cqe_handlers[rid] = [id = std::move(id),
+                              info = std::move(*info),
+                              statxbuf = std::move(statxbuf),
+                              m = std::move(m),
+                              json](io_uring_cqe* cqe) {
+        if (cqe->res < 0) {
+            log::error(
+                    logcat,
+                    "Failed to stat {}: {}; returning NOT_FOUND to caller",
+                    info.path,
+                    strerror(-cqe->res));
+            return send_error(m, req_error::NOT_FOUND, json);
+        } else if (json) {
+            m.respond(nlohmann::json{
+                    {"id", id},
+                    {"size", statxbuf->stx_size},
+                    {"uploaded", info.uploaded.time_since_epoch().count()},
+                    {"expires", info.expiry.time_since_epoch().count()}}
+                    .dump());
+        } else {
+            oxenc::bt_dict_producer res;
+            res.append("#", id);
+            res.append("s", statxbuf->stx_size);
+            res.append("u", info.uploaded.time_since_epoch().count());
+            res.append("x", info.expiry.time_since_epoch().count());
+            m.respond(std::move(res).str());
+        }
+    };
+}
+
+void ReqHandler::handle_file_extend(quic::message m) {
+    std::string id;
+    std::optional ttl;
+    bool json = false;
+    try {
+        auto body = m.body();
+        if (body.starts_with("d")) {
+            oxenc::bt_dict_consumer r{body};
+            id = r.require("#");
+            ttl = r.maybe("t");
+            r.finish();
+        } else {
+            auto r = nlohmann::json::parse(body);
+            r.at("id").get_to(id);
+            if (auto it = r.find("ttl"); it != r.end())
+                ttl = it->get<int64_t>();
+            json = true;
+        }
+    } catch (const std::exception& e) {
+        m.respond(req_error::BAD_REQUEST, true);
+        return;
+    }
+
+    std::string db_ttl =
+            "{} seconds"_format(std::clamp(ttl.value_or(max_ttl.count()), 1, max_ttl.count()));
+    std::chrono::sys_seconds uploaded, expiry;
+    try {
+        pg_retryable([&] {
+            pqxx::work tx{pg_conn};
+
+            auto result = tx.exec(R"(
+UPDATE files SET expiry = GREATEST(expiry, NOW() + $2)
+WHERE id = $1
+RETURNING EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry))",
+                                  pqxx::params{id, db_ttl})
+                                  .opt_row();
+            if (result) {
+                auto [upl, exp] = result->as();
+                uploaded =
+                        std::chrono::sys_seconds{std::chrono::seconds{static_cast(upl)}};
+                expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast(exp)}};
+            }
+            tx.commit();
+        });
+    } catch (const pqxx::failure& e) {
+        log::error(logcat, "Failed to update DB with updated expiry for {}: {}", id, e.what());
+        send_error(m, req_error::INTERNAL_ERROR, true);
+        return;
+    }
+
+    if (json) {
+        m.respond(nlohmann::json{
+                {"id", id},
+                {"uploaded", uploaded.time_since_epoch().count()},
+                {"expires", expiry.time_since_epoch().count()}}
+                .dump());
+    } else {
+        oxenc::bt_dict_producer res;
+        res.append("#", id);
+        res.append("u", uploaded.time_since_epoch().count());
+        res.append("x", expiry.time_since_epoch().count());
+        m.respond(std::move(res).str());
+    }
+}
+
 }  // namespace sfs
diff --git a/quic/requests.hpp b/quic/requests.hpp
index 90df488..ee4d60f 100644
--- a/quic/requests.hpp
+++ b/quic/requests.hpp
@@ -14,6 +14,10 @@
 #include
 #include
 
+namespace oxen::quic {
+struct message;
+}
+
 namespace sfs {
 
 namespace quic = oxen::quic;
@@ -35,6 +39,16 @@ enum class STREAM_ERROR : uint64_t {
     io_error = 480,
 };
 
+// BTRequestStream request error strings:
+namespace req_error {
+    // BAD_REQUEST means you sent something the server didn't understand:
+    static constexpr auto BAD_REQUEST = "BAD_REQUEST"sv;
+    // The server encountered some sort of internal error and cannot complete the request:
+    static constexpr auto INTERNAL_ERROR = "INTERNAL_ERROR"sv;
+    // The file to modify or query does not exist:
+    static constexpr auto NOT_FOUND = "NOT_FOUND"sv;
+}
+
 class FileStream;
 
 struct req_stats {
@@ -84,10 +98,20 @@ class ReqHandler {
     uint64_t _next_fsid = 1;
     std::unordered_map streams;
 
+    static constexpr uint64_t REQ_CQE_BASE_ID = 1ULL << 63;
+
+    uint64_t _next_req_cqeid = REQ_CQE_BASE_ID;
+    std::unordered_map> _req_cqe_handlers;
+
     void process_cqes();
 
     friend class FileStream;
 
+    void handle_file_info(quic::message m);
+    void handle_file_extend(quic::message m);
+    void handle_session_version(quic::message m);
+    void handle_token_info(quic::message m);
+
   public:
     ReqHandler(
@@ -210,7 +234,7 @@ class FileStream : public quic::Stream {
     class get_req : public file_req {
         // Per-stream readahead: if the unsent data on the stream drops below this value then we
        // queue additional reads until the unsent data reaches it again (or we hit EOF).
-        static constexpr int64_t READAHEAD = 512*1024;
+        static constexpr int64_t READAHEAD = 512 * 1024;
 
         // current io_uring state:
         // - IO_STATE::none means there is no pending io_uring request
@@ -224,10 +248,10 @@ class FileStream : public quic::Stream {
             statx = -1,
             opening = 0,
             closing_done = -2,
-            reading = 1, // including all higher underlying values
+            reading = 1,  // including all higher underlying values
         };
 
-        struct statx statxbuf{};
+        struct statx statxbuf {};
 
         IO_STATE io_state = IO_STATE::none;
 
diff --git a/quic/test/test-query.cpp b/quic/test/test-query.cpp
new file mode 100644
index 0000000..322e1b0
--- /dev/null
+++ b/quic/test/test-query.cpp
@@ -0,0 +1,76 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::literals;
+using namespace oxen;
+using namespace log::literals;
+
+static auto cat = log::Cat("query");
+
+int main(int argc, char* argv[]) {
+
+    log::add_sink(log::Type::Print, "stderr");
+    log::reset_level(log::Level::warn);
+    log::set_level(cat, log::Level::info);
+
+    auto usage = [&argv](std::string_view msg) {
+        if (!msg.empty())
+            log::error(cat, "{}", msg);
+        log::info(
+                cat,
+                "\n\nUsage: {} IP:PORT PUBKEY ENDPOINT BODY\n\nSends a body to the fileserver "
+                "and shows the response.\n\n",
+                argv[0]);
+        return 1;
+    };
+
+    if (argc < 5)
+        return usage("Insufficient arguments");
+
+    quic::Address addr;
+    try {
+        addr = quic::Address::parse(argv[1]);
+    } catch (const std::exception& e) {
+        return usage("{} doesn't look like an IP:PORT address ({})"_format(argv[1], e.what()));
+    }
+
+    std::string_view pubkey{argv[2]};
+    if (pubkey.size() != 64 || !oxenc::is_hex(pubkey))
+        return usage("{} is not a valid pubkey"_format(pubkey));
+
+    quic::Loop loop;
+    auto ep = quic::Endpoint::endpoint(loop, quic::Address{});
+
+    auto conn = ep->connect(
+            quic::RemoteAddress{oxenc::from_hex(pubkey), addr},
+            quic::opt::outbound_alpn("quic-files"));
+
+    auto btr = conn->open_stream();
+
+    std::promise done_prom;
+    auto done = done_prom.get_future();
+    btr->command(
+            std::string{argv[3]}, std::string_view{argv[4]}, [&done_prom](quic::message m) {
+                if (m.timed_out)
+                    log::error(cat, "Request timed out!");
+                else if (m.is_error())
+                    log::error(cat, "Request returned an error: {}", m.body());
+                else {
+                    log::info(cat, "Request returned a body:\n");
+                    fmt::print("{}", m.body());
+                }
+                done_prom.set_value();
+            });
+
+    done.wait();
+}
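For reference, a minimal sketch (not part of the patch) of driving the two implemented endpoints
programmatically, using the same BTRequestStream command API as test-query.cpp above; the file id
and ttl values here are invented placeholders:

    // bt-encoded body (bt-dict keys in sorted order: "#", then "t"):
    oxenc::bt_dict_producer d;
    d.append("#", "some-file-id");  // hypothetical id
    d.append("t", 86400);           // optional ttl, in seconds; clamped server-side to max_ttl
    btr->command("file_extend", std::move(d).str(), [](quic::message m) {
        // On success the reply is a bt dict with "#" (id), "u" (uploaded), "x" (expiry).
    });

    // Equivalent JSON body; replies to JSON requests come back as JSON
    // ({"id": ..., "uploaded": ..., "expires": ...}):
    btr->command(
            "file_extend",
            nlohmann::json{{"id", "some-file-id"}, {"ttl", 86400}}.dump(),
            [](quic::message m) { /* ... */ });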