diff --git a/.gitignore b/.gitignore index de1895d..a2464a6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ __pycache__ /key_x25519 .vscode .venv +/quic/build* +/quic/compile_commands.json +/quic/.cache diff --git a/.gitmodules b/.gitmodules index e69de29..28810ca 100644 --- a/.gitmodules +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "quic/oxen-libquic"] + path = quic/oxen-libquic + url = https://github.com/oxen-io/oxen-libquic.git +[submodule "quic/libpqxx"] + path = quic/libpqxx + url = https://github.com/jtv/libpqxx.git +[submodule "quic/CLI11"] + path = quic/CLI11 + url = https://github.com/CLIUtils/CLI11.git diff --git a/quic/CLI11 b/quic/CLI11 new file mode 160000 index 0000000..5248d5b --- /dev/null +++ b/quic/CLI11 @@ -0,0 +1 @@ +Subproject commit 5248d5b35271a76abf9b27639fc3ae32458e9dff diff --git a/quic/CMakeLists.txt b/quic/CMakeLists.txt new file mode 100644 index 0000000..1f2bb4c --- /dev/null +++ b/quic/CMakeLists.txt @@ -0,0 +1,71 @@ +cmake_minimum_required(VERSION 3.20...3.31) + +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +find_program(CCACHE_PROGRAM ccache) +if(CCACHE_PROGRAM) + foreach(lang C CXX) + if(NOT DEFINED CMAKE_${lang}_COMPILER_LAUNCHER AND NOT CMAKE_${lang}_COMPILER MATCHES ".*/ccache") + message(STATUS "Enabling ccache for ${lang}") + set(CMAKE_${lang}_COMPILER_LAUNCHER ${CCACHE_PROGRAM} CACHE STRING "") + endif() + endforeach() +endif() + +project(quic-files + VERSION 0.0.1 + LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED TRUE) +set(CMAKE_CXX_EXTENSIONS FALSE) + +include(cmake/submodule_check.cmake) +include(cmake/system_or_submodule.cmake) + +find_package(PkgConfig REQUIRED) + +system_or_submodule(OXENC oxenc oxenc::oxenc liboxenc>=1.4.0 oxen-libquic/external/oxen-encoding) +system_or_submodule(OXENLOGGING oxen-logging oxen::logging liboxen-logging>=1.2.0 oxen-libquic/external/oxen-logging) +system_or_submodule(FMT fmt fmt::fmt fmt>=9 oxen-libquic/external/oxen-logging/fmt) +system_or_submodule(LIBPQXX libpqxx libpqxx::pqxx libpqxx>=7.10.0 libpqxx) +system_or_submodule(CLI11 CLI11 CLI11::CLI11 CLI11>=2.3.0 CLI11) +system_or_submodule(OXENQUIC quic oxen::quic liboxenquic>=1.6 oxen-libquic) + +pkg_check_modules(NLOHMANN_JSON IMPORTED_TARGET REQUIRED nlohmann_json>=3.7.0) +pkg_check_modules(SODIUM IMPORTED_TARGET REQUIRED libsodium>=1.0.17) +pkg_check_modules(LIBURING IMPORTED_TARGET REQUIRED liburing>=2.3) + +add_executable(quic-files + quic-files.cpp + requests.cpp + put.cpp + get.cpp + cleanup.cpp +) + +add_library(quic-files-common INTERFACE) +target_link_libraries(quic-files-common INTERFACE + oxenc::oxenc + oxen::logging + oxen::quic + fmt::fmt +) + +target_link_libraries(quic-files + PRIVATE + quic-files-common + PkgConfig::NLOHMANN_JSON + PkgConfig::SODIUM + libpqxx::pqxx + CLI11::CLI11 + PkgConfig::LIBURING +) + +option(BUILD_TESTS "Build upload/download test programs" ON) +if(BUILD_TESTS) + foreach(x IN ITEMS upload download query) + add_executable(quic-${x} test/test-${x}.cpp) + target_link_libraries(quic-${x} PRIVATE quic-files-common) + endforeach() +endif() diff --git a/quic/cleanup.cpp b/quic/cleanup.cpp new file mode 100644 index 0000000..a0e9024 --- /dev/null +++ b/quic/cleanup.cpp @@ -0,0 +1,139 @@ +#include "cleanup.hpp" + +#include + +#include +#include +#include + +#include "common.hpp" // IWYU pragma: keep + +namespace sfs { + +auto logcat = log::Cat("files.cleanup"); + +namespace { + class Cleaner { + io_uring iou; + int files_dir_fd; + pqxx::connection conn; + std::future stop; + + public: + 
Cleaner(const std::filesystem::path& files_dir, + std::string pgsql_uri, + std::future stop) : + conn{pgsql_uri}, stop{std::move(stop)} { + std::filesystem::create_directories(files_dir); + files_dir_fd = open(files_dir.c_str(), O_PATH | O_DIRECTORY); + if (files_dir_fd < 0) + throw std::runtime_error{ + "Unable to open files path {}: {}"_format(files_dir, strerror(errno))}; + + if (int err = io_uring_queue_init(128, &iou, IORING_SETUP_SINGLE_ISSUER); err != 0) { + close(files_dir_fd); + throw std::runtime_error{ + "Failed to initialize io_uring queue: {}"_format(strerror(-err))}; + } + } + + void cleanup() { + pg_retryable([this] { + pqxx::work tx{conn}; + + auto started = std::chrono::steady_clock::now(); + std::vector removed; + unsigned submitted = 0; + for (auto r : tx.exec("DELETE FROM files WHERE expiry <= NOW() RETURNING id")) { + removed.push_back(id_to_path(r[0].as())); + auto& filepath = removed.back(); + + auto* sqe = io_uring_get_sqe(&iou); + if (!sqe) { + io_uring_submit(&iou); + sqe = io_uring_get_sqe(&iou); + } + + io_uring_sqe_set_flags(sqe, 0); + io_uring_sqe_set_data64(sqe, removed.size() - 1); + io_uring_prep_unlinkat(sqe, files_dir_fd, filepath.c_str(), 0); + submitted++; + } + io_uring_submit(&iou); + + bool success = true; + + while (submitted > 0) { + unsigned nr = std::min(iou.cq.ring_entries, submitted); + struct io_uring_cqe* cqe; + io_uring_wait_cqe_nr(&iou, &cqe, nr); + + unsigned head; + io_uring_for_each_cqe(&iou, head, cqe) { + const auto& id = removed[io_uring_cqe_get_data64(cqe)]; + if (cqe->res < 0) { + if (cqe->res == -ENOENT) + log::debug(logcat, "Failed to remove {}: file already gone", id); + else { + log::warning( + logcat, "Failed to remove {}: {}", id, strerror(-cqe->res)); + success = false; + } + } else { + log::debug(logcat, "Removed expired file {}", id); + } + } + + io_uring_cq_advance(&iou, nr); + + submitted -= nr; + } + + if (success) { + log::info( + logcat, + "Deleted {} expired files in {}", + removed.size(), + std::chrono::steady_clock::now() - started); + tx.commit(); + } else + tx.abort(); + }); + } + + bool wait() { return stop.wait_for(30s) == std::future_status::timeout; } + }; +} // namespace + +std::thread start_cleanup_thread( + const std::string& pgsql_uri, + const std::filesystem::path& files_dir, + std::future stop) { + + std::promise started_prom; + auto started = started_prom.get_future(); + std::thread th{[&started_prom, &files_dir, &pgsql_uri, &stop] { + std::optional cleaner; + try { + cleaner.emplace(files_dir, std::move(pgsql_uri), std::move(stop)); + } catch (...) 
{
+                started_prom.set_exception(std::current_exception());
+                return;
+            }
+
+            started_prom.set_value();
+
+            do {
+                try {
+                    cleaner->cleanup();
+                } catch (const std::exception& e) {
+                    log::error(logcat, "Cleanup failed: {}", e.what());
+                }
+            } while (cleaner->wait());
+        }};
+
+    try {
+        started.get();
+    } catch (...) {
+        // Construction failed: join the thread before rethrowing so we don't terminate via an
+        // unjoined std::thread destructor.
+        th.join();
+        throw;
+    }
+
+    return th;
+}
+
+} // namespace sfs

diff --git a/quic/cleanup.hpp b/quic/cleanup.hpp
new file mode 100644
index 0000000..91831db
--- /dev/null
+++ b/quic/cleanup.hpp
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <filesystem>
+#include <future>
+#include <string>
+#include <thread>
+
+namespace sfs {
+
+std::thread start_cleanup_thread(
+        const std::string& pgsql_uri,
+        const std::filesystem::path& files_dir,
+        std::future<void> stop);
+
+}

diff --git a/quic/cmake/submodule_check.cmake b/quic/cmake/submodule_check.cmake
new file mode 100644
index 0000000..422f892
--- /dev/null
+++ b/quic/cmake/submodule_check.cmake
@@ -0,0 +1,31 @@
+option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON)
+if(SUBMODULE_CHECK)
+  find_package(Git)
+  if(GIT_FOUND)
+    function(check_submodule relative_path)
+      execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE localHead)
+      execute_process(COMMAND git rev-parse "HEAD:quic/${relative_path}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE checkedHead)
+      string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate)
+      if (upToDate)
+        message(STATUS "Submodule 'quic/${relative_path}' is up-to-date")
+      else()
+        message(FATAL_ERROR "Submodule 'quic/${relative_path}' is not up-to-date. Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF")
+      endif()
+
+      # Extra arguments check nested submodules
+      foreach(submod ${ARGN})
+        execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path}/${submod} OUTPUT_VARIABLE localHead)
+        execute_process(COMMAND git rev-parse "HEAD:${submod}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE checkedHead)
+        string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate)
+        if (NOT upToDate)
+          message(FATAL_ERROR "Nested submodule '${relative_path}/${submod}' is not up-to-date.
Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF") + endif() + endforeach() + endfunction () + + message(STATUS "Checking submodules") + check_submodule(libpqxx) + check_submodule(oxen-libquic external/oxen-encoding external/oxen-logging) + endif() +endif() + diff --git a/quic/cmake/system_or_submodule.cmake b/quic/cmake/system_or_submodule.cmake new file mode 100644 index 0000000..d87b889 --- /dev/null +++ b/quic/cmake/system_or_submodule.cmake @@ -0,0 +1,23 @@ +macro(system_or_submodule BIGNAME smallname target pkgconf subdir) + if(NOT TARGET ${target}) + option(FORCE_${BIGNAME}_SUBMODULE "force using ${smallname} submodule" OFF) + if(NOT BUILD_STATIC_DEPS AND NOT FORCE_${BIGNAME}_SUBMODULE AND NOT FORCE_ALL_SUBMODULES) + pkg_check_modules(${BIGNAME} ${pkgconf} IMPORTED_TARGET) + endif() + if(${BIGNAME}_FOUND) + add_library(${smallname} INTERFACE) + if(NOT TARGET PkgConfig::${BIGNAME} AND CMAKE_VERSION VERSION_LESS "3.21") + # Work around cmake bug 22180 (PkgConfig::THING not set if no flags needed) + else() + target_link_libraries(${smallname} INTERFACE PkgConfig::${BIGNAME}) + endif() + message(STATUS "Found system ${smallname} ${${BIGNAME}_VERSION}") + else() + message(STATUS "using ${smallname} submodule ${subdir}") + add_subdirectory(${subdir} EXCLUDE_FROM_ALL) + endif() + if(NOT TARGET ${target}) + add_library(${target} ALIAS ${smallname}) + endif() + endif() +endmacro() diff --git a/quic/common.hpp b/quic/common.hpp new file mode 100644 index 0000000..6af37a4 --- /dev/null +++ b/quic/common.hpp @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace sfs { + +// NOLINTBEGIN(misc-unused-alias-decls) + +namespace log = oxen::log; + +using namespace std::literals; +using namespace log::literals; + +static constexpr auto b64_url_chars = + "-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz"sv; + +// Wraps a call `c()` in a transaction and executes it. If we get a broken connection exception +// then the call is retried up to n_retries times before giving up and propagating the exception. +template +static void pg_retryable(Call c, const int n_retries = 10) { + int retries = n_retries; + while (retries > 0) { + try { + c(); + break; + } catch (const pqxx::broken_connection& e) { + if (retries--) + log::warning(log::Cat("files.db"), "Lost postgresql connection; retrying query..."); + else { + log::error( + log::Cat("files.db"), + "Postgresql connection still failing after {} retries, giving up", + n_retries); + throw; + } + } + } +} + +inline std::filesystem::path id_to_path(std::string_view fileid) { + std::filesystem::path p; + if (fileid.size() == 44) + p = std::filesystem::path{fileid.substr(0, 2)}; + else + // back compat numeric identifier, goes into 000, 001, ..., 999 based on % 1000 id value + p = std::filesystem::path{ + "{:0>3s}"_format(fileid.substr(fileid.size() < 3 ? 
0 : fileid.size() - 3))};
+
+    p /= std::filesystem::path{fileid};
+    return p;
+}
+
+struct file_db_info {
+    std::chrono::sys_seconds uploaded;
+    std::chrono::sys_seconds expiry;
+    std::filesystem::path path;
+};
+
+inline std::optional<file_db_info> db_lookup(pqxx::connection& conn, std::string_view fileid) {
+    double upl, exp;
+    bool found = false;
+    try {
+        pg_retryable([&] {
+            pqxx::work tx{conn};
+            auto row = tx.exec(R"(
+SELECT EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry)
+FROM files WHERE id = $1)",
+                               pqxx::params{fileid})
+                           .opt_row();
+
+            if (row) {
+                found = true;
+                std::tie(upl, exp) = row->as<double, double>();
+            }
+
+            tx.commit();
+        });
+    } catch (const pqxx::failure& e) {
+        log::error(log::Cat("files.db"), "Failed to query files table: {}", e.what());
+    }
+
+    std::optional<file_db_info> result;
+    if (found) {
+        std::chrono::sys_seconds expiry{std::chrono::seconds{static_cast<int64_t>(exp)}};
+        if (expiry >= std::chrono::system_clock::now()) {
+            auto& r = result.emplace();
+            r.expiry = expiry;
+            r.uploaded = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(upl)}};
+            r.path = id_to_path(fileid);
+        }
+    }
+    return result;
+}
+
+// Simple class that "explodes" (by calling a callback) if not "disarmed" before being
+// destructed. Used to queue cleanup during partial construction.
+class bomb {
+  public:
+    std::function<void()> explode;
+
+    void disarm() { explode = nullptr; }
+
+    ~bomb() {
+        if (explode)
+            explode();
+    }
+};
+
+inline std::string friendly_duration(std::chrono::nanoseconds dur) {
+    std::string friendly;
+    auto append = std::back_inserter(friendly);
+    bool some = false;
+    if (dur >= 24h) {
+        fmt::format_to(append, "{}d", dur / 24h);
+        dur %= 24h;
+        some = true;
+    }
+    if (dur >= 1h || some) {
+        fmt::format_to(append, "{}h", dur / 1h);
+        dur %= 1h;
+        some = true;
+    }
+    if (dur >= 1min || some) {
+        fmt::format_to(append, "{}m", dur / 1min);
+        dur %= 1min;
+        some = true;
+    }
+    if (some || dur % 1s == 0ns) {
+        // If we have >= minutes or it's an integer number of seconds then don't bother with
+        // fractional seconds
+        fmt::format_to(append, "{}s", dur / 1s);
+    } else {
+        double seconds = std::chrono::duration<double>(dur).count();
+        if (dur >= 1s)
+            fmt::format_to(append, "{:.3f}s", seconds);
+        else if (dur >= 1ms)
+            fmt::format_to(append, "{:.3f}ms", seconds * 1000);
+        else if (dur >= 1us)
+            fmt::format_to(append, "{:.3f}µs", seconds * 1'000'000);
+        else
+            fmt::format_to(append, "{:.0f}ns", seconds * 1'000'000'000);
+    }
+    return friendly;
+}
+
+// NOLINTEND(misc-unused-alias-decls)
+
+} // namespace sfs

diff --git a/quic/get.cpp b/quic/get.cpp
new file mode 100644
index 0000000..ede1ace
--- /dev/null
+++ b/quic/get.cpp
@@ -0,0 +1,221 @@
+#include
+#include
+
+#include "common.hpp" // IWYU pragma: keep
+#include "requests.hpp"
+
+namespace sfs {
+
+static auto accesslog = log::Cat("access");
+
+static auto logcat = log::Cat("files.get");
+
+void FileStream::parse_get(oxenc::bt_dict_consumer&& d) {
+    auto id = d.require<std::string>("#");
+    if (!((id.size() == 44 && id.find_first_not_of(b64_url_chars) == std::string::npos) ||
+          (id.size() >= 1 && id.size() <= 16 &&
+           id.find_first_not_of("0123456789") == std::string::npos)))
+        throw std::runtime_error{"invalid id: expected 44 b64url or 1-16 decimal digits"};
+    d.finish();
+
+    request.emplace<get_req>(*this, std::move(id));
+}
+
+FileStream::get_req::get_req(FileStream& str, std::string id) : file_req{str} {
+    fileid = std::move(id);
+
+    // Note: look up via `fileid`; `id` was just moved from and would be empty here.
+    auto info = db_lookup(str.handler.pg_conn, fileid);
+
+    if (info) {
+        expiry = info->expiry;
+        uploaded = info->uploaded;
+        filepath = info->path;
+    }
+
+    if (!info) {
+        if (log::get_level(accesslog) >= log::Level::info) {
+            if (auto conn = str.get_conn())
+                log::info(accesslog, "GET {} NOT FOUND ({})", fileid, conn->remote());
+            else
+                log::info(accesslog, "GET {} NOT FOUND ()", fileid);
+        }
+        str.close(STREAM_ERROR::not_found);
+        return;
+    }
+}
+
+void FileStream::get_req::close() {
+    if (io_state != IO_STATE::none) {
+        auto* sqe = io_uring_get_sqe(&str.handler.iou);
+        io_uring_sqe_set_data64(sqe, 0);
+        io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
+        io_uring_prep_cancel64(sqe, str.fsid, 0);
+        io_uring_submit(&str.handler.iou);
+    }
+
+    if (fd >= 0) {
+        auto* sqe = io_uring_get_sqe(&str.handler.iou);
+        io_uring_sqe_set_data64(sqe, 0);
+        io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
+        io_uring_prep_close_direct(sqe, fd);
+        io_uring_submit(&str.handler.iou);
+    }
+
+    chunks.clear();
+    io_state = IO_STATE::none;
+    fd = -1;
+}
+
+void FileStream::get_req::handle_cqe(io_uring_cqe* cqe) {
+    assert(io_state != IO_STATE::none);
+
+    if (str.is_closing()) {
+        log::debug(logcat, "Ignoring CQE on closing stream");
+        chunks.clear();
+        return;
+    }
+
+    if (io_state == IO_STATE::statx) {
+        if (cqe->res < 0) {
+            log::error(
+                    logcat,
+                    "Failed to stat {}: {}; closing stream with I/O error code",
+                    filepath,
+                    strerror(-cqe->res));
+            str.close(STREAM_ERROR::io_error);
+            return;
+        }
+        size = statxbuf.stx_size;
+
+        io_state = IO_STATE::opening; // The open was chained immediately after the statx, so wait
+                                      // for it next.
+
+    } else if (io_state == IO_STATE::opening) {
+        if (cqe->res < 0) {
+            log::error(
+                    logcat,
+                    "Failed to open {}: {}; closing stream with I/O error code",
+                    filepath,
+                    strerror(-cqe->res));
+            str.close(STREAM_ERROR::io_error);
+            return;
+        }
+        fd = cqe->res;
+
+        std::string buf{"XX:"}; // dummy value; we'll edit it to the correct size below
+        buf.resize(63);
+
+        oxenc::bt_dict_producer meta{buf.data() + 3, buf.data() + buf.size()};
+        meta.append("s", size);
+        meta.append("u", uploaded.time_since_epoch().count());
+        meta.append("x", expiry.time_since_epoch().count());
+        auto final_size = "{}:"_format(meta.view().size());
+        assert(final_size.size() == 3);
+        std::memcpy(buf.data(), final_size.data(), final_size.size());
+        buf.resize(3 + meta.view().size());
+        str.send(std::move(buf));
+
+        io_state = IO_STATE::none;
+
+    } else if (io_state == IO_STATE::reading) { // A read has finished and delivered the data to us
+
+        if (chunks.size() == 1) // If 1 then this was the last outstanding read
+            io_state = IO_STATE::none;
+        // Otherwise there are more reads outstanding so leave the state as is
+        auto buf = std::move(chunks.front());
+        chunks.pop_front();
+
+        if (cqe->res < 0) {
+            log::error(
+                    logcat,
+                    "Failed to read {}: {}; closing stream with I/O error code",
+                    filepath,
+                    strerror(-cqe->res));
+            str.close(STREAM_ERROR::io_error);
+            close();
+            return;
+        }
+        if (cqe->res > 0) {
+            if (cqe->res < static_cast<int64_t>(buf.size()))
+                buf.resize(cqe->res);
+            bytes_read += buf.size();
+            str.send(std::move(buf));
+            if (bytes_read >= size) {
+                str.send_fin();
+                close();
+                str.handler.overall.get(size);
+            }
+        } else {
+            eof = true;
+            close();
+            if (bytes_read < size) {
+                log::error(
+                        logcat,
+                        "Hit EOF reading {} too early (read {}, expected {})",
+                        filepath,
+                        bytes_read,
+                        size);
+                str.close(STREAM_ERROR::io_error);
+                return;
+            }
+            str.send_fin();
+        }
+
+    } else {
+        assert(!"Unknown state!");
+    }
+
+    queue_reads();
+}
+
+void FileStream::get_req::finalize() {
+    io_state = IO_STATE::statx;
+
+    auto* sqe = io_uring_get_sqe(&str.handler.iou);
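+    // (The statx SQE below is linked (IOSQE_IO_LINK) to the openat_direct that follows, so the
+    // kernel runs them in order and handle_cqe() always sees the statx completion first.)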
+    io_uring_sqe_set_data64(sqe, str.fsid);
+    io_uring_sqe_set_flags(sqe, IOSQE_IO_LINK);
+    io_uring_prep_statx(sqe, str.handler.files_dir_fd, filepath.c_str(), 0, STATX_SIZE, &statxbuf);
+
+    sqe = io_uring_get_sqe(&str.handler.iou);
+    io_uring_sqe_set_data64(sqe, str.fsid);
+    io_uring_sqe_set_flags(sqe, 0);
+    io_uring_prep_openat_direct(
+            sqe,
+            str.handler.files_dir_fd,
+            filepath.c_str(),
+            O_RDONLY,
+            0644,
+            IORING_FILE_INDEX_ALLOC);
+
+    io_uring_submit(&str.handler.iou);
+}
+
+void FileStream::get_req::queue_reads() {
+    if (io_state != IO_STATE::none || eof || bytes_read >= size)
+        return;
+    assert(chunks.empty());
+    int64_t unsent = str.unsent();
+    if (unsent >= READAHEAD)
+        return;
+
+    // Submit a chain of smaller sequential reads; we do this rather than one big read so that we
+    // can hand earlier data off to our quic stream without having to wait for all data to be
+    // available.
+    int chunks_to_read =
+            (std::min(READAHEAD - unsent, size - bytes_read) + CHUNK_SIZE - 1) /
+            CHUNK_SIZE;
+    chunks.resize(chunks_to_read);
+    for (int i = 0; i < chunks_to_read; i++) {
+        auto& c = chunks[i];
+        c.resize(CHUNK_SIZE);
+        auto* sqe = io_uring_get_sqe(&str.handler.iou);
+        io_uring_sqe_set_data64(sqe, str.fsid);
+        io_uring_sqe_set_flags(
+                sqe, IOSQE_FIXED_FILE | (i == chunks_to_read - 1 ? 0 : IOSQE_IO_LINK));
+        io_uring_prep_read(sqe, fd, c.data(), CHUNK_SIZE, -1);
+    }
+    io_uring_submit(&str.handler.iou);
+    io_state = IO_STATE::reading;
+}
+
+} // namespace sfs

diff --git a/quic/libpqxx b/quic/libpqxx
new file mode 160000
index 0000000..8e6fab3
--- /dev/null
+++ b/quic/libpqxx
@@ -0,0 +1 @@
+Subproject commit 8e6fab35069eee92ce6729f8a4f775df31342b75

diff --git a/quic/oxen-libquic b/quic/oxen-libquic
new file mode 160000
index 0000000..afeaadd
--- /dev/null
+++ b/quic/oxen-libquic
@@ -0,0 +1 @@
+Subproject commit afeaadd8c1ace940bd03a7d7c4d37c9e5db5dc6c

diff --git a/quic/put.cpp b/quic/put.cpp
new file mode 100644
index 0000000..e4c6bfe
--- /dev/null
+++ b/quic/put.cpp
@@ -0,0 +1,462 @@
+#include
+
+#include "common.hpp" // IWYU pragma: keep
+#include "requests.hpp"
+
+namespace sfs {
+
+static auto logcat = log::Cat("files.put");
+static auto accesslog = log::Cat("access");
+
+constexpr auto FILE_ID_HASH_KEY = "SessionFileSvr\0\0"sv;
+
+FileStream::put_req::put_req(FileStream& s, size_t size_, std::optional ttl_) :
+        file_req{s}, ttl{std::move(ttl_)} {
+
+    size = size_;
+
+    crypto_generichash_blake2b_init(
+            &b2b,
+            reinterpret_cast<const unsigned char*>(FILE_ID_HASH_KEY.data()),
+            FILE_ID_HASH_KEY.size(),
+            33);
+}
+
+void FileStream::put_req::append(std::span data) {
+
+    if (got_all) {
+        log::critical(logcat, "Internal error: append called *after* finalize()!");
+        str.close(STREAM_ERROR::too_much_data);
+        return;
+    }
+
+    if (received + static_cast<int64_t>(data.size()) > size) {
+        auto conn = str.get_conn();
+        log::warning(
+                logcat,
+                "PUT request from {} exceeded declared size (declared {}, received {}); closing "
+                "stream with error",
+                conn ?
conn->remote().to_string() : "", + size, + received + data.size()); + str.close(STREAM_ERROR::too_much_data); + return; + } + + if (fd == -1 && io_state == IO_STATE::none) { + auto now = std::chrono::system_clock::now().time_since_epoch().count(); + uint64_t random; + randombytes_buf(&random, sizeof(random)); + tmp_upload = std::filesystem::path{"upload-{}-{:016x}-{}B"_format(now, random, size)}; + + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, str.fsid); + io_state = IO_STATE::opening; + io_uring_prep_openat_direct( + sqe, + str.handler.upload_dir_fd, + tmp_upload.c_str(), + O_CREAT | O_EXCL | O_WRONLY, + 0644, + IORING_FILE_INDEX_ALLOC); + io_uring_submit(&str.handler.iou); + } + + if (!data.empty()) + crypto_generichash_blake2b_update( + &b2b, reinterpret_cast(data.data()), data.size()); + + while (!data.empty()) { + if (chunks.empty()) + chunks.emplace_back().reserve(CHUNK_SIZE); + + auto& c = chunks.back(); + auto s = c.size(); + if (s + data.size() <= CHUNK_SIZE) { + c.resize(s + data.size()); + std::memcpy(c.data() + s, data.data(), data.size()); + received += data.size(); + data = {}; + } else if (s < CHUNK_SIZE) { + c.resize(CHUNK_SIZE); + std::memcpy(c.data() + s, data.data(), CHUNK_SIZE - s); + received += CHUNK_SIZE - s; + data = data.subspan(CHUNK_SIZE - s); + } else { + chunks.emplace_back(); + } + } + + send_chunks(); +} + +void FileStream::put_req::send_chunks(std::optional _retry_offset) { + if (io_state != IO_STATE::none) { + // We're already waiting on an open or write, so nothing else to do just yet; we'll queue + // all the writes when it finishes. + assert(!_retry_offset); // An offset should only be given in response to a complete write + return; + } + + // If we get here then the file creation should have happened already: + assert(fd >= 0); + + if (chunks.empty()) { + if (got_all) + // No more chunks and we received the FIN, so time to rename it into place + initiate_rename(); + return; + } + + bool send_last = chunks.back().size() >= CHUNK_SIZE || got_all; + if (chunks.size() == 1 && !send_last && !_retry_offset) + // Nothing to write right now + return; + + int num_chunks = chunks.size() - !send_last; + std::vector iovecs; + auto it = chunks.begin(); + iovecs.resize(num_chunks); + io_write_size = 0; + for (auto& v : iovecs) { + v.iov_base = it->data(); + v.iov_len = it->size(); + io_write_size += v.iov_len; + ++it; + } + if (_retry_offset) { + assert(_retry_offset < chunks[0].size()); + iovecs[0].iov_base = chunks[0].data() + *_retry_offset; + iovecs[0].iov_len -= *_retry_offset; + io_write_size -= *_retry_offset; + io_write_first_offset = *_retry_offset; + } else { + io_write_first_offset = 0; + } + + auto* sqe = io_uring_get_sqe(&str.handler.iou); + assert(sqe); + io_uring_sqe_set_data64(sqe, str.fsid); + io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); + if (iovecs.size() == 1) + io_uring_prep_write(sqe, fd, iovecs[0].iov_base, iovecs[0].iov_len, -1); + else + io_uring_prep_writev(sqe, fd, iovecs.data(), iovecs.size(), -1); + io_uring_submit(&str.handler.iou); + io_state = static_cast(num_chunks); +} + +void FileStream::put_req::handle_cqe(io_uring_cqe* cqe) { + assert(io_state != IO_STATE::none); + + auto state = io_state; + if (state != IO_STATE::closing_done) // closing_done is a "final" state we don't want to reset + io_state = IO_STATE::none; + + if (str.is_closing()) { + log::debug(logcat, "Ignoring CQE on closing stream"); + chunks.clear(); + return; + } + + if (state == IO_STATE::opening) { + if (cqe->res < 0) { + 
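+            // (An open failure here is almost certainly environmental (permissions, a missing
+            // uploads directory, fd exhaustion): the tempfile name embeds 64 random bits, so an
+            // O_CREAT|O_EXCL collision is vanishingly unlikely.)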
log::error( + logcat, + "Failed to open temp file: {}; closing stream with I/O error code", + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + chunks.clear(); + return; + } + fd = cqe->res; + // Immediately queue any completed chunks we might have accumulated: + send_chunks(); + } else if (state >= IO_STATE::writing) { // writev response for N=`state` buffers + auto n_bufs = static_cast(state); + assert(n_bufs <= chunks.size()); + + auto written = cqe->res; + if (written < 0) { + log::error( + logcat, + "Failed to write upload data: {}; closing stream with I/O error code", + strerror(-written)); + str.close(STREAM_ERROR::io_error); + chunks.clear(); + return; + } + + // writev can return less than requested, in which case we need to retry from wherever the + // write left off. If it's something like being out of disk space, the subsequent write + // will fail with an i/o error. + if (written != io_write_size) { + if (written > io_write_size) + log::critical( + logcat, + "Internal error: write request returned *more* ({}) than we asked to write " + "({})", + written, + io_write_size); + else + log::debug( + logcat, + "Wrote less ({}) than expected ({}); requeuing write", + written, + io_write_size); + + written += io_write_first_offset; + + // For a partial write, head-drop any chunks that have been completely written: + while (written >= static_cast(chunks.front().size())) { + written -= chunks.front().size(); + chunks.pop_front(); + } + + // written is now the offset into the first chunk where we want to retry; calling + // send_chunks with a >= 0 offset will force it to always send at least the first chunk + // (starting at the required offset), plus anything else that might have queued up in + // the meantime. + send_chunks(written); + return; + } + + log::debug(logcat, "Successfully wrote {}B to tempfile #{}", written, fd); + chunks.erase(chunks.begin(), chunks.begin() + n_bufs); + send_chunks(); + } else if (state == IO_STATE::renaming) { + if (cqe->res == -EEXIST || cqe->res == 0) { + if (cqe->res == -EEXIST) + log::debug(logcat, "Rename failed: upload file already exists! Deleting tempfile"); + else + log::debug(logcat, "Tempfile successfully renamed to final location {}", filepath); + } else { + log::warning( + logcat, + "Failed to rename tempfile {} to final location {}: {}", + tmp_upload, + filepath, + strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + } + + unlink_and_close(str.fsid); + io_state = IO_STATE::closing_done; + } else if (state == IO_STATE::closing_done) { + if (cqe->res < 0) { + log::warning(logcat, "Failed to close tempfile: {}", strerror(-cqe->res)); + str.close(STREAM_ERROR::io_error); + } + // Respond anyway because the rename succeeded and such a close failure is probably + // something weird or spurious? 
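+        // (insert_and_respond() records the upload in the database, in non-back-compat mode, and
+        // sends the final response block back to the client.)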
+        insert_and_respond();
+    } else {
+        assert(!"Unknown state!");
+    }
+}
+
+void FileStream::put_req::finalize() {
+    assert(!got_all);
+    log::debug(logcat, "Stream FIN received");
+    if (str.is_closing())
+        return;
+
+    assert(received <= size); // otherwise append() should already have failed
+
+    if (received < size) {
+        log::warning(
+                logcat,
+                "Stream FIN received without receiving all data (received {} of {}); closing "
+                "stream with error",
+                received,
+                size);
+        str.close(STREAM_ERROR::not_enough_data);
+        return;
+    }
+    got_all = true;
+
+    std::array<unsigned char, 33> hash;
+    crypto_generichash_blake2b_final(&b2b, hash.data(), hash.size());
+
+    fileid = oxenc::to_base64(hash);
+    // Convert to url-safe b64:
+    for (auto& c : fileid) {
+        if (c == '+')
+            c = '-';
+        else if (c == '/')
+            c = '_';
+    }
+    filepath = "{}/{}"_format(fileid.substr(0, 2), fileid);
+
+    if (str.handler.back_compat_ids) {
+        std::string try_ttl = "{} seconds"_format(
+                ttl && *ttl > 0 && *ttl <= str.handler.max_ttl.count()
+                        ? *ttl
+                        : str.handler.max_ttl.count());
+        bool success = false;
+        for (int i = 0; !success && i < 25; i++) {
+            uint64_t bcid;
+            randombytes_buf(&bcid, sizeof(bcid));
+            // We can't go over 53 bits because nodejs doesn't have integers:
+            bcid &= 0x1f'ffff'ffff'ffff;
+            auto try_id = "{}"_format(bcid);
+            double upl, exp;
+            try {
+                pg_retryable([&] {
+                    pqxx::work tx{str.handler.pg_conn};
+                    std::tie(upl, exp) = tx.exec(R"(
+INSERT INTO files (id, expiry) VALUES ($1, NOW() + $2)
+RETURNING EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry))",
+                                                 pqxx::params{try_id, try_ttl})
+                                                 .one_row()
+                                                 .as<double, double>();
+                    tx.commit();
+                });
+            } catch (const pqxx::unique_violation& e) {
+                continue;
+            }
+            success = true;
+            fileid = std::move(try_id);
+            filepath = "{:03d}/{}"_format(bcid % 1000, fileid);
+            expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(exp)}};
+            uploaded = std::chrono::sys_seconds{std::chrono::seconds{static_cast<int64_t>(upl)}};
+        }
+
+        if (!success) {
+            log::error(
+                    logcat,
+                    "Tried 25 random backcompat IDs and got constraint failures for all of them; "
+                    "something is going wrong!");
+            str.close(STREAM_ERROR::io_error);
+            return;
+        }
+    }
+
+    if (io_state == IO_STATE::none)
+        send_chunks();
+}
+
+void FileStream::put_req::initiate_rename() {
+    assert(!fileid.empty());
+    assert(!filepath.empty());
+    assert(chunks.empty());
+
+    log::debug(
+            logcat,
+            "All data received; renaming tempfile #{} ({}) to final location {}",
+            fd,
+            tmp_upload,
+            filepath);
+
+    auto* sqe = io_uring_get_sqe(&str.handler.iou);
+    io_uring_sqe_set_data64(sqe, str.fsid);
+    io_uring_prep_renameat(
+            sqe,
+            str.handler.upload_dir_fd,
+            tmp_upload.c_str(),
+            str.handler.files_dir_fd,
+            filepath.c_str(),
+            RENAME_NOREPLACE);
+    io_uring_submit(&str.handler.iou);
+    io_state = IO_STATE::renaming;
+}
+
+void FileStream::put_req::insert_and_respond() {
+    if (str.handler.back_compat_ids) {
+        // In back-compat mode, the insert already happened in finalize() because we had to do it
+        // to get the location to link the file into.
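+        // (Note that this also means back-compat uploads are never deduplicated: each upload gets
+        // a fresh random id rather than a content-derived one.)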
+ } else { + int max_ttl = str.handler.max_ttl.count(); + std::string db_ttl = "{} seconds"_format(std::clamp(ttl.value_or(max_ttl), 1, max_ttl)); + try { + pg_retryable([&] { + pqxx::work tx{str.handler.pg_conn}; + + auto [upl, exp] = tx.exec(R"( +INSERT INTO files (id, expiry) VALUES ($1, NOW() + $2) +ON CONFLICT(id) DO UPDATE SET expiry = GREATEST(files.expiry, EXCLUDED.expiry) +RETURNING EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry))", + pqxx::params{fileid, db_ttl}) + .one_row() + .as(); + uploaded = + std::chrono::sys_seconds{std::chrono::seconds{static_cast(upl)}}; + expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast(exp)}}; + tx.commit(); + }); + } catch (const pqxx::failure& e) { + log::error(logcat, "Failed to insert DB record for file {}: {}", filepath, e.what()); + str.close(STREAM_ERROR::io_error); + return; + } + } + + oxenc::bt_dict_producer resp; + resp.append("#", fileid); + resp.append("u", uploaded.time_since_epoch().count()); + resp.append("x", expiry.time_since_epoch().count()); + + str.send(std::move(resp).str()); + str.send_fin(); + + str.handler.overall.put(size); +} + +void FileStream::put_req::unlink_and_close(uint64_t close_fsid) { + auto* sqe = io_uring_get_sqe(&str.handler.iou); + + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_sqe_set_data64(sqe, 0); // we can't do anything if this fails so 0 will just be + // ignored by the cqe handling + io_uring_prep_unlinkat(sqe, str.handler.upload_dir_fd, tmp_upload.c_str(), 0); + + sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, close_fsid); + if (close_fsid == 0) + // Without an fsid we don't care about the outcome, so can skip handling entirely. If we + // *do* have an fsid then we want it so that it triggers whatever happens after closing. + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_prep_close_direct(sqe, fd); + + io_uring_submit(&str.handler.iou); + + fd = -1; +} + +FileStream::put_req::~put_req() { + if (io_state != IO_STATE::none) { + // Some request is already in progress, so cancel it before we send the close. This + // particular io_uring submission is *synchronous* so that we aren't at risk of the close we + // submit just after this actually getting queued before the cancellation (and thus itself + // getting cancelled). + auto* sqe = io_uring_get_sqe(&str.handler.iou); + io_uring_sqe_set_data64(sqe, 0); + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + io_uring_prep_cancel64(sqe, str.fsid, 0); + io_uring_submit(&str.handler.iou); + } + + if (fd >= 0) + unlink_and_close(0 /* don't care about results, since we're destructing */); +} + +void FileStream::parse_put(oxenc::bt_dict_consumer&& d) { + auto size = d.require("s"); + auto ttl = d.maybe("t"); + + if (size <= 0 || size > handler.max_size) { + log::error(logcat, "Invalid PUT size: {}, closing stream with error", size); + close(STREAM_ERROR::invalid_size); + return; + } + + auto& put = request.emplace(*this, std::move(size), std::move(ttl)); + if (log::get_level(accesslog) >= log::Level::info) { + auto ttl = put.ttl ? 
" (ttl={})"_format(*put.ttl) : ""; + if (auto conn = get_conn()) + log::info(accesslog, "PUT: {}B upload{} from {}", put.size, ttl, conn->remote()); + else + log::info(accesslog, "PUT: {}B upload{} from ", put.size, ttl); + } +} + +} // namespace sfs diff --git a/quic/quic-files.cpp b/quic/quic-files.cpp new file mode 100644 index 0000000..c1e4e55 --- /dev/null +++ b/quic/quic-files.cpp @@ -0,0 +1,202 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cleanup.hpp" +#include "requests.hpp" + +std::atomic signalled = 0; +void handle_signal(int sig) { + signalled = sig; +} + +int main(int argc, char* argv[]) { + + using namespace oxen; + using namespace log::literals; + using namespace std::literals; + + std::signal(SIGINT, handle_signal); + std::signal(SIGTERM, handle_signal); + + auto logcat = log::Cat("files"); + + CLI::App cli{"Session QUIC file server"}; + + std::string pgsql_uri; + cli.add_option( + "--pgsql,-P", + pgsql_uri, + "Postgresql database or URI, e.g. 'mydb' or some more complicated " + "'postgresql:///...'") + ->type_name("URI") + ->required(); + + std::filesystem::path file_path; + cli.add_option( + "--files,-f", + file_path, + "Directory where files referenced in the database are stored on disk") + ->type_name("DIRECTORY") + ->required(); + + bool back_compat_ids = false; + cli.add_flag( + "--compat-ids,-c", + back_compat_ids, + "Run in backwards-compatible, numeric ID mode. New uploads will get random integer " + "IDs instead of file hash IDs."); + + uint32_t max_ttl = 14 * 24 * 60 * 60; + cli.add_option( + "--ttl,-t", + max_ttl, + "Maximum TTL for uploads, in seconds. This is the default lifetime for upload " + "files (if not renewed). Clients may request shorter TTLs during upload or renew " + "requests, but requesting a longer TTL will be truncated to this value") + ->capture_default_str() + ->check(CLI::Range(60, 60 * 24 * 60 * 60)); + + uint32_t max_size = 10'223'616; + cli.add_option("--max-size,-S", max_size, "Maximum upload size, in bytes.") + ->capture_default_str(); + + bool no_delete_expired = false; + cli.add_flag( + "--no-delete-expired", + no_delete_expired, + "Disable expired file deletion. Should only be used if something else is taking care " + "of expired file deletions."); + + std::filesystem::path key_file; + cli.add_option( + "--key,-K", + key_file, + "Path to an Ed25519 secret key. 
Must be either 64 bytes (raw), or 128 hex " + "characters") + ->type_name("FILENAME") + ->required(); + + std::string addr{":11235"}; + cli.add_option("--bind,-b", addr, "Bind address for incoming oxen-libQUIC connections") + ->type_name("IP:PORT") + ->capture_default_str() + ->check([](const std::string& a) { + try { + quic::Address::parse(a); + } catch (const std::exception& e) { + return "Invalid bind address: {}"_format(e.what()); + } + return ""s; + }); + + bool disable_0rtt = false; + cli.add_flag("--disable-0rtt,-0", disable_0rtt, "Disables 0-RTT support"); + + std::string log_level = "info,quic=warning"; + cli.add_option( + "--log-level", + log_level, + "Log verbosity level, see Log Levels below for accepted values") + ->type_name("LEVEL") + ->capture_default_str(); + + try { + cli.parse(argc, argv); + } catch (const CLI::ParseError& e) { + return cli.exit(e); + } + + log::apply_categories(log_level); + log::add_sink(log::Type::Print, "stdout"); + + std::string ed_keys; + try { + std::ifstream in; + in.exceptions(std::ios::badbit | std::ios::failbit); + in.open(key_file, std::ios::binary | std::ios::in); + in.seekg(0, in.end); + auto size = in.tellg(); + in.seekg(0, in.beg); + + if (size == 64) { + ed_keys.resize(64); + in.read(reinterpret_cast(ed_keys.data()), 64); + } else if (size >= 128 && size <= 130) { + ed_keys.resize(size); + in.read(ed_keys.data(), size); + if ((size == 130 && ed_keys[128] == '\r' && ed_keys[129] == '\n') || + (size == 129 && ed_keys[128] == '\n')) + ed_keys.resize(128); + if (ed_keys.size() == 128 && oxenc::is_hex(ed_keys)) + ed_keys = oxenc::from_hex(ed_keys); + else + ed_keys.clear(); + } + + if (ed_keys.size() != 64) + throw std::invalid_argument{ + "Invalid --key file: expected 64 raw byte, or 128 hex character key file"}; + } catch (const std::exception& e) { + fmt::print(stderr, "\n\n\x1b[31;1mInvalid --key file: {}\x1b[0m\n\n", e.what()); + return 1; + } + + log::info( + logcat, + "Server Ed25519 pubkey: {}", + oxenc::to_hex(ed_keys.begin() + 32, ed_keys.end())); + auto address = quic::Address::parse(addr); + log::info( + logcat, + "Starting quic listener @ {}{}", + address, + disable_0rtt ? 
" WITHOUT 0rtt support" : ""); + + std::thread cleanup_thread; + std::promise stop_cleanup; + if (!no_delete_expired) + cleanup_thread = sfs::start_cleanup_thread(pgsql_uri, file_path, stop_cleanup.get_future()); + + try { + sfs::ReqHandler handler{ + address, + std::move(ed_keys), + !disable_0rtt, + std::move(pgsql_uri), + back_compat_ids, + std::chrono::seconds{max_ttl}, + file_path, + max_size}; + + log::info(logcat, "Server started."); + + signalled.wait(0); + log::warning(logcat, "Received signal {}, stopping server", signalled.load()); + } catch (const std::exception& e) { + log::error(logcat, "An exception occured while running the server: {}", e.what()); + } + + if (cleanup_thread.joinable()) { + log::info(logcat, "Stopping cleanup thread"); + stop_cleanup.set_value(); + cleanup_thread.join(); + } + + log::info(logcat, "Exiting"); +} diff --git a/quic/requests.cpp b/quic/requests.cpp new file mode 100644 index 0000000..a3e11d1 --- /dev/null +++ b/quic/requests.cpp @@ -0,0 +1,533 @@ +#include "requests.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.hpp" // IWYU pragma: keep + +namespace sfs { + +using namespace oxen; +using namespace log::literals; +using namespace std::literals; + +static auto logcat = log::Cat("files"); +static auto accesslog = log::Cat("access"); + +void FileStream::on_fin() { + Stream::on_fin(); + std::visit( + [](R& req) { + if constexpr (!std::same_as) + req.finalize(); + }, + request); +} + +void FileStream::receive(std::span data) { + if (is_closing()) + return; + if (std::holds_alternative(request)) { + if (command_block_size < 0) { + try { + // The first thing we received on the stream is the size of the command block, + // e.g. `123:`. This call accumulates these digits until we hit a :, mutating data + // to omit the prefix once found. + if (auto size = quic::prefix_accumulator(partial_size, data)) { + if (*size == 0) + throw std::runtime_error{"Command block cannot be 0 bytes"}; + command_block_size = *size; + } else + return; // Not enough data received yet + } catch (const std::exception& e) { + log::warning( + logcat, + "Invalid file stream command block on stream {}: {}; closing stream", + stream_id(), + e.what()); + close(STREAM_ERROR::bad_request); + return; + } + command.reserve(command_block_size); + } + + try { + if (!quic::data_accumulator(command, data, command_block_size)) + return; + } catch (const std::exception& e) { + log::error( + logcat, + "Invalid file stream command block on stream {}: {}; closing stream", + stream_id(), + e.what()); + close(STREAM_ERROR::bad_request); + return; + } + + // We've completely read the request body, so now we can parse it: + try { + oxenc::bt_dict_consumer d{command}; + if (auto cmd = d.maybe("!")) { + if (*cmd == "GET") + parse_get(std::move(d)); + else if (*cmd == "PUT") + parse_put(std::move(d)); + else { + log::error( + logcat, + "Invalid file stream request '{}' on stream {}", + *cmd, + stream_id()); + close(STREAM_ERROR::bad_endpoint); + return; + } + } else { + log::error( + logcat, + "File stream request missing command field '!' 
on stream {}", + stream_id()); + close(STREAM_ERROR::bad_request); + return; + } + } catch (const std::exception& e) { + log::error(logcat, "Failure during command block parsing: {}", e.what()); + close(STREAM_ERROR::bad_request); + return; + } + } + + if (is_closing()) + return; + + if (auto* put = std::get_if(&request)) { + // If this is a PUT request then everything after the command block is file data: + put->append(data); + } else { + auto* get = std::get_if(&request); + assert(get); + if (!data.empty()) { + log::warning(logcat, "Error: {}B after GET command block", data.size()); + close(STREAM_ERROR::bad_request); + } + } +} + +void FileStream::wrote(size_t size) { + Stream::wrote(size); + + if (auto* get = std::get_if(&request)) + get->queue_reads(); +} + +void FileStream::close(STREAM_ERROR e) { + Stream::close(static_cast(e)); + + // If there is a request in progress, destroying whatever is in `request` will cancel any + // ongoing I/O. (This has to be deferred via call_soon because this can be called from *inside* + // a live instance of `request`). + loop.call_soon( + [this, + wself = std::weak_ptr{std::static_pointer_cast(shared_from_this())}] { + if (auto self = wself.lock()) + self->request.emplace(); + }); +} + +FileStream::FileStream(quic::Connection& c, quic::Endpoint& e, ReqHandler& h) : + quic::Stream{c, e}, handler{h}, fsid{handler._next_fsid++} { + handler.streams[fsid] = this; +} + +FileStream::~FileStream() { + handler.streams.erase(fsid); +} + +quic::opt::static_secret make_static_secret(std::string_view ed_keys) { + constexpr auto STATIC_SEC_DOMAIN = "session-file-server-quic-files"sv; + crypto_generichash_blake2b_state st; + crypto_generichash_blake2b_init( + &st, + reinterpret_cast(STATIC_SEC_DOMAIN.data()), + STATIC_SEC_DOMAIN.size(), + 32); + crypto_generichash_blake2b_update( + &st, reinterpret_cast(ed_keys.data()), ed_keys.size()); + std::vector out; + out.resize(32); + crypto_generichash_blake2b_final(&st, out.data(), out.size()); + return quic::opt::static_secret{std::move(out)}; +} + +ReqHandler::ReqHandler( + quic::Address listen, + std::string ed_keys, + bool enable_0rtt, + std::string pgsql_uri, + bool back_compat_ids, + std::chrono::seconds max_ttl, + std::filesystem::path files_path_, + int64_t max_size) : + back_compat_ids{back_compat_ids}, + max_ttl{max_ttl}, + max_size{max_size}, + files_path{std::move(files_path_)} { + + if (sodium_init() == -1) + throw std::runtime_error{"Failed to initialize libsodium!"}; + + loop.call_get([&] { + pg_conn = pqxx::connection{pgsql_uri}; + + std::list cleanup; + + std::filesystem::create_directories(files_path); + + if (back_compat_ids) + for (int i = 0; i < 1000; i++) + std::filesystem::create_directories(files_path / "{:03d}"_format(i)); + else { + for (auto a : b64_url_chars) + for (auto b : b64_url_chars) + std::filesystem::create_directories(files_path / "{}{}"_format(a, b)); + } + upload_path = files_path / "uploads"; + std::filesystem::create_directories(upload_path); + + upload_dir_fd = open(upload_path.c_str(), O_PATH | O_DIRECTORY); + if (upload_dir_fd < 0) + throw std::runtime_error{ + "Unable to open upload path {}: {}"_format(upload_path, strerror(errno))}; + cleanup.emplace_back([this] { close(upload_dir_fd); }); + + if (int err = io_uring_queue_init(128, &iou, IORING_SETUP_SINGLE_ISSUER); err != 0) + throw std::runtime_error{ + "Failed to initialize io_uring queue: {}"_format(strerror(-err))}; + cleanup.emplace_front([this] { io_uring_queue_exit(&iou); }); + + if (int err = 
io_uring_register_files_sparse(&iou, MAX_OPEN_FILES); err != 0) + throw std::runtime_error{ + "Failed to initialize io_uring file descriptors: {}"_format(strerror(-err))}; + cleanup.emplace_front([this] { io_uring_unregister_files(&iou); }); + + files_dir_fd = open(files_path.c_str(), O_PATH | O_DIRECTORY); + if (files_dir_fd < 0) + throw std::runtime_error{ + "Unable to open files path {}: {}"_format(files_path, strerror(errno))}; + cleanup.emplace_back([this] { close(files_dir_fd); }); + + iou_evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (iou_evfd < 0) + throw std::runtime_error{"Failed to set up eventfd: {}"_format(strerror(errno))}; + cleanup.emplace_front([this] { close(iou_evfd); }); + + if (int err = io_uring_register_eventfd(&iou, iou_evfd); err != 0) + throw std::runtime_error{"Failed to register eventfd: {}"_format(strerror(-err))}; + cleanup.emplace_front([this] { io_uring_unregister_eventfd(&iou); }); + + ep = quic::Endpoint::endpoint( + loop, listen, make_static_secret(ed_keys), quic::opt::inbound_alpn(sfs::ALPN)); + + auto creds = quic::GNUTLSCreds::make_from_ed_seckey(ed_keys); + if (enable_0rtt) + creds->enable_inbound_0rtt(0s, 48h); + + ep->listen( + std::move(creds), + [this](quic::Connection& conn, + quic::Endpoint& e, + std::optional stream_id) { + assert(stream_id); // Should always be set for incoming streams + return make_stream(conn, e, *stream_id); + }); + + iou_ev = event_new( + loop.get_event_base(), + iou_evfd, + EV_READ | EV_PERSIST, + [](evutil_socket_t fd, short, void* me) { + uint64_t dummy; + read(fd, &dummy, 8); + static_cast(me)->process_cqes(); + }, + this); + if (!iou_ev) + throw std::runtime_error{"Failed to initialize io_uring fd event via libevent"}; + cleanup.emplace_front([this] { event_free(iou_ev); }); + if (0 != event_add(iou_ev, nullptr)) + throw std::runtime_error{"Failed to initialize io_uring event monitoring via libevent"}; + + for (auto& x : cleanup) + x.disarm(); + + stats_timer = loop.call_every(30s, [this] { + auto now = std::chrono::steady_clock::now(); + log::info( + logcat, + "{} uploads ({:.1f}GB), {} downloads ({:.1f}GB), {} other requests in {} since " + "startup.", + overall.puts, + overall.put_data / 1e9, + overall.gets, + overall.get_data / 1e9, + overall.others, + friendly_duration(now - overall.since)); + if (now - recent.since >= 10min) { + recent_old = recent; + recent = {}; + } + if (recent_old.since > overall.since + 1s) { + log::info( + logcat, + "{} uploads ({:.1f}GB), {} downloads ({:.1f}GB), {} other requests in last " + "{}", + recent.puts + recent_old.puts, + (recent.put_data + recent_old.put_data) / 1e9, + recent.gets + recent_old.gets, + (recent.get_data + recent_old.get_data) / 1e9, + recent.others + recent_old.others, + friendly_duration(now - recent_old.since)); + } + }); + }); +} + +ReqHandler::~ReqHandler() { + assert(ep.use_count() == 1); + ep.reset(); + event_free(iou_ev); + loop.call_get([this] { + io_uring_unregister_files(&iou); + io_uring_unregister_eventfd(&iou); + close(iou_evfd); + io_uring_queue_exit(&iou); + }); +} + +void ReqHandler::process_cqes() { + unsigned head; + unsigned count = 0; + io_uring_cqe* cqe; + io_uring_for_each_cqe(&iou, head, cqe) { + ++count; + auto id = io_uring_cqe_get_data64(cqe); + if (id < REQ_CQE_BASE_ID) { + FileStream* sptr = nullptr; + if (auto it = streams.find(id); it != streams.end()) + sptr = it->second; + if (!sptr) { + log::debug(logcat, "Ignoring CQE on dead stream (fsid={})", id); + continue; + } + auto& str = *sptr; + try { + std::visit( + [cqe](R& req) 
{ + if constexpr (!std::same_as) + req.handle_cqe(cqe); + }, + str.request); + } catch (const std::exception& e) { + log::warning( + logcat, + "Exception during I/O response processing ({}); closing stream", + e.what()); + str.close(STREAM_ERROR::io_error); + } + } else { + if (auto it = _req_cqe_handlers.find(id); it != _req_cqe_handlers.end()) { + auto h = std::move(it->second); + _req_cqe_handlers.erase(it); + try { + h(cqe); + } catch (const std::exception& e) { + log::warning( + logcat, + "Uncaught exception during request CQE handling: {}; request will not " + "be replied to!", + e.what()); + } + } else { + log::error( + logcat, + "Received request cqe with unknown or already handled ID {}, this " + "shouldn't happen!", + id); + } + } + } + io_uring_cq_advance(&iou, count); +} + +std::shared_ptr ReqHandler::make_stream( + quic::Connection& conn, quic::Endpoint& e, int64_t stream_id) { + + // Stream 0 is the bt request stream on which you can make simple requests such as getting + // current versions, querying file metadata, or touching an uploaded file to renew its + // expiry. + // + // It does *not*, however, handle uploads and downlods: each upload/download happens on a + // new stream. + if (stream_id == 0) { + auto s = e.loop.make_shared(conn, e); + s->register_handler( + "file_info", [this](quic::message m) { handle_file_info(std::move(m)); }); + s->register_handler( + "file_extend", [this](quic::message m) { handle_file_extend(std::move(m)); }); + // s->register_handler("session_version", [this](quic::message m) { + // handle_session_version(std::move(m)); + // }); + // s->register_handler( + // "token_info", [this](quic::message m) { handle_token_info(std::move(m)); + // }); + return std::move(s); + } + + return e.loop.make_shared(conn, e, *this); +} + +static void send_error(const quic::message& m, std::string_view err, bool json) { + m.respond(json ? 
R"({{"error":"{}"}})"_format(err) : err, true); +} + +void ReqHandler::handle_file_info(quic::message m) { + std::string id; + bool json = false; + try { + auto body = m.body(); + log::critical(logcat, "BODY: {}", body); + if (body.starts_with("d")) { + oxenc::bt_dict_consumer r{body}; + id = r.require("#"); + r.finish(); + } else { + nlohmann::json::parse(body).at("id").get_to(id); + json = true; + } + } catch (const std::exception& e) { + m.respond(req_error::BAD_REQUEST, true); + return; + } + + auto info = db_lookup(pg_conn, id); + if (!info) { + send_error(m, req_error::NOT_FOUND, json); + return; + } + + auto statxbuf = std::make_shared(); + auto* sqe = io_uring_get_sqe(&iou); + auto rid = _next_req_cqeid++; + io_uring_sqe_set_data64(sqe, rid); + io_uring_sqe_set_flags(sqe, 0); + io_uring_prep_statx(sqe, files_dir_fd, info->path.c_str(), 0, STATX_SIZE, statxbuf.get()); + io_uring_submit(&iou); + + _req_cqe_handlers[rid] = [id = std::move(id), + info = std::move(*info), + statxbuf = std::move(statxbuf), + m = std::move(m), + json](io_uring_cqe* cqe) { + if (cqe->res < 0) { + log::error( + logcat, + "Failed to stat {}: {}; returning NOT_FOUND to caller", + info.path, + strerror(-cqe->res)); + return send_error(m, req_error::NOT_FOUND, json); + } else if (json) { + m.respond(nlohmann::json{ + {"id", id}, + {"size", statxbuf->stx_size}, + {"uploaded", info.uploaded.time_since_epoch().count()}, + {"expires", info.expiry.time_since_epoch().count()}} + .dump()); + } else { + oxenc::bt_dict_producer res; + res.append("#", id); + res.append("s", statxbuf->stx_size); + res.append("u", info.uploaded.time_since_epoch().count()); + res.append("x", info.expiry.time_since_epoch().count()); + m.respond(std::move(res).str()); + } + }; +} + +void ReqHandler::handle_file_extend(quic::message m) { + std::string id; + std::optional ttl; + bool json = false; + try { + auto body = m.body(); + if (body.starts_with("d")) { + oxenc::bt_dict_consumer r{body}; + id = r.require("#"); + ttl = r.maybe("t"); + r.finish(); + } else { + auto r = nlohmann::json::parse(body); + r.at("id").get_to(id); + r.at("ttl").get_to(ttl); + json = true; + } + } catch (const std::exception& e) { + m.respond(req_error::BAD_REQUEST, true); + return; + } + + std::string db_ttl = + "{} seconds"_format(std::clamp(ttl.value_or(max_ttl.count()), 1, max_ttl.count())); + std::chrono::sys_seconds uploaded, expiry; + try { + pg_retryable([&] { + pqxx::work tx{pg_conn}; + + auto result = tx.exec(R"( +UPDATE files SET expiry = GREATEST(expiry, NOW() + $2) +WHERE id = $1 +RETURNING EXTRACT(EPOCH FROM uploaded), EXTRACT(EPOCH FROM expiry))", + pqxx::params{id, db_ttl}) + .opt_row(); + if (result) { + auto [upl, exp] = result->as(); + uploaded = + std::chrono::sys_seconds{std::chrono::seconds{static_cast(upl)}}; + expiry = std::chrono::sys_seconds{std::chrono::seconds{static_cast(exp)}}; + } + tx.commit(); + }); + } catch (const pqxx::failure& e) { + log::error(logcat, "Failed to update DB with updated expiry for {}: {}", id, e.what()); + send_error(m, req_error::INTERNAL_ERROR, true); + return; + } + + if (json) { + m.respond(nlohmann::json{ + {"id", id}, + {"uploaded", uploaded.time_since_epoch().count()}, + {"expires", expiry.time_since_epoch().count()}} + .dump()); + } else { + oxenc::bt_dict_producer res; + res.append("#", id); + res.append("u", uploaded.time_since_epoch().count()); + res.append("x", uploaded.time_since_epoch().count()); + m.respond(std::move(res).str()); + } +} + +} // namespace sfs diff --git a/quic/requests.hpp 
b/quic/requests.hpp new file mode 100644 index 0000000..ee4d60f --- /dev/null +++ b/quic/requests.hpp @@ -0,0 +1,380 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace oxen::quic { +struct message; +} + +namespace sfs { + +namespace quic = oxen::quic; +using namespace std::literals; + +inline constexpr auto ALPN = "quic-files"sv; + +enum class STREAM_ERROR : uint64_t { + bad_request = 400, + + not_found = 404, + + bad_endpoint = 405, + + size_missing = 450, + invalid_size = 451, + not_enough_data = 452, + too_much_data = 453, + io_error = 480, +}; + +// BTRequestStream request error strings: +namespace req_error { + // BAD_REQUEST means you sent something the server didn't understand: + static constexpr auto BAD_REQUEST = "BAD_REQUEST"sv; + // The server encountered some sort of internal error and cannot complete the request: + static constexpr auto INTERNAL_ERROR = "INTERNAL_ERROR"sv; + // The file to modify or query does not exist: + static constexpr auto NOT_FOUND = "NOT_FOUND"sv; +} + +class FileStream; + +struct req_stats { + int64_t puts = 0, gets = 0, others = 0; + int64_t put_data = 0, get_data = 0; + std::chrono::steady_clock::time_point since = std::chrono::steady_clock::now(); + + void put(int64_t size) { + puts++; + put_data += size; + } + void get(int64_t size) { + gets++; + get_data += size; + } + void other() { others++; } +}; + +class ReqHandler { + + quic::Loop loop; + std::shared_ptr ep; + bool back_compat_ids; + std::chrono::seconds max_ttl; + int64_t max_size{max_size}; + + req_stats overall{}; + req_stats recent{}, recent_old{}; + std::shared_ptr stats_timer; + + std::filesystem::path files_path; + std::filesystem::path upload_path; + int files_dir_fd; + int upload_dir_fd; + + io_uring iou; + int iou_evfd; + event* iou_ev; + + pqxx::connection pg_conn; + + static constexpr int MAX_OPEN_FILES = 200000; + + // The u64 that we use to associated io_uring responses with the streams that made the request. + // 0 is a special value (e.g. used for cancellation on destruction) that never maps to an actual + // stream. + uint64_t _next_fsid = 1; + std::unordered_map streams; + + uint64_t REQ_CQE_BASE_ID = 1ULL << 63; + + uint64_t _next_req_cqeid = REQ_CQE_BASE_ID; + std::unordered_map> _req_cqe_handlers; + + void process_cqes(); + + friend class FileStream; + + void handle_file_info(quic::message m); + void handle_file_extend(quic::message m); + void handle_session_version(quic::message m); + void handle_token_info(quic::message m); + + public: + ReqHandler( + quic::Address listen, + std::string ed_keys, + bool enable_0rtt, + std::string pgsql_uri, + bool back_compat_ids, + std::chrono::seconds max_ttl, + std::filesystem::path files_path, + int64_t max_size); + + ~ReqHandler(); + + std::shared_ptr make_stream( + quic::Connection& conn, quic::Endpoint& e, int64_t stream_id); +}; + +/// Stream subclass used for handling all incoming streams beyond the first (i.e. ids > 0; the first +/// stream, with id 0, is always a standard quic bt request stream for non-transfer responses). +/// +/// Each of these streams is issued with an initial command block, optionally followed by stream +/// data. The response is an initial response block, optionally followed by stream data. +/// +/// Each command or response block is a bt-encoded string containing a bt-encoded dict of request +/// data. The inner encoded dict may be no larger than 9999 bytes. E.g. 
+///
+///     19:d1:!3:PUT1:si8192ee..................
+///
+/// is a single upload command. The reply (after successfully accepting the upload) could be:
+///
+///     67:d1:#44:abc…xyz1:xi1760388294ee
+///
+/// indicating the status of a successful upload.
+///
+/// Currently supported commands (which are in the "!" key):
+///
+/// PUT
+/// ---
+/// Command block is a dict containing:
+/// - ! = PUT
+/// - s = upload size, in bytes. The upload will fail immediately if the size is not acceptable.
+///   If the stream is closed (or FIN sent) before this size has been received, or if more than
+///   this size is sent, then the stream is closed with an error (the file is not stored).
+/// - t = file ttl, in seconds. Optional; if omitted defaults to max ttl.
+///
+/// All data following the command block are the file contents being uploaded.
+///
+/// The successful response is a dict block (i.e. bt-encoded string encoding of a bt-dict)
+/// containing keys:
+/// - # = the accepted file id as a string. This is a base64 (url variation) encoded value, unless
+///   backwards compatible ids are enabled in which case it will be a stringified integer value.
+/// - u = the file upload timestamp; typically now, but earlier if the upload was deduplicated.
+/// - x = the file expiry, in unix epoch seconds, after the insert or update caused by this upload.
+///
+/// On error, the stream is closed (without sending a response) with one of the following error
+/// codes:
+/// - 450 -- no upload size given
+/// - 451 -- invalid upload size (i.e. size is too large). This will be triggered immediately if
+///   the size given in the initial command is not allowed, or if too little or too much data is
+///   sent.
+/// - 452 -- stream FIN received before all data received
+/// - 453 -- too much file data received (i.e. more than "s" bytes)
+/// - 480 -- the file could not be stored because of a server-side I/O error.
+/// - other non-0 codes indicate some other internal server side error (e.g. server restarting).
+///
+///
+/// GET
+/// ---
+/// Command block is a dict containing:
+/// - ! = GET
+/// - # = file ID to retrieve, as a string value. Should be either a 44-character ID, or a shorter
+///   numeric ID (but still passed as a string).
+///
+/// The successful response is a dict block followed by the file bytes. The dict contains keys:
+/// - s = the size of the file being sent, in bytes. (Note that this is the file size, not the
+///   stream size, i.e. it does not include the size of the response block).
+/// - u = the original file creation date, in unix epoch seconds. If the file was renewed (or
+///   re-uploaded and de-duplicated), this is the original upload date, not the renewal date.
+/// - x = the file expiry, in unix epoch seconds
+///
+/// The file immediately follows this double-encoded block (e.g. `123:d.....e`), in its entirety.
+///
+/// If the file is not found then the stream is closed, with no response sent, with a stream error
+/// code of 404. Error 480 indicates a server-side I/O error; other errors are possible, e.g. if
+/// the server is restarted before handling the request.
+///
+///
+/// If some other command is sent on an auxiliary stream then the stream is closed with error 405.
+/// If the command block is not parseable then it is closed with code 400.
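+///
+/// As a complete worked example (with illustrative sizes and timestamps, and with the
+/// 44-character file id elided as abc…xyz), a GET exchange on an auxiliary stream could look
+/// like:
+///
+///     client: 60:d1:!3:GET1:#44:abc…xyze (command block, then FIN)
+///     server: 42:d1:si65536e1:ui1760388294e1:xi1762980294ee<65536 bytes of file data>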
+///
+class FileStream : public quic::Stream {
+
+    class file_req {
+      protected:
+        // We queue reads and writes in blocks of this size
+        static constexpr int64_t CHUNK_SIZE = 64 * 1024;
+
+        file_req(FileStream& str) : str{str} {}
+
+        FileStream& str;
+
+        int fd = -1;
+
+        std::filesystem::path filepath;
+        std::chrono::sys_seconds uploaded;
+        std::chrono::sys_seconds expiry;
+
+        virtual ~file_req() = default;
+
+      public:
+        std::string fileid;
+        int64_t size = -1;
+
+        // Called to process a CQE response to a queued event that this object submitted.
+        virtual void handle_cqe(io_uring_cqe* cqe) = 0;
+    };
+
+    class get_req : public file_req {
+        // Per-stream readahead; if we drop below this much unsent data then we queue additional
+        // reads until we have unsent data on the stream >= this value (or hit EOF).
+        static constexpr int64_t READAHEAD = 512 * 1024;
+
+        // current io_uring state:
+        // - IO_STATE::none means there is no pending io_uring request
+        // - IO_STATE::reading means we have a chain of `chunks.size()` reads queued
+        // - IO_STATE::statx means we are waiting on the statx that happens first
+        // - IO_STATE::opening means we are waiting on the open (which happens chained with the
+        //   statx).
+        // - IO_STATE::closing_done means we are closing the file handle.
+        enum class IO_STATE : int {
+            none = -100,
+            statx = -1,
+            opening = 0,
+            closing_done = -2,
+            reading = 1,  // including all higher underlying values
+        };
+
+        struct statx statxbuf {};
+
+        IO_STATE io_state = IO_STATE::none;
+
+        bool eof = false;
+        int64_t bytes_read = 0;
+
+        std::deque<std::vector<std::byte>> chunks;
+
+      public:
+        get_req(FileStream& str, std::string id);
+
+        void finalize();
+
+        void handle_cqe(io_uring_cqe* cqe) override;
+
+        void queue_reads();
+
+        // Cancels operations (if something is queued) and closes the fd
+        void close();
+    };
+    class put_req : public file_req {
+
+        crypto_generichash_blake2b_state b2b;
+
+        // current io_uring state (tracking the currently scheduled I/O task):
+        // - IO_STATE::none means there is no pending io_uring request
+        // - >= IO_STATE::writing means we are writing the first N buffers of `chunks`, where N
+        //   is the underlying value.
+        // - IO_STATE::opening means we are creating the tempfile
+        // - IO_STATE::renaming means we are renaming the tempfile into its final location
+        // - IO_STATE::closing_done means we are closing the file handle after success.
+        enum class IO_STATE : int {
+            none = -100,
+            closing_done = -2,
+            renaming = -1,
+            opening = 0,
+            writing = 1,  // >= writing means a writev of the underlying number of chunks
+        };
+
+        IO_STATE io_state = IO_STATE::none;
+
+        // When waiting on a write, this is how many bytes we tried writing
+        int io_write_size;
+        // In some cases (such as a partial write) we end up starting a write with an offset into
+        // the first chunk; this is where we store it:
+        size_t io_write_first_offset;
+
+        std::deque<std::vector<std::byte>> chunks;
+
+        int64_t received = 0;
+        bool got_all = false;
+        std::filesystem::path tmp_upload;
+
+        void initiate_rename();
+
+        void insert_and_respond();
+
+        void unlink_and_close(uint64_t close_fsid);
+
+        friend class ReqHandler;
+
+      public:
+        std::optional<int64_t> ttl;
+
+        put_req(FileStream& s, size_t size, std::optional<int64_t> ttl);
+
+        // Appends data; the first time this is called a temporary file is opened into which the
+        // data will be written. Throws upon I/O error.
+        void append(std::span<const std::byte> data);
+
+        // If there is at least 1 full chunk accumulated (or the write is complete) then this
+        // submits any full (or final) chunks to be written. Does nothing if an I/O operation is
+        // already submitted, or if there isn't a full chunk yet.
+        //
+        // retry_offset is used after a failed partial write to retry the write, beginning the
+        // first chunk at the given offset.
+        void send_chunks(std::optional<size_t> _retry_offset = std::nullopt);
+
+        // Called when the sender sends FIN on the stream (to indicate it is done with the file
+        // contents). If not enough data has been sent, this closes the stream with an error code.
+        // Otherwise this initiates linking the file to its final location on disk (based on the
+        // hash), inserting the record into the database, and then replying on the stream with the
+        // upload info.
+        void finalize();
+
+        // Called to process a CQE response to a queued event that this object submitted.
+        void handle_cqe(io_uring_cqe* cqe) override;
+
+        // Cancels any ongoing operations and closes the file descriptor.
+        ~put_req();
+    };
+
+    ReqHandler& handler;
+
+    // Unique identifier we use to reacquire the stream (if still alive) out of the ReqHandler when
+    // an io_uring completion occurs. (We store this id in the queue entry).
+    // (NB: put_req destructor requires this, so this must be declared above `request`)
+    const uint64_t fsid;
+
+    std::variant<std::monostate, get_req, put_req> request;
+    int command_block_size = -1;
+    std::string partial_size;
+    std::vector<std::byte> command;
+
+    void parse_get(oxenc::bt_dict_consumer&& d);
+    void parse_put(oxenc::bt_dict_consumer&& d);
+
+    void on_fin() override;
+
+    friend class ReqHandler;
+
+  public:
+    FileStream(quic::Connection& c, quic::Endpoint& e, ReqHandler& h);
+
+    ~FileStream() override;
+
+    void receive(std::span<const std::byte> data) override;
+
+    void close(STREAM_ERROR e);
+
+    void wrote(size_t size) override;
+};
+
+}  // namespace sfs
diff --git a/quic/test/test-download.cpp b/quic/test/test-download.cpp
new file mode 100644
index 0000000..1782460
--- /dev/null
+++ b/quic/test/test-download.cpp
@@ -0,0 +1,232 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::literals;
+using namespace oxen;
+using namespace log::literals;
+
+static auto cat = log::Cat("download");
+
+int main(int argc, char* argv[]) {
+
+    log::add_sink(log::Type::Print, "-");
+    log::reset_level(log::Level::warn);
+    log::set_level(cat, log::Level::info);
+
+    auto usage = [&argv](std::string_view msg) {
+        if (!msg.empty())
+            log::error(cat, "{}", msg);
+        log::info(
+                cat,
+                "\n\nUsage: {} IP:PORT PUBKEY FILEID [FILEID2 ...]\n\nDownloads files from a "
+                "fileserver to ./FILEID ./FILEID2 etc.\n\n",
+                argv[0]);
+        return 1;
+    };
+
+    if (argc < 4)
+        return usage("Insufficient arguments");
+
+    quic::Address addr;
+    try {
+        addr = quic::Address::parse(argv[1]);
+    } catch (const std::exception& e) {
+        return usage("{} doesn't look like an IP:PORT address ({})"_format(argv[1], e.what()));
+    }
+
+    std::string_view pubkey{argv[2]};
+    if (pubkey.size() != 64 || !oxenc::is_hex(pubkey))
+        return usage("{} is not a valid pubkey"_format(pubkey));
+
+    int id_i = 3;
+
+    if (id_i >= argc)
+        return usage("No file ids given!");
+
+    constexpr auto b64_url_chars =
+            "-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz"sv;
+    struct state {
+        std::string id;
+        std::filesystem::path f;
+        std::string partial;
+        std::string meta;
+        int meta_size = -1;
+        int size = -1;
+        std::chrono::sys_seconds uploaded;
+        std::chrono::sys_seconds expiry;
+        int received = 0;
+        std::ofstream out;
+    };
+
+    std::vector<state> downloads;
+    std::unordered_set<std::filesystem::path> dupes;
+    for (; id_i < argc; id_i++) {
+        auto& st = downloads.emplace_back();
+        st.id = argv[id_i];
+        if (!((st.id.size() == 44 && st.id.find_first_not_of(b64_url_chars) == std::string::npos) ||
+              (st.id.size() >= 1 && st.id.size() <= 16 &&
+               st.id.find_first_not_of("0123456789") == std::string::npos)))
+            throw std::runtime_error{
+                    "invalid id '{}': expected 44 b64url or 1-16 decimal digits"_format(st.id)};
+
+        st.f = st.id;
+        for (int i = 1; std::filesystem::exists(st.f) || !dupes.insert(st.f).second; i++)
+            st.f.replace_extension(".{}"_format(i));
+    }
+
+    quic::Loop loop;
+    auto ep = quic::Endpoint::endpoint(loop, quic::Address{});
+
+    auto conn = ep->connect(
+            quic::RemoteAddress{oxenc::from_hex(pubkey), addr},
+            quic::opt::outbound_alpn("quic-files"));
+
+    // We must always open this stream first (even if we aren't going to use it); this is the stream
+    // on which we can make simple requests; all later streams are for file transfers.
+    auto bt_str = conn->open_stream<quic::BTRequestStream>();
+
+    // stream -> downloads_index
+    std::unordered_map<std::shared_ptr<quic::Stream>, size_t> dl_idx;
+
+    std::atomic<size_t> remaining{downloads.size()};
+    auto on_stream_close = [&](quic::Stream& s, uint64_t err) {
+        --remaining;
+        remaining.notify_one();
+        auto& st = downloads[dl_idx.at(s.shared_from_this())];
+        auto sid = s.stream_id();
+        if (err != 0) {
+            log::error(cat, "Download {} stream (id={}) closed with error {}", st.id, sid, err);
+            return;
+        }
+
+        if (st.size < 0) {
+            log::error(
+                    cat, "Download {} stream (id={}) closed before metadata received!", st.id, sid);
+            return;
+        }
+        if (st.received < st.size) {
+            log::error(
+                    cat,
+                    "Download {} stream (id={}) closed before file content received (received {} "
+                    "of {}B)",
+                    st.id,
+                    sid,
+                    st.received,
+                    st.size);
+            return;
+        } else {
+            st.out.close();
+            log::info(
+                    cat,
+                    "Successfully downloaded {} (on stream {}):\nsaved to: {}\nuploaded: {}\n  "
+                    "expiry: {}",
+                    st.id,
+                    sid,
+                    st.f,
+                    st.uploaded,
+                    st.expiry);
+        }
+    };
+
+    auto on_stream_data = [&](quic::Stream& s, std::span<const std::byte> data) {
+        auto sid = s.stream_id();
+        auto& st = downloads[dl_idx.at(s.shared_from_this())];
+
+        if (st.meta_size < 0) {
+            try {
+                if (auto size = quic::prefix_accumulator(st.partial, data)) {
+                    if (*size == 0)
+                        throw std::runtime_error{"Invalid 0-byte metadata block"};
+                    st.meta_size = *size;
+                } else
+                    return;
+            } catch (const std::exception& e) {
+                log::error(cat, "Invalid file stream metadata on stream {}: {}", sid, e.what());
+                s.close(1234);
+                return;
+            }
+
+            st.meta.reserve(st.meta_size);
+        }
+
+        if (st.size < 0) {
+            try {
+                if (!quic::data_accumulator(st.meta, data, st.meta_size))
+                    return;
+            } catch (const std::exception& e) {
+                log::error(cat, "Invalid file metadata received from server: {}", e.what());
+                s.close(1234);
+                return;
+            }
+
+            try {
+                oxenc::bt_dict_consumer d{st.meta};
+                auto size = d.require<int64_t>("s");
+                if (size <= 0)
+                    throw std::runtime_error{"Invalid non-positive file size {}"_format(size)};
+                auto upl = d.require<int64_t>("u");
+                auto exp = d.require<int64_t>("x");
+                d.finish();
+
+                st.size = size;
+                st.uploaded = std::chrono::sys_seconds{std::chrono::seconds{upl}};
+                st.expiry = std::chrono::sys_seconds{std::chrono::seconds{exp}};
+            } catch (const std::exception& e) {
+                log::error(
+                        cat,
+                        "Failed to parse file metadata for {} on stream {}: {}",
+                        st.id,
+                        sid,
+                        e.what());
+                s.close(444);
+                return;
+            }
+
+            try {
+                st.out.exceptions(std::ios::failbit | std::ios::badbit);
+                st.out.open(st.f, std::ios::binary | std::ios::out);
+            } catch (const std::exception& e) {
+                log::error(cat, "Failed to open {} for download: {}", st.f, e.what());
+                s.close(400);
+                return;
+            }
+        }
+
+        try {
+            if (st.received + data.size() > st.size)
+                throw std::runtime_error{
+                        "Invalid file data received from server: received {}, expected {}"_format(
+                                st.received + data.size(), st.size)};
+            st.out.write(reinterpret_cast<const char*>(data.data()), data.size());
+            st.received += data.size();
+        } catch (const std::exception& e) {
+            log::error(cat, "Failed writing to {}: {}", st.f, e.what());
+            s.close(400);
+        }
+    };
+
+    loop.call([&] {
+        for (size_t i = 0; i < downloads.size(); i++) {
+            auto& st = downloads[i];
+            auto str = conn->open_stream(on_stream_data, on_stream_close);
+            dl_idx[str] = i;
+
+            oxenc::bt_dict_producer info;
+            info.append("!", "GET");
+            info.append("#", st.id);
+            str->send("{}:{}"_format(info.view().size(), info.view()));
+            str->send_fin();
+        }
+    });
+
+    size_t r = remaining.load();
+    while (r > 0) {
+        remaining.wait(r);
+        r = loop.call_get([&] { return remaining.load(); });
+    }
+}
diff --git a/quic/test/test-query.cpp b/quic/test/test-query.cpp
new file mode 100644
index 0000000..322e1b0
--- /dev/null
+++ b/quic/test/test-query.cpp
@@ -0,0 +1,76 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::literals;
+using namespace oxen;
+using namespace log::literals;
+
+static auto cat = log::Cat("query");
+
+int main(int argc, char* argv[]) {
+
+    log::add_sink(log::Type::Print, "stderr");
+    log::reset_level(log::Level::warn);
+    log::set_level(cat, log::Level::info);
+
+    auto usage = [&argv](std::string_view msg) {
+        if (!msg.empty())
+            log::error(cat, "{}", msg);
+        log::info(
+                cat,
+                "\n\nUsage: {} IP:PORT PUBKEY ENDPOINT BODY\n\nSends a body to the fileserver "
+                "and shows the response.\n\n",
+                argv[0]);
+        return 1;
+    };
+
+    if (argc < 5)
+        return usage("Insufficient arguments");
+
+    quic::Address addr;
+    try {
+        addr = quic::Address::parse(argv[1]);
+    } catch (const std::exception& e) {
+        return usage("{} doesn't look like an IP:PORT address ({})"_format(argv[1], e.what()));
+    }
+
+    std::string_view pubkey{argv[2]};
+    if (pubkey.size() != 64 || !oxenc::is_hex(pubkey))
+        return usage("{} is not a valid pubkey"_format(pubkey));
+
+    quic::Loop loop;
+    auto ep = quic::Endpoint::endpoint(loop, quic::Address{});
+
+    auto conn = ep->connect(
+            quic::RemoteAddress{oxenc::from_hex(pubkey), addr},
+            quic::opt::outbound_alpn("quic-files"));
+
+    auto btr = conn->open_stream<quic::BTRequestStream>();
+
+    std::promise<void> done_prom;
+    auto done = done_prom.get_future();
+    btr->command(
+            std::string{argv[3]}, std::string_view{argv[4]}, [&done_prom](quic::message m) {
+                if (m.timed_out)
+                    log::error(cat, "Request timed out!");
+                else if (m.is_error())
+                    log::error(cat, "Request returned an error: {}", m.body());
+                else {
+                    log::info(cat, "Request returned a body:\n");
+                    fmt::print("{}", m.body());
+                }
+                done_prom.set_value();
+            });
+
+    done.wait();
+}
diff --git a/quic/test/test-upload.cpp b/quic/test/test-upload.cpp
new file mode 100644
index 0000000..94fa3fb
--- /dev/null
+++ b/quic/test/test-upload.cpp
@@ -0,0 +1,209 @@
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::literals;
+using namespace oxen;
+using namespace log::literals;
+
+static auto cat = log::Cat("upload");
+
+int main(int argc, char* argv[]) {
+
+    log::add_sink(log::Type::Print, "-");
+    log::reset_level(log::Level::warn);
+    log::set_level(cat, log::Level::info);
+
+    auto usage = [&argv](std::string_view msg) {
+        if (!msg.empty())
+            log::error(cat, "{}", msg);
+        log::info(
+                cat,
+                "\n\nUsage: {} IP:PORT PUBKEY [TTL=SECONDS] FILENAME [FILENAME ...]\n\n",
+                argv[0]);
+        return 1;
+    };
+
+    if (argc < 4)
+        return usage("Insufficient arguments");
+
+    quic::Address addr;
+    try {
+        addr = quic::Address::parse(argv[1]);
+    } catch (const std::exception& e) {
+        return usage("{} doesn't look like an IP:PORT address ({})"_format(argv[1], e.what()));
+    }
+
+    std::string_view pubkey{argv[2]};
+    if (pubkey.size() != 64 || !oxenc::is_hex(pubkey))
+        return usage("{} is not a valid pubkey"_format(pubkey));
+
+    int ttl = 0;
+    int file_i = 3;
+    if (std::string_view ttl_opt{argv[3]}; ttl_opt.starts_with("TTL=")) {
+        file_i++;
+        auto [ptr, ec] = std::from_chars(ttl_opt.data() + 4, ttl_opt.data() + ttl_opt.size(), ttl);
+        if (ptr != ttl_opt.data() + ttl_opt.size() || ec != std::errc{})
+            return usage("Invalid time-to-live option: {}"_format(ttl_opt));
+    }
+
+    if (file_i >= argc)
+        return usage("No upload files given!");
+
+    std::vector<std::tuple<std::filesystem::path, int64_t, int>> uploads;
+    for (; file_i < argc; file_i++) {
+        std::filesystem::path f{argv[file_i]};
+        if (!std::filesystem::exists(f) || !std::filesystem::is_regular_file(f))
+            return usage("{} does not exist or is not a file"_format(f));
+        auto s = std::filesystem::file_size(f);
+        if (s == 0)
+            return usage("Cannot upload empty file {}"_format(f));
+        uploads.emplace_back(std::move(f), s, -1);
+    }
+
+    quic::Loop loop;
+    auto ep = quic::Endpoint::endpoint(loop, quic::Address{});
+
+    auto conn = ep->connect(
+            quic::RemoteAddress{oxenc::from_hex(pubkey), addr},
+            quic::opt::outbound_alpn("quic-files"));
+
+    // We must always open this stream first (even if we aren't going to use it); this is the stream
+    // on which we can make simple requests; all later streams are for file transfers.
+    auto bt_str = conn->open_stream<quic::BTRequestStream>();
+
+    // stream -> [data, uploads_index]
+    std::unordered_map<std::shared_ptr<quic::Stream>, std::pair<std::string, size_t>> streams;
+
+    std::atomic<size_t> remaining{uploads.size()};
+    auto on_stream_close = [&](quic::Stream& s, uint64_t err) {
+        --remaining;
+        remaining.notify_one();
+        auto str = s.shared_from_this();
+        auto sid = str->stream_id();
+        auto it = streams.find(str);
+        if (it == streams.end()) {
+            log::error(cat, "Unknown stream {} closed with error {}!", sid, err);
+            return;
+        }
+        const auto& [data, upload_id] = it->second;
+        auto& [f, size, fd] = uploads[upload_id];
+        if (err != 0) {
+            log::error(cat, "Upload {} stream (id={}) closed with error {}", f, sid, err);
+            return;
+        }
+
+        if (data.empty()) {
+            log::error(
+                    cat,
+                    "Upload {} stream (id={}) closed without error, but with no response data!",
+                    f,
+                    sid);
+            return;
+        }
+
+        try {
+            oxenc::bt_dict_consumer resp{data};
+            std::string id = resp.require<std::string>("#");
+            std::chrono::sys_seconds uploaded{std::chrono::seconds{resp.require<int64_t>("u")}};
+            std::chrono::sys_seconds expiry{std::chrono::seconds{resp.require<int64_t>("x")}};
+
+            log::info(
+                    cat,
+                    "Uploaded {} successfully (on stream {}):\n      id: {}\nuploaded: {}\n  "
+                    "expiry: {}",
+                    f,
+                    sid,
+                    id,
+                    uploaded,
+                    expiry);
+        } catch (const std::exception& e) {
+            log::warning(
+                    cat, "Failed to parse stream {} ({}) upload response: {}", sid, f, e.what());
+        }
+    };
+    auto on_stream_data = [&](quic::Stream& s, std::span<const std::byte> data) {
+        auto sid = s.stream_id();
+        auto str = s.shared_from_this();
+        auto it = streams.find(str);
+        if (it == streams.end()) {
+            log::error(cat, "Received data on unknown stream {}!", sid);
+            return;
+        }
+        auto& [d, upload_id] = it->second;
+        d += std::string_view{reinterpret_cast<const char*>(data.data()), data.size()};
+    };
+
+    loop.call([&] {
+        for (size_t i = 0; i < uploads.size(); i++) {
+            auto& [f, size, fd] = uploads[i];
+            fd = open(f.c_str(), O_RDONLY);
+            if (fd < 0) {
+                log::error(cat, "Failed to open {}: {}", f, strerror(errno));
+                --remaining;
+                continue;
+            }
+            auto str = conn->open_stream(on_stream_data, on_stream_close);
+            auto next_chunk = [f, fd, str, f_remaining = size](const quic::Stream&) mutable {
+                std::vector<std::byte> chunk;
+                if (f_remaining == 0)
+                    return chunk;
+                chunk.resize(std::min<int64_t>(f_remaining, 65'536));
+                int offset = 0;
+                while (true) {
+                    auto r = read(fd, chunk.data() + offset, chunk.size() - offset);
+                    if (r == -1) {
+                        log::error(cat, "Error while reading from {}: {}", f, strerror(errno));
+                        str->close(1480);
+                        chunk.clear();
+                        return chunk;
+                    }
+                    assert(offset + r <= chunk.size());
+                    if (offset + r >= chunk.size())
+                        break;
+                    if (r == 0) {
+                        log::error(
+                                cat,
+                                "Error reading from {}: hit EOF too soon (file truncated while "
+                                "reading?)",
+                                f);
+                        str->close(1480);
+                        chunk.clear();
+                        return chunk;
+                    }
+
+                    offset += r;
+                }
+                f_remaining -= chunk.size();
+                return chunk;
+            };
+
+            auto& [sdata, upload_i] = streams[str];
+            upload_i = i;
+
+            oxenc::bt_dict_producer info;
+            info.append("!", "PUT");
+            info.append("s", size);
+            if (ttl > 0)
+                info.append("t", ttl);
+            str->send("{}:{}"_format(info.view().size(), info.view()));
+            str->send_chunks(
+                    next_chunk, [](quic::Stream& s) { s.send_fin(); }, 100);
+        }
+    });
+
+    size_t r = remaining.load();
+    while (r > 0) {
+        remaining.wait(r);
+        r = loop.call_get([&] { return remaining.load(); });
+    }
+}