diff --git a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx index 51b585d0714bb..a910964e6527c 100644 --- a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx +++ b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx @@ -12,6 +12,7 @@ #include "Framework/RuntimeError.h" #include "Framework/RootArrowFilesystem.h" #include "Framework/Plugins.h" +#include "Framework/FairMQResizableBuffer.h" #include #include #include @@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(buffer); + }}; } }; diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index 4b130a2144253..881f7d6edc117 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -13,10 +13,15 @@ #include "Framework/Plugins.h" #include "Framework/Signpost.h" #include "Framework/Endian.h" +#include +#include +#include #include #include +#include #include #include +#include #include #include #include @@ -26,13 +31,278 @@ #include #include #include +#include +#include #include +#include +#include O2_DECLARE_DYNAMIC_LOG(root_arrow_fs); namespace o2::framework { +enum struct ReadOpKind { + Unknown, + Offsets, + Values, + Booleans, + VLA +}; + +struct ReadOps { + TBranch* branch = nullptr; + std::shared_ptr targetBuffer = nullptr; + int64_t rootBranchEntries = 0; + size_t typeSize = 0; + size_t listSize = 0; + // If this is an offset reading op, keep track of the actual + // range for the offsets, not only how many VLAs are there. + int64_t offsetCount = 0; + ReadOpKind kind = ReadOpKind::Unknown; +}; + +/// An OutputStream which does the reading of the input buffers directly +/// on writing, if needed. Each deferred operation is encoded in the source +/// buffer by an incremental number which can be used to lookup in the @a ops +/// vector the operation to perform. +class TTreeDeferredReadOutputStream : public arrow::io::OutputStream +{ + public: + explicit TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer); + + /// \brief Create in-memory output stream with indicated capacity using a + /// memory pool + /// \param[in] initial_capacity the initial allocated internal capacity of + /// the OutputStream + /// \param[in,out] pool a MemoryPool to use for allocations + /// \return the created stream + static arrow::Result> Create( + std::vector& ops, + int64_t initial_capacity = 4096, + arrow::MemoryPool* pool = arrow::default_memory_pool()); + + // By the time we call the destructor, the contents + // of the buffer are already moved to fairmq + // for being sent. + ~TTreeDeferredReadOutputStream() override = default; + + // Implement the OutputStream interface + + /// Close the stream, preserving the buffer (retrieve it with Finish()). + arrow::Status Close() override; + [[nodiscard]] bool closed() const override; + [[nodiscard]] arrow::Result Tell() const override; + arrow::Status Write(const void* data, int64_t nbytes) override; + + /// \cond FALSE + using OutputStream::Write; + /// \endcond + + /// Close the stream and return the buffer + arrow::Result> Finish(); + + /// \brief Initialize state of OutputStream with newly allocated memory and + /// set position to 0 + /// \param[in] initial_capacity the starting allocated capacity + /// \param[in,out] pool the memory pool to use for allocations + /// \return Status + arrow::Status Reset(std::vector ops, + int64_t initial_capacity, arrow::MemoryPool* pool); + + [[nodiscard]] int64_t capacity() const { return capacity_; } + + private: + TTreeDeferredReadOutputStream(); + std::vector ops_; + + // Ensures there is sufficient space available to write nbytes + arrow::Status Reserve(int64_t nbytes); + + std::shared_ptr buffer_; + bool is_open_; + int64_t capacity_; + int64_t position_; + uint8_t* mutable_data_; +}; + +static constexpr int64_t kBufferMinimumSize = 256; + +TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream() + : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {} + +TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector& ops, + const std::shared_ptr& buffer) + : ops_(ops), + buffer_(buffer), + is_open_(true), + capacity_(buffer->size()), + position_(0), + mutable_data_(buffer->mutable_data()) {} + +arrow::Result> TTreeDeferredReadOutputStream::Create( + std::vector& ops, + int64_t initial_capacity, arrow::MemoryPool* pool) +{ + // ctor is private, so cannot use make_shared + auto ptr = std::shared_ptr(new TTreeDeferredReadOutputStream); + RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool)); + return ptr; +} + +arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector ops, + int64_t initial_capacity, arrow::MemoryPool* pool) +{ + ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool)); + ops_ = ops; + is_open_ = true; + capacity_ = initial_capacity; + position_ = 0; + mutable_data_ = buffer_->mutable_data(); + return arrow::Status::OK(); +} + +arrow::Status TTreeDeferredReadOutputStream::Close() +{ + if (is_open_) { + is_open_ = false; + if (position_ < capacity_) { + RETURN_NOT_OK(buffer_->Resize(position_, false)); + } + } + return arrow::Status::OK(); +} + +bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; } + +arrow::Result> TTreeDeferredReadOutputStream::Finish() +{ + RETURN_NOT_OK(Close()); + buffer_->ZeroPadding(); + is_open_ = false; + return std::move(buffer_); +} + +arrow::Result TTreeDeferredReadOutputStream::Tell() const { return position_; } + +auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) { + int readEntries = 0; + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + if (readLast < 0) { + throw runtime_error_f("Error while reading branch %s starting from %zu.", op.branch->GetName(), readEntries); + } + int size = readLast * op.listSize; + readEntries += readLast; + swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize); + target += (ptrdiff_t)(size * op.typeSize); + } +}; + +auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) { + int readEntries = 0; + rootBuffer.Reset(); + // Set to 0 + memset(target, 0, op.targetBuffer->size()); + int readLast = 0; + while (readEntries < op.rootBranchEntries) { + auto beginValue = readLast; + auto readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer); + int size = readLast * op.listSize; + readEntries += readLast; + for (int i = beginValue; i < beginValue + size; ++i) { + auto value = static_cast(rootBuffer.GetCurrent()[i - beginValue] << (i % 8)); + target[i / 8] |= value; + } + } +}; + +auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) { + int readEntries = 0; + auto* tPtrOffset = reinterpret_cast(offsetOp.targetBuffer->data()); + std::span const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1}; + + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + int size = offsets[readEntries + readLast] - offsets[readEntries]; + readEntries += readLast; + swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize); + target += (ptrdiff_t)(size * op.typeSize); + } +}; + +TBufferFile& rootBuffer() +{ + // FIXME: we will need more than one once we have multithreaded reading. + static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + return rootBuffer; +} + +arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes) +{ + if (ARROW_PREDICT_FALSE(!is_open_)) { + return arrow::Status::IOError("OutputStream is closed"); + } + if (ARROW_PREDICT_TRUE(nbytes == 0)) { + return arrow::Status::OK(); + } + if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) { + RETURN_NOT_OK(Reserve(nbytes)); + } + // This is a real address which needs to be copied. Do it! + auto ref = (int64_t)data; + if (ref >= ops_.size()) { + memcpy(mutable_data_ + position_, data, nbytes); + position_ += nbytes; + return arrow::Status::OK(); + } + auto& op = ops_[ref]; + + switch (op.kind) { + // Offsets need to be read in advance because we need to know + // how many elements are there in total (since TTree does not allow discovering such informantion) + case ReadOpKind::Offsets: + break; + case ReadOpKind::Values: + readValues(mutable_data_ + position_, op, rootBuffer()); + break; + case ReadOpKind::VLA: + readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer()); + break; + case ReadOpKind::Booleans: + readBoolValues(mutable_data_ + position_, op, rootBuffer()); + break; + case ReadOpKind::Unknown: + throw runtime_error("Unknown Op"); + } + op.branch->SetStatus(false); + op.branch->DropBaskets("all"); + op.branch->Reset(); + op.branch->GetTransientBuffer(0)->Expand(0); + + position_ += nbytes; + return arrow::Status::OK(); +} + +arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes) +{ + // Always overallocate by doubling. It seems that it is a better growth + // strategy, at least for memory_benchmark.cc. + // This may be because it helps match the allocator's allocation buckets + // more exactly. Or perhaps it hits a sweet spot in jemalloc. + int64_t new_capacity = std::max(kBufferMinimumSize, capacity_); + new_capacity = position_ + nbytes; + if (new_capacity > capacity_) { + RETURN_NOT_OK(buffer_->Resize(new_capacity)); + capacity_ = new_capacity; + mutable_data_ = buffer_->mutable_data(); + } + return arrow::Status::OK(); +} + class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions { public: @@ -174,8 +444,21 @@ class TTreeFileFragment : public arrow::dataset::FileFragment return mTree.get(); } + std::vector& ops() + { + return mOps; + } + + /// The pointer to each allocation is an incremental number, indexing a collection to track + /// the size of each allocation. + std::shared_ptr GetPlaceholderForOp(size_t size) + { + return std::make_shared((uint8_t*)(mOps.size() - 1), size); + } + private: std::unique_ptr mTree; + std::vector mOps; }; // An arrow outputstream which allows to write to a TTree. Eventually @@ -246,6 +529,9 @@ bool TTreeOutputStream::closed() const TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch) { + if (mBranchPrefix.empty() == true) { + return mTree->Branch(branchName, (char*)nullptr, sizeBranch); + } return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str()); } @@ -263,7 +549,10 @@ struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - }; + .deferredOutputStreamer = [](std::shared_ptr fragment, const std::shared_ptr& buffer) -> std::shared_ptr { + auto treeFragment = std::dynamic_pointer_cast(fragment); + return std::make_shared(treeFragment->ops(), buffer); + }}; } }; @@ -273,10 +562,36 @@ struct BranchFieldMapping { int datasetFieldIdx; }; +auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) { + uint32_t offset = 0; + std::span offsets; + int readEntries = 0; + int count = 0; + auto* tPtrOffset = reinterpret_cast(op.targetBuffer->mutable_data()); + offsets = std::span{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1}; + + // read sizes first + rootBuffer.Reset(); + while (readEntries < op.rootBranchEntries) { + auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer); + if (readLast == -1) { + throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName()); + } + readEntries += readLast; + for (auto i = 0; i < readLast; ++i) { + offsets[count++] = (int)offset; + offset += swap32_(reinterpret_cast(rootBuffer.GetCurrent())[i]); + } + } + offsets[count] = (int)offset; + op.offsetCount = offset; +}; + arrow::Result TTreeFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const { + assert(options->dataset_schema != nullptr); // This is the schema we want to read auto dataset_schema = options->dataset_schema; auto treeFragment = std::dynamic_pointer_cast(fragment); @@ -286,6 +601,8 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize, &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future> { + O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree()); + O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName()); std::vector> columns; std::vector> fields = dataset_schema->fields(); auto physical_schema = *treeFragment->ReadPhysicalSchema(); @@ -297,201 +614,170 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( // Register physical fields into the cache std::vector mappings; + // We need to count the number of readops to avoid moving the vector. + int opsCount = 0; for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) { auto dataset_field = dataset_schema->field(fi); + // This is needed because for now the dataset_field + // is actually the schema of the ttree + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str()); int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name()); if (physicalFieldIdx < 0) { - throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str()); + throw runtime_error_f("Cannot find physical field associated to %s. Possible fields: %s", + dataset_field->name().c_str(), physical_schema->ToString().c_str()); } if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) { + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(), + physical_schema->field(physicalFieldIdx - 1)->name().c_str()); mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi}); + opsCount += 2; } else { mappings.push_back({physicalFieldIdx, -1, fi}); + opsCount++; } } auto* tree = treeFragment->GetTree(); - tree->SetCacheSize(25000000); auto branches = tree->GetListOfBranches(); + size_t totalTreeSize = 0; + std::vector selectedBranches; for (auto& mapping : mappings) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.mainBranchIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); if (mapping.vlaIdx != -1) { - tree->AddBranchToCache((TBranch*)branches->At(mapping.vlaIdx), false); + selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx)); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName()); + totalTreeSize += selectedBranches.back()->GetTotalSize(); } } - tree->StopCacheLearningPhase(); - static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize); + tree->SetCacheSize(cacheSize); + for (auto* branch : selectedBranches) { + tree->AddBranchToCache(branch, false); + } + tree->StopCacheLearningPhase(); - int64_t rows = -1; + // Intermediate buffer to bulk read. Two for now + std::vector& ops = treeFragment->ops(); + ops.clear(); + ops.reserve(opsCount); for (size_t mi = 0; mi < mappings.size(); ++mi) { BranchFieldMapping mapping = mappings[mi]; // The field actually on disk auto datasetField = dataset_schema->field(mapping.datasetFieldIdx); auto physicalField = physical_schema->field(mapping.mainBranchIdx); - auto* branch = (TBranch*)branches->At(mapping.mainBranchIdx); - assert(branch); - buffer.Reset(); - auto totalEntries = branch->GetEntries(); - if (rows == -1) { - rows = totalEntries; + + if (mapping.vlaIdx != -1) { + auto* branch = (TBranch*)branches->At(mapping.vlaIdx); + ops.emplace_back(ReadOps{ + .branch = branch, + .rootBranchEntries = branch->GetEntries(), + .typeSize = 4, + .listSize = 1, + .kind = ReadOpKind::Offsets, + }); + auto& op = ops.back(); + ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool)); + // Offsets need to be read immediately to know how many values are there + readOffsets(op, rootBuffer()); } - if (rows != totalEntries) { - throw runtime_error_f("Unmatching number of rows for branch %s", branch->GetName()); + ops.push_back({}); + auto& valueOp = ops.back(); + valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx); + valueOp.rootBranchEntries = valueOp.branch->GetEntries(); + // In case this is a vla, we set the offsetCount as totalEntries + // In case we read booleans we need a special coversion from bytes to bits. + auto listType = std::dynamic_pointer_cast(datasetField->type()); + valueOp.typeSize = physicalField->type()->byte_width(); + // Notice how we are not (yet) allocating buffers at this point. We merely + // create placeholders to subsequently fill. + if ((datasetField->type() == arrow::boolean())) { + valueOp.kind = ReadOpKind::Booleans; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries) / 8 + 1); + } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = listType->list_size(); + valueOp.kind = ReadOpKind::Booleans; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1); + } else if (mapping.vlaIdx != -1) { + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.listSize = -1; + // -1 is the current one, -2 is the one with for the offsets + valueOp.kind = ReadOpKind::VLA; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize); + } else if (listType) { + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = listType->list_size(); + valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width(); + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize); + } else { + valueOp.typeSize = physicalField->type()->byte_width(); + valueOp.kind = ReadOpKind::Values; + valueOp.listSize = 1; + valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize); } arrow::Status status; - int readEntries = 0; std::shared_ptr array; - auto listType = std::dynamic_pointer_cast(datasetField->type()); - if (datasetField->type() == arrow::boolean() || - (listType && datasetField->type()->field(0)->type() == arrow::boolean())) { - if (listType) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type()->field(0)->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create value builder"); - } - auto listBuilder = std::make_unique(pool, std::move(builder), listType->list_size()); - auto valueBuilder = listBuilder.get()->value_builder(); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries * listType->list_size()); - status &= listBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= static_cast(valueBuilder)->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast * listType->list_size()); - } - status &= static_cast(listBuilder.get())->AppendValues(readEntries); - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= listBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } else if (listType == nullptr) { - std::unique_ptr builder = nullptr; - auto status = arrow::MakeBuilder(pool, datasetField->type(), &builder); - if (!status.ok()) { - throw runtime_error("Cannot create builder"); - } - auto valueBuilder = static_cast(builder.get()); - // boolean array special case: we need to use builder to create the bitmap - status = valueBuilder->Reserve(totalEntries); - if (!status.ok()) { - throw runtime_error("Failed to reserve memory for array builder"); - } - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetBulkEntries(readEntries, buffer); - readEntries += readLast; - status &= valueBuilder->AppendValues(reinterpret_cast(buffer.GetCurrent()), readLast); - } - if (!status.ok()) { - throw runtime_error("Failed to append values to array"); - } - status &= valueBuilder->Finish(&array); - if (!status.ok()) { - throw runtime_error("Failed to create array"); - } - } - } else { - // This is needed for branches which have not been persisted. - auto bytes = branch->GetTotBytes(); - auto branchSize = bytes ? bytes : 1000000; - auto&& result = arrow::AllocateResizableBuffer(branchSize, pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate values buffer"); - } - std::shared_ptr arrowValuesBuffer = result.MoveValueUnsafe(); - auto ptr = arrowValuesBuffer->mutable_data(); - if (ptr == nullptr) { - throw runtime_error("Invalid buffer"); - } - - std::unique_ptr offsetBuffer = nullptr; - - uint32_t offset = 0; - int count = 0; - std::shared_ptr arrowOffsetBuffer; - std::span offsets; - int size = 0; - uint32_t totalSize = 0; - if (mapping.vlaIdx != -1) { - auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx); - offsetBuffer = std::make_unique(TBuffer::EMode::kWrite, 4 * 1024 * 1024); - result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool); - if (!result.ok()) { - throw runtime_error("Cannot allocate offset buffer"); - } - arrowOffsetBuffer = result.MoveValueUnsafe(); - unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); - auto* tPtrOffset = reinterpret_cast(ptrOffset); - offsets = std::span{tPtrOffset, tPtrOffset + totalEntries + 1}; - - // read sizes first - while (readEntries < totalEntries) { - auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer); - readEntries += readLast; - for (auto i = 0; i < readLast; ++i) { - offsets[count++] = (int)offset; - offset += swap32_(reinterpret_cast(offsetBuffer->GetCurrent())[i]); - } - } - offsets[count] = (int)offset; - totalSize = offset; - readEntries = 0; - } - int typeSize = physicalField->type()->byte_width(); - int64_t listSize = 1; - if (auto fixedSizeList = std::dynamic_pointer_cast(datasetField->type())) { - listSize = fixedSizeList->list_size(); - typeSize = physicalField->type()->field(0)->type()->byte_width(); - } else if (mapping.vlaIdx != -1) { - typeSize = physicalField->type()->field(0)->type()->byte_width(); - listSize = -1; - } - - while (readEntries < totalEntries) { - auto readLast = branch->GetBulkRead().GetEntriesSerialized(readEntries, buffer); - if (mapping.vlaIdx != -1) { - size = offsets[readEntries + readLast] - offsets[readEntries]; - } else { - size = readLast * listSize; - } - readEntries += readLast; - swapCopy(ptr, buffer.GetCurrent(), size, typeSize); - ptr += (ptrdiff_t)(size * typeSize); - } - if (listSize >= 1) { - totalSize = readEntries * listSize; - } - if (listSize == 1) { - array = std::make_shared(datasetField->type(), readEntries, arrowValuesBuffer); - } else { - auto varray = std::make_shared(datasetField->type()->field(0)->type(), totalSize, arrowValuesBuffer); - if (mapping.vlaIdx != -1) { - array = std::make_shared(datasetField->type(), readEntries, arrowOffsetBuffer, varray); - } else { - array = std::make_shared(datasetField->type(), readEntries, varray); - } - } + if (listType) { + auto varray = std::make_shared(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer); + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, varray); + // This is a vla, there is also an offset op + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + valueOp.rootBranchEntries, + valueOp.targetBuffer->size()); + } else if (mapping.vlaIdx != -1) { + auto& offsetOp = ops[ops.size() - 2]; + auto varray = std::make_shared(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer); + // We have pushed an offset op if this was the case. + array = std::make_shared(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size()); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + offsetOp.offsetCount, + valueOp.targetBuffer->size()); + } else { + array = std::make_shared(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer); + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.", + valueOp.branch->GetName(), + valueOp.rootBranchEntries, + valueOp.targetBuffer->size()); } - branch->SetStatus(false); - branch->DropBaskets("all"); - branch->Reset(); - branch->GetTransientBuffer(0)->Expand(0); - columns.push_back(array); } + + // Do the actual filling of the buffers. This happens after we have created the whole structure + // so that we can read directly in shared memory. + int64_t rows = -1; + for (size_t i = 0; i < ops.size(); ++i) { + auto& op = ops[i]; + if (rows == -1 && op.kind != ReadOpKind::VLA) { + rows = op.rootBranchEntries; + } + if (rows == -1 && op.kind == ReadOpKind::VLA) { + auto& offsetOp = ops[i - 1]; + rows = offsetOp.rootBranchEntries; + } + if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) { + throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries); + } + if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) { + throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].offsetCount); + } + } + auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns); totalCompressedSize += tree->GetZipBytes(); totalUncompressedSize += tree->GetTotBytes(); + O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize); return batch; }; return generator; @@ -817,11 +1103,31 @@ class TTreeFileWriter : public arrow::dataset::FileWriter switch (field->type()->id()) { case arrow::Type::FIXED_SIZE_LIST: { auto list = std::static_pointer_cast(column); - valueArrays.back() = list->values(); + if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) { + int64_t length = list->length() * list->list_type()->list_size(); + arrow::UInt8Builder builder; + auto ok = builder.Reserve(length); + // I need to build an array of uint8_t for the conversion to ROOT which uses + // bytes for boolans. + auto boolArray = std::static_pointer_cast(list->values()); + for (int64_t i = 0; i < length; ++i) { + if (boolArray->IsValid(i)) { + // Expand each boolean value (true/false) to uint8 (1/0) + uint8_t value = boolArray->Value(i) ? 1 : 0; + auto ok = builder.Append(value); + } else { + // Append null for invalid entries + auto ok = builder.AppendNull(); + } + } + valueArrays.back() = *builder.Finish(); + } else { + valueArrays.back() = list->values(); + } } break; case arrow::Type::LIST: { auto list = std::static_pointer_cast(column); - valueArrays.back() = list; + valueArrays.back() = list->values(); } break; case arrow::Type::BOOL: { // In case of arrays of booleans, we need to go back to their @@ -867,11 +1173,12 @@ class TTreeFileWriter : public arrow::dataset::FileWriter uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width(); branch->SetAddress((void*)buffer); sizeBranch->SetAddress(&listSize); - }; - break; + } break; case arrow::Type::FIXED_SIZE_LIST: default: { - uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * valueType->byte_width(); + // needed for the boolean case, I should probably cache this. + auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1; + uint8_t const* buffer = std::static_pointer_cast(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth; branch->SetAddress((void*)buffer); }; } diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h index 441b43aeca331..5aceaed077001 100644 --- a/Framework/Core/include/Framework/RootArrowFilesystem.h +++ b/Framework/Core/include/Framework/RootArrowFilesystem.h @@ -12,6 +12,7 @@ #define O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_ #include +#include #include #include #include @@ -96,6 +97,9 @@ class VirtualRootFileSystemBase : public arrow::fs::FileSystem struct RootArrowFactory final { std::function()> options = nullptr; std::function()> format = nullptr; + // Builds an output streamer which is able to read from the source fragment + // in a deferred way. + std::function(std::shared_ptr, const std::shared_ptr& buffer)> deferredOutputStreamer = nullptr; }; struct RootArrowFactoryPlugin { @@ -144,6 +148,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory&); + ~TFileFileSystem() override; + std::string type_name() const override { return "TDirectoryFile"; diff --git a/Framework/Core/src/RootArrowFilesystem.cxx b/Framework/Core/src/RootArrowFilesystem.cxx index c563866e802bb..403e393ec6090 100644 --- a/Framework/Core/src/RootArrowFilesystem.cxx +++ b/Framework/Core/src/RootArrowFilesystem.cxx @@ -42,6 +42,12 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024); } +TFileFileSystem::~TFileFileSystem() +{ + mFile->Close(); + delete mFile; +} + std::shared_ptr TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source) { // We use a plugin to create the actual objects inside the diff --git a/Framework/Core/test/test_Root2ArrowTable.cxx b/Framework/Core/test/test_Root2ArrowTable.cxx index 438f388ec86b5..663be91a1e6f3 100644 --- a/Framework/Core/test/test_Root2ArrowTable.cxx +++ b/Framework/Core/test/test_Root2ArrowTable.cxx @@ -38,6 +38,7 @@ #include #include +#include #include #include #include @@ -388,6 +389,7 @@ bool validatePhysicalSchema(std::shared_ptr schema) { REQUIRE(schema->num_fields() == 12); REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id()); + REQUIRE(schema->field(0)->name() == "px"); REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id()); REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id()); REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id()); @@ -541,12 +543,28 @@ TEST_CASE("RootTree2Dataset") options->dataset_schema = schema; auto scanner = format->ScanBatchesAsync(options, *fragment); REQUIRE(scanner.ok()); + + // This is batch has deferred contents. Therefore we need to use a DeferredOutputStream to + // write it to a real one and read it back with the BufferReader, which is hopefully zero copy + std::shared_ptr batch; + auto batches = (*scanner)(); auto result = batches.result(); REQUIRE(result.ok()); REQUIRE((*result)->columns().size() == 11); REQUIRE((*result)->num_rows() == 100); - validateContents(*result); + std::shared_ptr buffer = *arrow::AllocateResizableBuffer(1000, 64); + auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragment, buffer); + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema); + auto status = outBatch.ValueOrDie()->WriteRecordBatch(**result); + std::shared_ptr bufferReader = std::make_shared(buffer); + auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader); + auto batchReader = readerResult.ValueOrDie(); + + auto next = batchReader->ReadNext(&batch); + REQUIRE(batch != nullptr); + + validateContents(batch); auto* output = new TMemFile("foo", "RECREATE"); auto outFs = std::make_shared(output, 0, factory); @@ -558,7 +576,8 @@ TEST_CASE("RootTree2Dataset") // Write to the /DF_3 tree at top level arrow::fs::FileLocator locator{outFs, "/DF_3"}; auto writer = format->MakeWriter(*destination, schema, {}, locator); - auto success = writer->get()->Write(*result); + auto success = writer->get()->Write(batch); + REQUIRE(batch->schema()->field(0)->name() == "px"); auto rootDestination = std::dynamic_pointer_cast(*destination); SECTION("Read tree") @@ -568,7 +587,11 @@ TEST_CASE("RootTree2Dataset") auto tfileFs = std::dynamic_pointer_cast(outFs); REQUIRE(tfileFs.get()); REQUIRE(tfileFs->GetFile()); - REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + auto* tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetName() == std::string("px")); + arrow::dataset::FileSource source2("/DF_3", outFs); REQUIRE(format->IsSupported(source2) == true); @@ -577,6 +600,10 @@ TEST_CASE("RootTree2Dataset") REQUIRE(tfileFs->GetFile()); REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + auto schemaOptWritten = format->Inspect(source2); tfileFs = std::dynamic_pointer_cast(source2.filesystem()); REQUIRE(tfileFs.get()); @@ -585,6 +612,10 @@ TEST_CASE("RootTree2Dataset") REQUIRE(schemaOptWritten.ok()); auto schemaWritten = *schemaOptWritten; + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + REQUIRE(validatePhysicalSchema(schemaWritten)); std::vector> fields; for (auto& field : schemaWritten->fields()) { @@ -599,23 +630,38 @@ TEST_CASE("RootTree2Dataset") auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema); REQUIRE(fragmentWritten.ok()); auto optionsWritten = std::make_shared(); - options->dataset_schema = schema; - auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragment); + optionsWritten->dataset_schema = schema; + auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragmentWritten); REQUIRE(scannerWritten.ok()); - auto batchesWritten = (*scanner)(); - auto resultWritten = batches.result(); + tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")); + REQUIRE(tree != nullptr); + REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100); + auto batchesWritten = (*scannerWritten)(); + auto resultWritten = batchesWritten.result(); REQUIRE(resultWritten.ok()); REQUIRE((*resultWritten)->columns().size() == 11); REQUIRE((*resultWritten)->num_rows() == 100); - validateContents(*resultWritten); + + std::shared_ptr buffer = *arrow::AllocateResizableBuffer(1000, 64); + auto deferredWriterStream2 = factory.capabilities[1].factory().deferredOutputStreamer(*fragmentWritten, buffer); + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream2.get(), schema); + auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten); + std::shared_ptr bufferReader = std::make_shared(buffer); + auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader); + auto batchReader = readerResult.ValueOrDie(); + + auto next = batchReader->ReadNext(&batch); + REQUIRE(batch != nullptr); + validateContents(batch); } + arrow::fs::FileLocator rnTupleLocator{outFs, "/rntuple"}; // We write an RNTuple in the same TMemFile, using /rntuple as a location auto rntupleDestination = std::dynamic_pointer_cast(*destination); { auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator); - auto rNtupleSuccess = rNtupleWriter->get()->Write(*result); + auto rNtupleSuccess = rNtupleWriter->get()->Write(batch); REQUIRE(rNtupleSuccess.ok()); }