Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 4 additions & 102 deletions Framework/Core/include/Framework/TableBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ class Table;
class Array;
} // namespace arrow

template <typename T>
struct BulkInfo {
const T ptr;
size_t size;
};

extern template class arrow::NumericBuilder<arrow::UInt8Type>;
extern template class arrow::NumericBuilder<arrow::UInt32Type>;
extern template class arrow::NumericBuilder<arrow::FloatType>;
Expand Down Expand Up @@ -200,34 +194,6 @@ struct BuilderUtils {
}
}

template <typename HolderType, typename PTR>
static arrow::Status bulkAppend(HolderType& holder, size_t bulkSize, const PTR ptr)
{
return holder.builder->AppendValues(ptr, bulkSize, nullptr);
}

template <typename HolderType, typename PTR>
static arrow::Status bulkAppendChunked(HolderType& holder, BulkInfo<PTR> info)
{
// Appending nullptr is a no-op.
if (info.ptr == nullptr) {
return arrow::Status::OK();
}
if constexpr (std::is_same_v<decltype(holder.builder), std::unique_ptr<arrow::FixedSizeListBuilder>>) {
if (appendToList<std::remove_pointer_t<decltype(info.ptr)>>(holder.builder, info.ptr, info.size).ok() == false) {
throw runtime_error("Unable to append to column");
} else {
return arrow::Status::OK();
}
} else {
if (holder.builder->AppendValues(info.ptr, info.size, nullptr).ok() == false) {
throw runtime_error("Unable to append to column");
} else {
return arrow::Status::OK();
}
}
}

template <typename HolderType, typename ITERATOR>
static arrow::Status append(HolderType& holder, std::pair<ITERATOR, ITERATOR> ip)
{
Expand Down Expand Up @@ -518,14 +484,6 @@ struct TableBuilderHelpers {
return {BuilderTraits<ARGS>::make_datatype()...};
}

template <typename... ARGS, size_t NCOLUMNS = sizeof...(ARGS)>
static std::vector<std::shared_ptr<arrow::Field>> makeFields(std::array<char const*, NCOLUMNS> const& names)
{
char const* const* names_ptr = names.data();
return {
std::make_shared<arrow::Field>(*names_ptr++, BuilderMaker<ARGS>::make_datatype(), true, nullptr)...};
}

/// Invokes the append method for each entry in the tuple
template <typename... Ts, typename VALUES>
static bool append(std::tuple<Ts...>& holders, VALUES&& values)
Expand All @@ -542,19 +500,6 @@ struct TableBuilderHelpers {
(BuilderUtils::unsafeAppend(std::get<Ts::index>(holders), std::get<Ts::index>(values)), ...);
}

template <typename... Ts, typename PTRS>
static bool bulkAppend(std::tuple<Ts...>& holders, size_t bulkSize, PTRS ptrs)
{
return (BuilderUtils::bulkAppend(std::get<Ts::index>(holders), bulkSize, std::get<Ts::index>(ptrs)).ok() && ...);
}

/// Return true if all columns are done.
template <typename... Ts, typename INFOS>
static bool bulkAppendChunked(std::tuple<Ts...>& holders, INFOS infos)
{
return (BuilderUtils::bulkAppendChunked(std::get<Ts::index>(holders), std::get<Ts::index>(infos)).ok() && ...);
}

/// Invokes the append method for each entry in the tuple
template <typename... Ts>
static bool finalize(std::vector<std::shared_ptr<arrow::Array>>& arrays, std::tuple<Ts...>& holders)
Expand All @@ -575,15 +520,9 @@ constexpr auto tuple_to_pack(std::tuple<ARGS...>&&)
return framework::pack<ARGS...>{};
}

template <typename T>
concept BulkInsertable = (std::integral<std::decay<T>> && !std::same_as<bool, std::decay_t<T>>);

template <typename T>
struct InsertionTrait {
static consteval DirectInsertion<T> policy()
requires(!BulkInsertable<T>);
static consteval CachedInsertion<T> policy()
requires(BulkInsertable<T>);
static consteval DirectInsertion<T> policy();
using Policy = decltype(policy());
};

Expand Down Expand Up @@ -658,7 +597,9 @@ class TableBuilder
template <typename... ARGS, size_t I = sizeof...(ARGS)>
auto makeBuilders(std::array<char const*, I> const& columnNames, size_t nRows)
{
mSchema = std::make_shared<arrow::Schema>(TableBuilderHelpers::makeFields<ARGS...>(columnNames));
char const* const* names_ptr = columnNames.data();
mSchema = std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>>({std::make_shared<arrow::Field>(*names_ptr++, BuilderMaker<ARGS>::make_datatype(), true, nullptr)...}));

mHolders = makeHolders<ARGS...>(mMemoryPool, nRows);
mFinalizer = [](std::vector<std::shared_ptr<arrow::Array>>& arrays, void* holders) -> bool {
Expand Down Expand Up @@ -768,45 +709,6 @@ class TableBuilder
}(typename T::table_t::persistent_columns_t{});
}

template <typename... ARGS, size_t NCOLUMNS = sizeof...(ARGS)>
auto preallocatedPersist(std::array<char const*, NCOLUMNS> const& columnNames, int nRows)
{
constexpr size_t nColumns = NCOLUMNS;
validate();
mArrays.resize(nColumns);
makeBuilders<ARGS...>(columnNames, nRows);

// Callback used to fill the builders
return [holders = mHolders](unsigned int /*slot*/, typename BuilderMaker<ARGS>::FillType... args) -> void {
TableBuilderHelpers::unsafeAppend(*(HoldersTupleIndexed<ARGS...>*)holders, std::forward_as_tuple(args...));
};
}

