Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ __pycache__
/key_x25519
.vscode
.venv
/quic/build*
/quic/compile_commands.json
/quic/.cache
9 changes: 9 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[submodule "quic/oxen-libquic"]
path = quic/oxen-libquic
url = https://github.com/oxen-io/oxen-libquic.git
[submodule "quic/libpqxx"]
path = quic/libpqxx
url = https://github.com/jtv/libpqxx.git
[submodule "quic/CLI11"]
path = quic/CLI11
url = https://github.com/CLIUtils/CLI11.git
1 change: 1 addition & 0 deletions quic/CLI11
Submodule CLI11 added at 5248d5
71 changes: 71 additions & 0 deletions quic/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
cmake_minimum_required(VERSION 3.20...3.31)

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

find_program(CCACHE_PROGRAM ccache)
if(CCACHE_PROGRAM)
foreach(lang C CXX)
if(NOT DEFINED CMAKE_${lang}_COMPILER_LAUNCHER AND NOT CMAKE_${lang}_COMPILER MATCHES ".*/ccache")
message(STATUS "Enabling ccache for ${lang}")
set(CMAKE_${lang}_COMPILER_LAUNCHER ${CCACHE_PROGRAM} CACHE STRING "")
endif()
endforeach()
endif()

project(quic-files
VERSION 0.0.1
LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED TRUE)
set(CMAKE_CXX_EXTENSIONS FALSE)

include(cmake/submodule_check.cmake)
include(cmake/system_or_submodule.cmake)

find_package(PkgConfig REQUIRED)

system_or_submodule(OXENC oxenc oxenc::oxenc liboxenc>=1.4.0 oxen-libquic/external/oxen-encoding)
system_or_submodule(OXENLOGGING oxen-logging oxen::logging liboxen-logging>=1.2.0 oxen-libquic/external/oxen-logging)
system_or_submodule(FMT fmt fmt::fmt fmt>=9 oxen-libquic/external/oxen-logging/fmt)
system_or_submodule(LIBPQXX libpqxx libpqxx::pqxx libpqxx>=7.10.0 libpqxx)
system_or_submodule(CLI11 CLI11 CLI11::CLI11 CLI11>=2.3.0 CLI11)
system_or_submodule(OXENQUIC quic oxen::quic liboxenquic>=1.6 oxen-libquic)

pkg_check_modules(NLOHMANN_JSON IMPORTED_TARGET REQUIRED nlohmann_json>=3.7.0)
pkg_check_modules(SODIUM IMPORTED_TARGET REQUIRED libsodium>=1.0.17)
pkg_check_modules(LIBURING IMPORTED_TARGET REQUIRED liburing>=2.3)

add_executable(quic-files
quic-files.cpp
requests.cpp
put.cpp
get.cpp
cleanup.cpp
)

add_library(quic-files-common INTERFACE)
target_link_libraries(quic-files-common INTERFACE
oxenc::oxenc
oxen::logging
oxen::quic
fmt::fmt
)

target_link_libraries(quic-files
PRIVATE
quic-files-common
PkgConfig::NLOHMANN_JSON
PkgConfig::SODIUM
libpqxx::pqxx
CLI11::CLI11
PkgConfig::LIBURING
)

option(BUILD_TESTS "Build upload/download test programs" ON)
if(BUILD_TESTS)
foreach(x IN ITEMS upload download query)
add_executable(quic-${x} test/test-${x}.cpp)
target_link_libraries(quic-${x} PRIVATE quic-files-common)
endforeach()
endif()
139 changes: 139 additions & 0 deletions quic/cleanup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#include "cleanup.hpp"

#include <liburing.h>

#include <chrono>
#include <exception>
#include <pqxx/pqxx>

#include "common.hpp" // IWYU pragma: keep

