From 9f728cd830a86ed9a36b2405cb19e0b1b1308083 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Feb 2025 15:02:31 +0100 Subject: [PATCH] DPL: add ability to create arrow::RecordBatches directly in shared memory without allocations --- Framework/Core/CMakeLists.txt | 1 + .../Core/include/Framework/EmptyFragment.h | 116 ++++++++++++++ Framework/Core/src/EmptyFragment.cxx | 151 ++++++++++++++++++ 3 files changed, 268 insertions(+) create mode 100644 Framework/Core/include/Framework/EmptyFragment.h create mode 100644 Framework/Core/src/EmptyFragment.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 103b559f642e2..c006a4135557b 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -48,6 +48,7 @@ o2_add_library(Framework src/DataProcessingStates.cxx src/DefaultsHelpers.cxx src/DomainInfoHeader.cxx + src/EmptyFragment.cxx src/ProcessingPoliciesHelpers.cxx src/ConfigParamDiscovery.cxx src/ConfigParamStore.cxx diff --git a/Framework/Core/include/Framework/EmptyFragment.h b/Framework/Core/include/Framework/EmptyFragment.h new file mode 100644 index 0000000000000..d0e86ab8e23c0 --- /dev/null +++ b/Framework/Core/include/Framework/EmptyFragment.h @@ -0,0 +1,116 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+#ifndef O2_FRAMEWORK_EMPTYFRAGMENT_H
+#define O2_FRAMEWORK_EMPTYFRAGMENT_H
+
+// NOTE(review): the include target was missing on this line; file_base.h is
+// assumed because it brings in arrow::dataset::Fragment, arrow::Buffer and
+// arrow::io -- TODO confirm against the project's usual include.
+#include <arrow/dataset/file_base.h>
+
+namespace o2::framework
+{
+
+/// A Fragment which will create a preallocated batch in shared memory
+/// and fill it directly in place.
+///
+/// The column buffers handed out by this class hold no data: each one is a
+/// placeholder created by GetPlaceholderForOp(), whose data pointer is an
+/// index into mSizes and whose size is the number of bytes the column needs.
+class EmptyFragment : public arrow::dataset::Fragment
+{
+ public:
+  /// @param rows number of rows of the batch to produce.
+  /// @param partition_expression forwarded to arrow::dataset::Fragment.
+  /// @param physical_schema the schema of the resulting batch, forwarded to
+  ///        arrow::dataset::Fragment.
+  EmptyFragment(size_t rows,
+                arrow::compute::Expression partition_expression,
+                std::shared_ptr<arrow::Schema> physical_schema)
+    : Fragment(std::move(partition_expression), physical_schema),
+      mRows(rows) // every buffer size computed while scanning is derived from this
+  {
+  }
+
+  /// Scanner function which returns a batch where the space is not actually used.
+  arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
+    const std::shared_ptr<arrow::dataset::ScanOptions>& options) override;
+
+ private:
+  /// The pointer to each allocation is an incremental number, indexing a collection to track
+  /// the size of each allocation.
+  /// @param size number of bytes the placeholder stands for.
+  /// @return a buffer reporting @a size bytes whose data pointer is the index of
+  ///         the entry just appended to mSizes, not a dereferenceable address.
+  std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
+  {
+    mSizes.push_back(size);
+    return std::make_shared<arrow::Buffer>(reinterpret_cast<uint8_t*>(mSizes.size() - 1), size);
+  }
+  /// Size in bytes of every placeholder handed out so far, in creation order.
+  std::vector<size_t> mSizes;
+  /// Number of rows of the batch to produce.
+  size_t mRows;
+};
+
+/// An OutputStream which writes into a resizable buffer. A source pointer
+/// whose numeric value is smaller than the number of entries of the @a sizes
+/// vector is treated as a placeholder: its bytes are not copied, the stream
+/// position is only advanced to leave room for them.
+class PreallocatedOutputStream : public arrow::io::OutputStream
+{
+ public:
+  /// \brief Create an open stream which writes into an existing buffer,
+  /// starting at position 0
+  /// \param[in] sizes the sizes of the placeholder allocations; copied into
+  /// the stream
+  /// \param[in] buffer the buffer to write into; its current size becomes
+  /// the capacity of the stream
+  explicit PreallocatedOutputStream(std::vector<size_t>& sizes,
+                                    const std::shared_ptr<arrow::ResizableBuffer>& buffer);
+
+  /// \brief Create in-memory output stream with indicated capacity using a
+  /// memory pool
+  /// \param[in] sizes the sizes of the placeholder allocations; copied into
+  /// the stream
+  /// \param[in] initial_capacity the initial allocated internal capacity of
+  /// the OutputStream
+  /// \param[in,out] pool a MemoryPool to use for allocations
+  /// \return the created stream
+  static arrow::Result<std::shared_ptr<PreallocatedOutputStream>> Create(
+    std::vector<size_t>& sizes,
+    int64_t initial_capacity = 4096,
+    arrow::MemoryPool* pool = arrow::default_memory_pool());
+
+  // By the time we call the destructor, the contents
+  // of the buffer are already moved to fairmq
+  // for being sent.
+  ~PreallocatedOutputStream() override = default;
+
+  // Implement the OutputStream interface
+
+  /// Close the stream, preserving the buffer (retrieve it with Finish()).
+  arrow::Status Close() override;
+  /// \return true once the stream has been closed
+  [[nodiscard]] bool closed() const override;
+  /// \return the current write position, in bytes from the start of the buffer
+  [[nodiscard]] arrow::Result<int64_t> Tell() const override;
+  /// \brief Append nbytes to the stream, growing the buffer when needed
+  /// \param[in] data either a real address to copy from, or a placeholder
+  /// whose numeric value is smaller than the number of known sizes, in which
+  /// case nothing is copied and only the position advances
+  /// \param[in] nbytes the number of bytes to account for
+  /// \return an IOError if the stream is closed
+  arrow::Status Write(const void* data, int64_t nbytes) override;
+
+  /// \cond FALSE
+  using OutputStream::Write;
+  /// \endcond
+
+  /// Close the stream and return the buffer
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
+
+  /// \brief Initialize state of OutputStream with newly allocated memory and
+  /// set position to 0
+  /// \param[in] sizes the sizes of the placeholder allocations
+  /// \param[in] initial_capacity the starting allocated capacity
+  /// \param[in,out] pool the memory pool to use for allocations
+  /// \return Status
+  arrow::Status Reset(std::vector<size_t> sizes,
+                      int64_t initial_capacity, arrow::MemoryPool* pool);
+
+  /// \return the number of bytes the buffer can currently hold
+  [[nodiscard]] int64_t capacity() const { return capacity_; }
+
+ private:
+  // Sizes of the placeholder allocations; a pointer passed to Write() whose
+  // value is below sizes_.size() is taken to be a placeholder.
+  std::vector<size_t> sizes_;
+  // Leaves the stream closed and without a buffer; used by Create().
+  PreallocatedOutputStream();
+
+  // Ensures there is sufficient space available to write nbytes
+  arrow::Status Reserve(int64_t nbytes);
+
+  std::shared_ptr<arrow::ResizableBuffer> buffer_;
+  bool is_open_;
+  int64_t capacity_;
+  int64_t position_;
+  uint8_t* mutable_data_;
+};
+} // namespace o2::framework
+
+#endif
+diff --git 
a/Framework/Core/src/EmptyFragment.cxx b/Framework/Core/src/EmptyFragment.cxx new file mode 100644 index 0000000000000..588f605fb429e --- /dev/null +++ b/Framework/Core/src/EmptyFragment.cxx @@ -0,0 +1,151 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/EmptyFragment.h" +#include +#include +#include +#include + +static constexpr int64_t kBufferMinimumSize = 256; + +namespace o2::framework +{ + +// Scanner function which returns a batch where the space is not actually used. 
+//
+// Every column of the schema gets a placeholder buffer sized for mRows rows.
+// Fixed size list columns get a placeholder for mRows * list_size values of
+// the element type.
+// NOTE(review): the generator captures `this` and builds a new batch on every
+// call -- confirm that it never outlives the fragment and that callers pull
+// from it only once.
+arrow::Result<arrow::RecordBatchGenerator> EmptyFragment::ScanBatchesAsync(
+  const std::shared_ptr<arrow::dataset::ScanOptions>& options)
+{
+  // Number of bytes needed for @a count values of @a type. A byte width of
+  // zero is taken to mean a bit-packed type, so the bit count is rounded up
+  // to whole bytes rather than truncated.
+  auto bytesFor = [](size_t count, const std::shared_ptr<arrow::DataType>& type) -> size_t {
+    const auto width = type->byte_width();
+    return width == 0 ? (count + 7) / 8 : count * width;
+  };
+
+  auto generator = [this, bytesFor]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
+    std::vector<std::shared_ptr<arrow::Array>> columns;
+    columns.reserve(this->physical_schema_->fields().size());
+
+    for (auto& field : this->physical_schema_->fields()) {
+      if (auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(field->type())) {
+        const auto& valueType = field->type()->field(0)->type();
+        const size_t nValues = mRows * listType->list_size();
+        auto varray = std::make_shared<arrow::PrimitiveArray>(valueType, nValues, GetPlaceholderForOp(bytesFor(nValues, valueType)));
+        columns.push_back(std::make_shared<arrow::FixedSizeListArray>(field->type(), static_cast<int32_t>(mRows), varray));
+      } else {
+        columns.push_back(std::make_shared<arrow::PrimitiveArray>(field->type(), mRows, GetPlaceholderForOp(bytesFor(mRows, field->type()))));
+      }
+    }
+    return arrow::RecordBatch::Make(physical_schema_, mRows, columns);
+  };
+  return generator;
+}
+
+// Leaves the stream closed and without a buffer; Reset() makes it usable.
+PreallocatedOutputStream::PreallocatedOutputStream()
+  : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
+
+// Opens the stream on an existing buffer: writing starts at position 0 and
+// the capacity is the current size of the buffer.
+PreallocatedOutputStream::PreallocatedOutputStream(std::vector<size_t>& sizes,
+                                                   const std::shared_ptr<arrow::ResizableBuffer>& buffer)
+  : sizes_(sizes),
+    buffer_(buffer),
+    is_open_(true),
+    capacity_(buffer->size()),
+    position_(0),
+    mutable_data_(buffer->mutable_data()) {}
+
+// Builds a stream and lets Reset() allocate its buffer from the pool.
+arrow::Result<std::shared_ptr<PreallocatedOutputStream>> PreallocatedOutputStream::Create(
+  std::vector<size_t>& sizes,
+  int64_t initial_capacity, arrow::MemoryPool* pool)
+{
+  // ctor is private, so cannot use make_shared
+  auto ptr = std::shared_ptr<PreallocatedOutputStream>(new PreallocatedOutputStream);
+  RETURN_NOT_OK(ptr->Reset(sizes, initial_capacity, pool));
+  return ptr;
+}
+
+// Replaces the buffer with a newly allocated one of initial_capacity bytes,
+// stores the placeholder sizes and reopens the stream at position 0.
+arrow::Status PreallocatedOutputStream::Reset(std::vector<size_t> sizes,
+                                              int64_t initial_capacity, arrow::MemoryPool* pool)
+{
+  ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
+  sizes_ = std::move(sizes);
+  is_open_ = true;
+  capacity_ = initial_capacity;
+  position_ = 0;
+  mutable_data_ = buffer_->mutable_data();
+  return arrow::Status::OK();
+}
+
+// Closing an already closed stream is a no-op. Otherwise the buffer is
+// resized to the number of bytes written, if that is below the capacity.
+arrow::Status PreallocatedOutputStream::Close()
+{
+  if (is_open_) {
+    is_open_ = false;
+    if (position_ < capacity_) {
+      RETURN_NOT_OK(buffer_->Resize(position_, false));
+    }
+  }
+  return arrow::Status::OK();
+}
+
+bool PreallocatedOutputStream::closed() const { return !is_open_; }
+
+// Closes the stream, zeroes the padding of the buffer and hands the buffer
+// over to the caller; the stream no longer holds it afterwards.
+arrow::Result<std::shared_ptr<arrow::Buffer>> PreallocatedOutputStream::Finish()
+{
+  RETURN_NOT_OK(Close());
+  buffer_->ZeroPadding();
+  is_open_ = false;
+  return std::move(buffer_);
+}
+
+arrow::Result<int64_t> PreallocatedOutputStream::Tell() const { return position_; }
+
+// Accounts for nbytes at the current position. Bytes are copied only when
+// data is a real address; for a placeholder the space is reserved but left
+// untouched.
+arrow::Status PreallocatedOutputStream::Write(const void* data, int64_t nbytes)
+{
+  if (ARROW_PREDICT_FALSE(!is_open_)) {
+    return arrow::Status::IOError("OutputStream is closed");
+  }
+  if (ARROW_PREDICT_TRUE(nbytes == 0)) {
+    return arrow::Status::OK();
+  }
+  if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
+    RETURN_NOT_OK(Reserve(nbytes));
+  }
+  // A pointer whose value is below sizes_.size() is a placeholder index, not
+  // an address. Comparing as unsigned keeps the comparison free of
+  // signed/unsigned mixing.
+  const bool isRealAddress = reinterpret_cast<std::uintptr_t>(data) >= sizes_.size();
+  if (isRealAddress) {
+    // This is a real address which needs to be copied. Do it!
+    memcpy(mutable_data_ + position_, data, nbytes);
+  }
+  position_ += nbytes;
+  return arrow::Status::OK();
+}
+
+// Grows the buffer, if needed, so that nbytes fit after the current position.
+arrow::Status PreallocatedOutputStream::Reserve(int64_t nbytes)
+{
+  // Always overallocate by doubling. It seems that it is a better growth
+  // strategy, at least for memory_benchmark.cc.
+  // This may be because it helps match the allocator's allocation buckets
+  // more exactly. Or perhaps it hits a sweet spot in jemalloc.
+  // Any excess is given back by Close(), which resizes to the bytes written.
+  int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
+  while (new_capacity < position_ + nbytes) {
+    new_capacity = new_capacity * 2;
+  }
+  if (new_capacity > capacity_) {
+    RETURN_NOT_OK(buffer_->Resize(new_capacity));
+    capacity_ = new_capacity;
+    mutable_data_ = buffer_->mutable_data();
+  }
+  return arrow::Status::OK();
+}
+
+} // namespace o2::framework