template <typename... ARGS, size_t NCOLUMNS = sizeof...(ARGS)>
auto bulkPersist(std::array<char const*, NCOLUMNS> const& columnNames, size_t nRows)
{
validate();
// Should not be called more than once
mArrays.resize(NCOLUMNS);
makeBuilders<ARGS...>(columnNames, nRows);

return [holders = mHolders](unsigned int /*slot*/, size_t batchSize, typename BuilderMaker<ARGS>::FillType const*... args) -> void {
TableBuilderHelpers::bulkAppend(*(HoldersTupleIndexed<ARGS...>*)holders, batchSize, std::forward_as_tuple(args...));
};
}

template <typename... ARGS, size_t NCOLUMNS = sizeof...(ARGS)>
auto bulkPersistChunked(std::array<char const*, NCOLUMNS> const& columnNames, size_t nRows)
{
validate();
mArrays.resize(NCOLUMNS);
makeBuilders<ARGS...>(columnNames, nRows);

return [holders = mHolders](unsigned int /*slot*/, BulkInfo<typename BuilderMaker<ARGS>::STLValueType const*>... args) -> bool {
return TableBuilderHelpers::bulkAppendChunked(*(HoldersTupleIndexed<ARGS...>*)holders, std::forward_as_tuple(args...));
};
}

/// Reserve method to expand the columns as needed.
template <typename... Ts>
auto reserveArrays(std::tuple<Ts...>& holders, int s)
Expand Down
33 changes: 0 additions & 33 deletions Framework/Core/test/benchmark_TableBuilder.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,39 +62,6 @@ static void BM_TableBuilderScalarReserved(benchmark::State& state)
BENCHMARK(BM_TableBuilderScalarReserved)->Arg(1 << 21);
BENCHMARK(BM_TableBuilderScalarReserved)->Range(8, 8 << 16);

static void BM_TableBuilderScalarPresized(benchmark::State& state)
{
using namespace o2::framework;
for (auto _ : state) {
TableBuilder builder;
auto rowWriter = builder.preallocatedPersist<float>({"x"}, state.range(0));
for (auto i = 0; i < state.range(0); ++i) {
rowWriter(0, 0.f);
}
auto table = builder.finalize();
}
}

BENCHMARK(BM_TableBuilderScalarPresized)->Arg(1 << 20);
BENCHMARK(BM_TableBuilderScalarPresized)->Range(8, 8 << 16);

static void BM_TableBuilderScalarBulk(benchmark::State& state)
{
using namespace o2::framework;
auto chunkSize = state.range(0) / 256;
std::vector<float> buffer(chunkSize, 0.); // We assume data is chunked in blocks 256th of the total size
for (auto _ : state) {
TableBuilder builder;
auto bulkWriter = builder.bulkPersist<float>({"x"}, state.range(0));
for (auto i = 0; i < state.range(0) / chunkSize; ++i) {
bulkWriter(0, chunkSize, buffer.data());
}
auto table = builder.finalize();
}
}

BENCHMARK(BM_TableBuilderScalarBulk)->Range(256, 1 << 20);

static void BM_TableBuilderSimple(benchmark::State& state)
{
using namespace o2::framework;
Expand Down
34 changes: 0 additions & 34 deletions Framework/Core/test/test_TableBuilder.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -162,30 +162,6 @@ TEST_CASE("TestTableBuilderStruct")
}
}

TEST_CASE("TestTableBuilderBulk")
{
using namespace o2::framework;
TableBuilder builder;
auto bulkWriter = builder.bulkPersist<int, int>({"x", "y"}, 10);
int x[] = {0, 1, 2, 3, 4, 5, 6, 7};
int y[] = {0, 1, 2, 3, 4, 5, 6, 7};

bulkWriter(0, 8, x, y);

auto table = builder.finalize();
REQUIRE(table->num_columns() == 2);
REQUIRE(table->num_rows() == 8);
REQUIRE(table->schema()->field(0)->name() == "x");
REQUIRE(table->schema()->field(1)->name() == "y");
REQUIRE(table->schema()->field(0)->type()->id() == arrow::int32()->id());
REQUIRE(table->schema()->field(1)->type()->id() == arrow::int32()->id());

for (int64_t i = 0; i < 8; ++i) {
auto p = std::dynamic_pointer_cast<arrow::NumericArray<arrow::Int32Type>>(table->column(0)->chunk(0));
REQUIRE(p->Value(i) == i);
}
}

TEST_CASE("TestTableBuilderMore")
{
using namespace o2::framework;
Expand Down Expand Up @@ -288,13 +264,3 @@ TEST_CASE("TestColumnCount")
int count2 = TableBuilder::countColumns<float, int, char[3]>();
REQUIRE(count2 == 3);
}

TEST_CASE("TestMakeFields")
{
auto fields = TableBuilderHelpers::makeFields<int, float>({"i", "f"});
REQUIRE(fields.size() == 2);
REQUIRE(fields[0]->name() == "i");
REQUIRE(fields[1]->name() == "f");
REQUIRE(fields[0]->type()->name() == "int32");
REQUIRE(fields[1]->type()->name() == "float");
}