namespace sfs {

auto logcat = log::Cat("files.cleanup");

namespace {
class Cleaner {
io_uring iou;
int files_dir_fd;
pqxx::connection conn;
std::future<void> stop;

public:
Cleaner(const std::filesystem::path& files_dir,
std::string pgsql_uri,
std::future<void> stop) :
conn{pgsql_uri}, stop{std::move(stop)} {
std::filesystem::create_directories(files_dir);
files_dir_fd = open(files_dir.c_str(), O_PATH | O_DIRECTORY);
if (files_dir_fd < 0)
throw std::runtime_error{
"Unable to open files path {}: {}"_format(files_dir, strerror(errno))};

if (int err = io_uring_queue_init(128, &iou, IORING_SETUP_SINGLE_ISSUER); err != 0) {
close(files_dir_fd);
throw std::runtime_error{
"Failed to initialize io_uring queue: {}"_format(strerror(-err))};
}
}

void cleanup() {
pg_retryable([this] {
pqxx::work tx{conn};

auto started = std::chrono::steady_clock::now();
std::vector<std::filesystem::path> removed;
unsigned submitted = 0;
for (auto r : tx.exec("DELETE FROM files WHERE expiry <= NOW() RETURNING id")) {
removed.push_back(id_to_path(r[0].as<std::string>()));
auto& filepath = removed.back();

auto* sqe = io_uring_get_sqe(&iou);
if (!sqe) {
io_uring_submit(&iou);
sqe = io_uring_get_sqe(&iou);
}

io_uring_sqe_set_flags(sqe, 0);
io_uring_sqe_set_data64(sqe, removed.size() - 1);
io_uring_prep_unlinkat(sqe, files_dir_fd, filepath.c_str(), 0);
submitted++;
}
io_uring_submit(&iou);

bool success = true;

while (submitted > 0) {
unsigned nr = std::min<unsigned>(iou.cq.ring_entries, submitted);
struct io_uring_cqe* cqe;
io_uring_wait_cqe_nr(&iou, &cqe, nr);

unsigned head;
io_uring_for_each_cqe(&iou, head, cqe) {
const auto& id = removed[io_uring_cqe_get_data64(cqe)];
if (cqe->res < 0) {
if (cqe->res == -ENOENT)
log::debug(logcat, "Failed to remove {}: file already gone", id);
else {
log::warning(
logcat, "Failed to remove {}: {}", id, strerror(-cqe->res));
success = false;
}
} else {
log::debug(logcat, "Removed expired file {}", id);
}
}

io_uring_cq_advance(&iou, nr);

submitted -= nr;
}

if (success) {
log::info(
logcat,
"Deleted {} expired files in {}",
removed.size(),
std::chrono::steady_clock::now() - started);
tx.commit();
} else
tx.abort();
});
}

bool wait() { return stop.wait_for(30s) == std::future_status::timeout; }
};
} // namespace

std::thread start_cleanup_thread(
const std::string& pgsql_uri,
const std::filesystem::path& files_dir,
std::future<void> stop) {

std::promise<void> started_prom;
auto started = started_prom.get_future();
std::thread th{[&started_prom, &files_dir, &pgsql_uri, &stop] {
std::optional<Cleaner> cleaner;
try {
cleaner.emplace(files_dir, std::move(pgsql_uri), std::move(stop));
} catch (...) {
started_prom.set_exception(std::current_exception());
}

started_prom.set_value();

do {
try {
cleaner->cleanup();
} catch (const std::exception& e) {
log::error(logcat, "Cleanup failed: {}", e.what());
}
} while (cleaner->wait());
}};

started.get();

return th;
}

} // namespace sfs
15 changes: 15 additions & 0 deletions quic/cleanup.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <filesystem>
#include <future>
#include <string>
#include <thread>

namespace sfs {

std::thread start_cleanup_thread(
const std::string& pgsql_uri,
const std::filesystem::path& files_dir,
std::future<void> stop);

}
31 changes: 31 additions & 0 deletions quic/cmake/submodule_check.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON)
if(SUBMODULE_CHECK)
find_package(Git)
if(GIT_FOUND)
function(check_submodule relative_path)
execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE localHead)
execute_process(COMMAND git rev-parse "HEAD:quic/${relative_path}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE checkedHead)
string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate)
if (upToDate)
message(STATUS "Submodule 'quic/${relative_path}' is up-to-date")
else()
message(FATAL_ERROR "Submodule 'quic/${relative_path}' is not up-to-date. Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF")
endif()

# Extra arguments check nested submodules
foreach(submod ${ARGN})
execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path}/${submod} OUTPUT_VARIABLE localHead)
execute_process(COMMAND git rev-parse "HEAD:${submod}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE checkedHead)
string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate)
if (NOT upToDate)
message(FATAL_ERROR "Nested submodule '${relative_path}/${submod}' is not up-to-date. Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF")
endif()
endforeach()
endfunction ()

message(STATUS "Checking submodules")
check_submodule(libpqxx)
check_submodule(oxen-libquic external/oxen-encoding external/oxen-logging)
endif()
endif()

23 changes: 23 additions & 0 deletions quic/cmake/system_or_submodule.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
macro(system_or_submodule BIGNAME smallname target pkgconf subdir)
if(NOT TARGET ${target})
option(FORCE_${BIGNAME}_SUBMODULE "force using ${smallname} submodule" OFF)
if(NOT BUILD_STATIC_DEPS AND NOT FORCE_${BIGNAME}_SUBMODULE AND NOT FORCE_ALL_SUBMODULES)
pkg_check_modules(${BIGNAME} ${pkgconf} IMPORTED_TARGET)
endif()
if(${BIGNAME}_FOUND)
add_library(${smallname} INTERFACE)
if(NOT TARGET PkgConfig::${BIGNAME} AND CMAKE_VERSION VERSION_LESS "3.21")
# Work around cmake bug 22180 (PkgConfig::THING not set if no flags needed)
else()
target_link_libraries(${smallname} INTERFACE PkgConfig::${BIGNAME})
endif()
message(STATUS "Found system ${smallname} ${${BIGNAME}_VERSION}")
else()
message(STATUS "using ${smallname} submodule ${subdir}")
add_subdirectory(${subdir} EXCLUDE_FROM_ALL)
endif()
if(NOT TARGET ${target})
add_library(${target} ALIAS ${smallname})
endif()
endif()
endmacro()
Loading