From b2a173f41c74ed736e3b3bf79d5adfd58192b345 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 14 Feb 2025 09:28:45 +0100 Subject: [PATCH] DPL: add support for decompressing directly to shared memory This PR postpones the read operations which would usually populate an intermediate RecordBatch and performs them directly during the subsequent shared memory serialization. Doing so avoids having the intermediate representation allocate most of the memory. For the moment this is only done for the TTree plugin. RNTuple support will come in a subsequent PR. --- .../AnalysisSupport/src/RNTuplePlugin.cxx | 6 +- Framework/AnalysisSupport/src/TTreePlugin.cxx | 633 +++++++++++++----- .../include/Framework/RootArrowFilesystem.h | 6 + Framework/Core/src/RootArrowFilesystem.cxx | 6 + Framework/Core/test/test_Root2ArrowTable.cxx | 64 +- 5 files changed, 542 insertions(+), 173 deletions(-) diff --git a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx index 51b585d0714bb..a910964e6527c 100644 --- a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx +++ b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx @@ -12,6 +12,7 @@ #include "Framework/RuntimeError.h" #include "Framework/RootArrowFilesystem.h" #include "Framework/Plugins.h" +#include "Framework/FairMQResizableBuffer.h" #include #include #include @@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(buffer); + }}; } }; diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index 
4b130a2144253..881f7d6edc117 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -13,10 +13,15 @@ #include "Framework/Plugins.h" #include "Framework/Signpost.h" #include "Framework/Endian.h" +#include +#include +#include #include #include +#include #include #include +#include #include #include #include @@ -26,13 +31,278 @@ #include #include #include +#include +#include #include +#include +#include O2_DECLARE_DYNAMIC_LOG(root_arrow_fs); namespace o2::framework { +enum struct ReadOpKind { + Unknown, + Offsets, + Values, + Booleans, + VLA +}; + +struct ReadOps { + TBranch* branch = nullptr; + std::shared_ptr targetBuffer = nullptr; + int64_t rootBranchEntries = 0; + size_t typeSize = 0; + size_t listSize = 0; + // If this is an offset reading op, keep track of the actual + // range for the offsets, not only how many VLAs are there. + int64_t offsetCount = 0; + ReadOpKind kind = ReadOpKind::Unknown; +}; + +/// An OutputStream which does the reading of the input buffers directly +/// on writing, if needed. Each deferred operation is encoded in the source +/// buffer by an incremental number which can be used to lookup in the @a ops +/// vector the operation to perform. +class TTreeDeferredReadOutputStream : public arrow::io::OutputStream +{ + public: + explicit TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer); + + /// \brief Create in-memory output stream with indicated capacity using a + /// memory pool + /// \param[in] initial_capacity the initial allocated internal capacity of + /// the OutputStream + /// \param[in,out] pool a MemoryPool to use for allocations + /// \return the created stream + static arrow::Result> Create( + std::vector& ops, + int64_t initial_capacity = 4096, + arrow::MemoryPool* pool = arrow::default_memory_pool()); + + // By the time we call the destructor, the contents + // of the buffer are already moved to fairmq + // for being sent. 
+ ~TTreeDeferredReadOutputStream() override = default; + + // Implement the OutputStream interface + + /// Close the stream, preserving the buffer (retrieve it with Finish()). + arrow::Status Close() override; + [[nodiscard]] bool closed() const override; + [[nodiscard]] arrow::Result Tell() const override; + arrow::Status Write(const void* data, int64_t nbytes) override; + + /// \cond FALSE + using OutputStream::Write; + /// \endcond + + /// Close the stream and return the buffer + arrow::Result> Finish(); + + /// \brief Initialize state of OutputStream with newly allocated memory and + /// set position to 0 + /// \param[in] initial_capacity the starting allocated capacity + /// \param[in,out] pool the memory pool to use for allocations + /// \return Status + arrow::Status Reset(std::vector ops, + int64_t initial_capacity, arrow::MemoryPool* pool); + + [[nodiscard]] int64_t capacity() const { return capacity_; } + + private: + TTreeDeferredReadOutputStream(); + std::vector ops_; + + // Ensures there is sufficient space available to write nbytes + arrow::Status Reserve(int64_t nbytes); + + std::shared_ptr buffer_; + bool is_open_; + int64_t capacity_; + int64_t position_; + uint8_t* mutable_data_; +}; + +static constexpr int64_t kBufferMinimumSize = 256; + +TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream() + : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {} + +TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer) + : ops_(ops), + buffer_(buffer), + is_open_(true), + capacity_(buffer->size()), + position_(0), + mutable_data_(buffer->mutable_data()) {} + +arrow::Result> TTreeDeferredReadOutputStream::Create( + std::vector& ops, + int64_t initial_capacity, arrow::MemoryPool* pool) +{ + // ctor is private, so cannot use make_shared + auto ptr = std::shared_ptr(new TTreeDeferredReadOutputStream); + RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool)); + return ptr; +} + 
+arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector ops, + int64_t initial_capacity, arrow::MemoryPool* pool) +{ + ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool)); + ops_ = ops; + is_open_ = true; + capacity_ = initial_capacity; + position_ = 0; + mutable_data_ = buffer_->mutable_data(); + return arrow::Status::OK(); +} + +arrow::Status TTreeDeferredReadOutputStream::Close() +{ + if (is_open_) { + is_open_ = false; + if (position_ < capacity_) { + RETURN_NOT_OK(buffer_->Resize(position_, false)); + } + } + return arrow::Status::OK(); +} + +bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; } + +arrow::Result> TTreeDeferredReadOutputStream::Finish() +{ + RETURN_NOT_OK(Close()); + buffer_->ZeroPadding(); + is_open_ = false; + return std::move(buffer_); +} + +arrow::Result TTreeDeferredReadOutputStream::Tell() const { return position_; } + +auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) { + int readEntries = 0; + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + if (readLast < 0) { + throw runtime_error_f("Error while reading branch %s starting from %zu.", op.branch->GetName(), readEntries); + } + int size = readLast * op.listSize; + readEntries += readLast; + swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize); + target += (ptrdiff_t)(size * op.typeSize); + } +}; + +auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) { + int readEntries = 0; + rootBuffer.Reset(); + // Set to 0 + memset(target, 0, op.targetBuffer->size()); + int readLast = 0; + while (readEntries < op.rootBranchEntries) { + auto beginValue = readLast; + auto readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer); + int size = readLast * op.listSize; + readEntries += readLast; + for (int i = beginValue; i < beginValue + size; ++i) { + auto value = 
static_cast(rootBuffer.GetCurrent()[i - beginValue] << (i % 8)); + target[i / 8] |= value; + } + } +}; + +auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) { + int readEntries = 0; + auto* tPtrOffset = reinterpret_cast(offsetOp.targetBuffer->data()); + std::span const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1}; + + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + int size = offsets[readEntries + readLast] - offsets[readEntries]; + readEntries += readLast; + swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize); + target += (ptrdiff_t)(size * op.typeSize); + } +}; + +TBufferFile& rootBuffer() +{ + // FIXME: we will need more than one once we have multithreaded reading. + static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + return rootBuffer; +} + +arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes) +{ + if (ARROW_PREDICT_FALSE(!is_open_)) { + return arrow::Status::IOError("OutputStream is closed"); + } + if (ARROW_PREDICT_TRUE(nbytes == 0)) { + return arrow::Status::OK(); + } + if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) { + RETURN_NOT_OK(Reserve(nbytes)); + } + // This is a real address which needs to be copied. Do it! 
+ auto ref = (int64_t)data; + if (ref >= ops_.size()) { + memcpy(mutable_data_ + position_, data, nbytes); + position_ += nbytes; + return arrow::Status::OK(); + } + auto& op = ops_[ref]; + + switch (op.kind) { + // Offsets need to be read in advance because we need to know + // how many elements are there in total (since TTree does not allow discovering such information) + case ReadOpKind::Offsets: + break; + case ReadOpKind::Values: + readValues(mutable_data_ + position_, op, rootBuffer()); + break; + case ReadOpKind::VLA: + readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer()); + break; + case ReadOpKind::Booleans: + readBoolValues(mutable_data_ + position_, op, rootBuffer()); + break; + case ReadOpKind::Unknown: + throw runtime_error("Unknown Op"); + } + op.branch->SetStatus(false); + op.branch->DropBaskets("all"); + op.branch->Reset(); + op.branch->GetTransientBuffer(0)->Expand(0); + + position_ += nbytes; + return arrow::Status::OK(); +} + +arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes) +{ + // Always overallocate by doubling. It seems that it is a better growth + // strategy, at least for memory_benchmark.cc. + // This may be because it helps match the allocator's allocation buckets + // more exactly. Or perhaps it hits a sweet spot in jemalloc. + int64_t new_capacity = std::max(kBufferMinimumSize, capacity_); + new_capacity = position_ + nbytes; + if (new_capacity > capacity_) { + RETURN_NOT_OK(buffer_->Resize(new_capacity)); + capacity_ = new_capacity; + mutable_data_ = buffer_->mutable_data(); + } + return arrow::Status::OK(); +} + class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions { public: @@ -174,8 +444,21 @@ class TTreeFileFragment : public arrow::dataset::FileFragment return mTree.get(); } + std::vector& ops() + { + return mOps; + } + + /// The pointer to each allocation is an incremental number, indexing a collection to track + /// the size of each allocation. 
+ std::shared_ptr GetPlaceholderForOp(size_t size) + { + return std::make_shared((uint8_t*)(mOps.size() - 1), size); + } + private: std::unique_ptr mTree; + std::vector mOps; }; // An arrow outputstream which allows to write to a TTree. Eventually @@ -246,6 +529,9 @@ bool TTreeOutputStream::closed() const TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch) { + if (mBranchPrefix.empty() == true) { + return mTree->Branch(branchName, (char*)nullptr, sizeBranch); + } return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str()); } @@ -263,7 +549,10 @@ struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(treeFragment->ops(), buffer); + }}; } }; @@ -273,10 +562,36 @@ struct BranchFieldMapping { int datasetFieldIdx; }; +auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) { + uint32_t offset = 0; + std::span offsets; + int readEntries = 0; + int count = 0; + auto* tPtrOffset = reinterpret_cast(op.targetBuffer->mutable_data()); + offsets = std::span{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1}; + + // read sizes first + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + if (readLast == -1) { + throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName()); + } + readEntries += readLast; + for (auto i = 0; i < readLast; ++i) { + offsets[count++] = (int)offset; + offset += swap32_(reinterpret_cast(rootBuffer.GetCurrent())[i]); + } + } + offsets[count] = (int)offset; + 
op.offsetCount = offset; +}; + arrow::Result TTreeFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const { + assert(options->dataset_schema != nullptr); // This is the schema we want to read auto dataset_schema = options->dataset_schema; auto treeFragment = std::dynamic_pointer_cast(fragment); @@ -286,6 +601,8 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize, &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future> { + O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree()); + O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName()); std::vector> columns; std::vector> fields = dataset_schema->fields(); auto physical_schema = *treeFragment->ReadPhysicalSchema(); @@ -297,201 +614,170 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( // Register physical fields into the cache std::vector mappings; + // We need to count the number of readops to avoid moving the vector. + int opsCount = 0; for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) { auto dataset_field = dataset_schema->field(fi); + // This is needed because for now the dataset_field + // is actually the schema of the ttree + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str()); int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name()); if (physicalFieldIdx < 0) { - throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str()); + throw runtime_error_f("Cannot find physical field associated to %s. 
Possible fields: %s", + dataset_field->name().c_str(), physical_schema->ToString().c_str()); } if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) { + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(), + physical_schema->field(physicalFieldIdx - 1)->name().c_str()); mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi}); + opsCount += 2; } else { mappings.push_back({physicalFieldIdx, -1, fi}); + opsCount++; } } auto* tree = treeFragment->GetTree(); - tree->SetCacheSize(25000000); auto branches = tree->GetListOfBranches(); + size_t totalTreeSize = 0; + std::vector selectedBranches; for (auto& mapping : mappings) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.mainBranchIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); if (mapping.vlaIdx != -1) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.vlaIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); } } - tree->StopCacheLearningPhase(); - static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize); + tree->SetCacheSize(cacheSize); + for (auto* branch : selectedBranches) { + tree->AddBranchToCache(branch, false); + } + tree->StopCacheLearningPhase(); - int64_t rows = -1; + // Intermediate buffer to bulk read. 
Two for now + std::vector& ops = treeFragment->ops(); + ops.clear(); + ops.reserve(opsCount); for (size_t mi = 0; mi < mappings.size(); ++mi) { BranchFieldMapping mapping = mappings[mi]; // The field actually on disk auto datasetField = dataset_schema->field(mapping.datasetFieldIdx); auto physicalField = physical_schema->field(mapping.mainBranchIdx); - auto* branch = (TBranch*)branches->At(mapping.mainBranchIdx); - assert(branch); - buffer.Reset(); - auto totalEntries = branch->GetEntries(); - if (rows == -1) { - rows = totalEntries; + + if (mapping.vlaIdx != -1) { + auto* branch = (TBranch*)branches->At(mapping.vlaIdx); + ops.emplace_back(ReadOps{ + .branch = branch, + .rootBranchEntries = branch->GetEntries(), + .typeSize = 4, + .listSize = 1, + .kind = ReadOpKind::Offsets, + }); + auto& op = ops.back(); + ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool)); + // Offsets need to be read immediately to know how many values are there + readOffsets(op, rootBuffer()); } - if (rows != totalEntries) { - throw runtime_error_f("Unmatching number of rows for branch %s", branch->GetName()); + ops.push_back({}); + auto& valueOp = ops.back(); + valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx); + valueOp.rootBranchEntries = valueOp.branch->GetEntries(); + // In case this is a vla, we set the offsetCount as totalEntries + // In case we read booleans we need a special conversion from bytes to bits. + auto listType = std::dynamic_pointer_cast(datasetField->type()); + valueOp.typeSize = physicalField->type()->byte_width(); + // Notice how we are not (yet) allocating buffers at this point. We merely + // create placeholders to subsequently fill. 
+ if ((datasetField->type() == arrow::boolean())) { + valueOp.kind = ReadOpKind::Booleans; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries) / 8 + 1); + } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = listType->list_size(); + valueOp.kind = ReadOpKind::Booleans; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1); + } else if (mapping.vlaIdx != -1) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = -1; + // -1 is the current one, -2 is the one for the offsets + valueOp.kind = ReadOpKind::VLA; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize); + } else if (listType) { + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = listType->list_size(); + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize); + } else { + valueOp.typeSize = physicalField->type()->byte_width(); + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize); } arrow::Status status; - int readEntries = 0; std::shared_ptr array; - auto listType = std::dynamic_pointer_cast(datasetField->type()); - if (datasetField->type() == arrow::boolean() || - (listType && datasetField->type()->field(0)->type() == arrow::boolean())) { - if (listType) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type()->field(0)->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create value builder"); - } - auto listBuilder = std::make_unique(pool, 
std::move(builder), listType->list_size()); - auto valueBuilder = listBuilder.get()->value_builder(); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries * listType->list_size()); - status &= listBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= static_cast(valueBuilder)->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast * listType->list_size()); - } - status &= static_cast(listBuilder.get())->AppendValues(readEntries); - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= listBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } else if (listType == nullptr) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create builder"); - } - auto valueBuilder = static_cast(builder.get()); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= valueBuilder->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast); - } - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= valueBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } - } else { - // This is needed for branches which have not been persisted. 
- auto bytes = branch->GetTotBytes(); - auto branchSize = bytes ? bytes : 1000000; - auto&& result = arrow::AllocateResizableBuffer(branchSize, pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate values buffer"); - } - std::shared_ptr arrowValuesBuffer = result.MoveValueUnsafe(); - auto ptr = arrowValuesBuffer->mutable_data(); - if (ptr == nullptr) { - throw runtime_error("Invalid buffer"); - } - - std::unique_ptr offsetBuffer = nullptr; - - uint32_t offset = 0; - int count = 0; - std::shared_ptr arrowOffsetBuffer; - std::span offsets; - int size = 0; - uint32_t totalSize = 0; - if (mapping.vlaIdx != -1) { - auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx); - offsetBuffer = std::make_unique(TBuffer::EMode::kWrite, 4 * 1024 * 1024); - result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate offset buffer"); - } - arrowOffsetBuffer = result.MoveValueUnsafe(); - unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); - auto* tPtrOffset = reinterpret_cast(ptrOffset); - offsets = std::span{tPtrOffset, tPtrOffset + totalEntries + 1}; - - // read sizes first - while (readEntries < totalEntries) { - auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer); - readEntries += readLast; - for (auto i = 0; i < readLast; ++i) { - offsets[count++] = (int)offset; - offset += swap32_(reinterpret_cast(offsetBuffer->GetCurrent())[i]); - } - } - offsets[count] = (int)offset; - totalSize = offset; - readEntries = 0; - } - int typeSize = physicalField->type()->byte_width(); - int64_t listSize = 1; - if (auto fixedSizeList = std::dynamic_pointer_cast(datasetField->type())) { - listSize = fixedSizeList->list_size(); - typeSize = physicalField->type()->field(0)->type()->byte_width(); - } else if (mapping.vlaIdx != -1) { - typeSize = physicalField->type()->field(0)->type()->byte_width(); - listSize = -1; - } - - while (readEntries < 
totalEntries) { - auto readLast = branch->GetBulkRead().GetEntriesSerialized(readEntries, buffer); - if (mapping.vlaIdx != -1) { - size = offsets[readEntries + readLast] - offsets[readEntries]; - } else { - size = readLast * listSize; - } - readEntries += readLast; - swapCopy(ptr, buffer.GetCurrent(), size, typeSize); - ptr += (ptrdiff_t)(size * typeSize); - } - if (listSize >= 1) { - totalSize = readEntries * listSize; - } - if (listSize == 1) { - array = std::make_shared(datasetField->type(), readEntries, arrowValuesBuffer); - } else { - auto varray = std::make_shared(datasetField->type()->field(0)->type(), totalSize, arrowValuesBuffer); - if (mapping.vlaIdx != -1) { - array = std::make_shared(datasetField->type(), readEntries, arrowOffsetBuffer, varray); - } else { - array = std::make_shared(datasetField->type(), readEntries, varray); - } - } + if (listType) { + auto varray = std::make_shared(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer); + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, varray); + // This is a vla, there is also an offset op + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + valueOp.rootBranchEntries, + valueOp.targetBuffer->size()); + } else if (mapping.vlaIdx != -1) { + auto& offsetOp = ops[ops.size() - 2]; + auto varray = std::make_shared(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer); + // We have pushed an offset op if this was the case. 
+ array = std::make_shared(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size()); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + offsetOp.offsetCount, + valueOp.targetBuffer->size()); + } else { + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + valueOp.rootBranchEntries, + valueOp.targetBuffer->size()); } - branch->SetStatus(false); - branch->DropBaskets("all"); - branch->Reset(); - branch->GetTransientBuffer(0)->Expand(0); - columns.push_back(array); } + + // Do the actual filling of the buffers. This happens after we have created the whole structure + // so that we can read directly in shared memory. + int64_t rows = -1; + for (size_t i = 0; i < ops.size(); ++i) { + auto& op = ops[i]; + if (rows == -1 && op.kind != ReadOpKind::VLA) { + rows = op.rootBranchEntries; + } + if (rows == -1 && op.kind == ReadOpKind::VLA) { + auto& offsetOp = ops[i - 1]; + rows = offsetOp.rootBranchEntries; + } + if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) { + throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries); + } + if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) { + throw runtime_error_f("Unmatching number of rows for branch %s. 
Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].offsetCount); + } + } + auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns); totalCompressedSize += tree->GetZipBytes(); totalUncompressedSize += tree->GetTotBytes(); + O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize); return batch; }; return generator; @@ -817,11 +1103,31 @@ class TTreeFileWriter : public arrow::dataset::FileWriter switch (field->type()->id()) { case arrow::Type::FIXED_SIZE_LIST: { auto list = std::static_pointer_cast(column); - valueArrays.back() = list->values(); + if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) { + int64_t length = list->length() * list->list_type()->list_size(); + arrow::UInt8Builder builder; + auto ok = builder.Reserve(length); + // I need to build an array of uint8_t for the conversion to ROOT which uses + // bytes for booleans. + auto boolArray = std::static_pointer_cast(list->values()); + for (int64_t i = 0; i < length; ++i) { + if (boolArray->IsValid(i)) { + // Expand each boolean value (true/false) to uint8 (1/0) + uint8_t value = boolArray->Value(i) ? 
1 : 0; + auto ok = builder.Append(value); + } else { + // Append null for invalid entries + auto ok = builder.AppendNull(); + } + } + valueArrays.back() = *builder.Finish(); + } else { + valueArrays.back() = list->values(); + } } break; case arrow::Type::LIST: { auto list = std::static_pointer_cast(column); - valueArrays.back() = list; + valueArrays.back() = list->values(); } break; case arrow::Type::BOOL: { // In case of arrays of booleans, we need to go back to their @@ -867,11 +1173,12 @@ class TTreeFileWriter : public arrow::dataset::FileWriter uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width(); branch->SetAddress((void*)buffer); sizeBranch->SetAddress(&listSize); - }; - break; + } break; case arrow::Type::FIXED_SIZE_LIST: default: { - uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * valueType->byte_width(); + // needed for the boolean case, I should probably cache this. + auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1; + uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth; branch->SetAddress((void*)buffer); }; } diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h index 441b43aeca331..5aceaed077001 100644 --- a/Framework/Core/include/Framework/RootArrowFilesystem.h +++ b/Framework/Core/include/Framework/RootArrowFilesystem.h @@ -12,6 +12,7 @@ #define O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_ #include +#include #include #include #include @@ -96,6 +97,9 @@ class VirtualRootFileSystemBase : public arrow::fs::FileSystem struct RootArrowFactory final { std::function()> options = nullptr; std::function()> format = nullptr; + // Builds an output streamer which is able to read from the source fragment + // in a deferred way. 
+ std::function(std::shared_ptr, const std::shared_ptr& buffer)> deferredOutputStreamer = nullptr; }; struct RootArrowFactoryPlugin { @@ -144,6 +148,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory&); + ~TFileFileSystem() override; + std::string type_name() const override { return "TDirectoryFile"; diff --git a/Framework/Core/src/RootArrowFilesystem.cxx b/Framework/Core/src/RootArrowFilesystem.cxx index c563866e802bb..403e393ec6090 100644 --- a/Framework/Core/src/RootArrowFilesystem.cxx +++ b/Framework/Core/src/RootArrowFilesystem.cxx @@ -42,6 +42,12 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024); } +TFileFileSystem::~TFileFileSystem() +{ + mFile->Close(); + delete mFile; +} + std::shared_ptr TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source) { // We use a plugin to create the actual objects inside the diff --git a/Framework/Core/test/test_Root2ArrowTable.cxx b/Framework/Core/test/test_Root2ArrowTable.cxx index 438f388ec86b5..663be91a1e6f3 100644 --- a/Framework/Core/test/test_Root2ArrowTable.cxx +++ b/Framework/Core/test/test_Root2ArrowTable.cxx @@ -38,6 +38,7 @@ #include #include +#include #include #include #include @@ -388,6 +389,7 @@ bool validatePhysicalSchema(std::shared_ptr schema) { REQUIRE(schema->num_fields() == 12); REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id()); + REQUIRE(schema->field(0)->name() == "px"); REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id()); REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id()); REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id()); @@ -541,12 +543,28 @@ TEST_CASE("RootTree2Dataset") options->dataset_schema = schema; auto scanner = format->ScanBatchesAsync(options, *fragment); REQUIRE(scanner.ok()); + + // This batch has deferred contents. 
Therefore we need to use a DeferredOutputStream to + // write it to a real one and read it back with the BufferReader, which is hopefully zero copy + std::shared_ptr batch; + auto batches = (*scanner)(); auto result = batches.result(); REQUIRE(result.ok()); REQUIRE((*result)->columns().size() == 11); REQUIRE((*result)->num_rows() == 100); - validateContents(*result); + std::shared_ptr buffer = *arrow::AllocateResizableBuffer(1000, 64); + auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragment, buffer); + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema); + auto status = outBatch.ValueOrDie()->WriteRecordBatch(**result); + std::shared_ptr bufferReader = std::make_shared(buffer); + auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader); + auto batchReader = readerResult.ValueOrDie(); + + auto next = batchReader->ReadNext(&batch); + REQUIRE(batch != nullptr); + + validateContents(batch); auto* output = new TMemFile("foo", "RECREATE"); auto outFs = std::make_shared(output, 0, factory); @@ -558,7 +576,8 @@ TEST_CASE("RootTree2Dataset") // Write to the /DF_3 tree at top level arrow::fs::FileLocator locator{outFs, "/DF_3"}; auto writer = format->MakeWriter(*destination, schema, {}, locator); - auto success = writer->get()->Write(*result); + auto success = writer->get()->Write(batch); + REQUIRE(batch->schema()->field(0)->name() == "px"); auto rootDestination = std::dynamic_pointer_cast(*destination); SECTION("Read tree") @@ -568,7 +587,11 @@ TEST_CASE("RootTree2Dataset") auto tfileFs = std::dynamic_pointer_cast(outFs); REQUIRE(tfileFs.get()); REQUIRE(tfileFs->GetFile()); - REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + auto* tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + 
REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetName() == std::string("px")); + arrow::dataset::FileSource source2("/DF_3", outFs); REQUIRE(format->IsSupported(source2) == true); @@ -577,6 +600,10 @@ TEST_CASE("RootTree2Dataset") REQUIRE(tfileFs->GetFile()); REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + auto schemaOptWritten = format->Inspect(source2); tfileFs = std::dynamic_pointer_cast(source2.filesystem()); REQUIRE(tfileFs.get()); @@ -585,6 +612,10 @@ TEST_CASE("RootTree2Dataset") REQUIRE(schemaOptWritten.ok()); auto schemaWritten = *schemaOptWritten; + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + REQUIRE(validatePhysicalSchema(schemaWritten)); std::vector> fields; for (auto& field : schemaWritten->fields()) { @@ -599,23 +630,38 @@ TEST_CASE("RootTree2Dataset") auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema); REQUIRE(fragmentWritten.ok()); auto optionsWritten = std::make_shared(); - options->dataset_schema = schema; - auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragment); + optionsWritten->dataset_schema = schema; + auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragmentWritten); REQUIRE(scannerWritten.ok()); - auto batchesWritten = (*scanner)(); - auto resultWritten = batches.result(); + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + auto batchesWritten = (*scannerWritten)(); + auto resultWritten = batchesWritten.result(); REQUIRE(resultWritten.ok()); 
REQUIRE((*resultWritten)->columns().size() == 11); REQUIRE((*resultWritten)->num_rows() == 100); - validateContents(*resultWritten); + + std::shared_ptr buffer = *arrow::AllocateResizableBuffer(1000, 64); + auto deferredWriterStream2 = factory.capabilities[1].factory().deferredOutputStreamer(*fragmentWritten, buffer); + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream2.get(), schema); + auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten); + std::shared_ptr bufferReader = std::make_shared(buffer); + auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader); + auto batchReader = readerResult.ValueOrDie(); + + auto next = batchReader->ReadNext(&batch); + REQUIRE(batch != nullptr); + validateContents(batch); } + arrow::fs::FileLocator rnTupleLocator{outFs, "/rntuple"}; // We write an RNTuple in the same TMemFile, using /rntuple as a location auto rntupleDestination = std::dynamic_pointer_cast(*destination); { auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator); - auto rNtupleSuccess = rNtupleWriter->get()->Write(*result); + auto rNtupleSuccess = rNtupleWriter->get()->Write(batch); REQUIRE(rNtupleSuccess.ok()); }