diff --git a/Framework/Core/include/Framework/FairMQResizableBuffer.h b/Framework/Core/include/Framework/FairMQResizableBuffer.h index cdf2a22a2a56b..fc86d8d5dd753 100644 --- a/Framework/Core/include/Framework/FairMQResizableBuffer.h +++ b/Framework/Core/include/Framework/FairMQResizableBuffer.h @@ -24,13 +24,10 @@ namespace o2::framework { -using namespace arrow; -using namespace arrow::io; - -class FairMQOutputStream : public OutputStream +class FairMQOutputStream : public arrow::io::OutputStream { public: - explicit FairMQOutputStream(const std::shared_ptr& buffer); + explicit FairMQOutputStream(const std::shared_ptr& buffer); /// \brief Create in-memory output stream with indicated capacity using a /// memory pool @@ -38,8 +35,8 @@ class FairMQOutputStream : public OutputStream /// the OutputStream /// \param[in,out] pool a MemoryPool to use for allocations /// \return the created stream - static Result> Create( - int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool()); + static arrow::Result> Create( + int64_t initial_capacity = 4096, arrow::MemoryPool* pool = arrow::default_memory_pool()); // By the time we call the destructor, the contents // of the buffer are already moved to fairmq @@ -49,24 +46,24 @@ class FairMQOutputStream : public OutputStream // Implement the OutputStream interface /// Close the stream, preserving the buffer (retrieve it with Finish()). - Status Close() override; + arrow::Status Close() override; [[nodiscard]] bool closed() const override; - [[nodiscard]] Result Tell() const override; - Status Write(const void* data, int64_t nbytes) override; + [[nodiscard]] arrow::Result Tell() const override; + arrow::Status Write(const void* data, int64_t nbytes) override; /// \cond FALSE using OutputStream::Write; /// \endcond /// Close the stream and return the buffer - Result> Finish(); + arrow::Result> Finish(); /// \brief Initialize state of OutputStream with newly allocated memory and /// set position to 0 /// \param[in] initial_capacity the starting allocated capacity /// \param[in,out] pool the memory pool to use for allocations /// \return Status - Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool()); + arrow::Status Reset(int64_t initial_capacity = 1024, arrow::MemoryPool* pool = arrow::default_memory_pool()); [[nodiscard]] int64_t capacity() const { return capacity_; } @@ -74,9 +71,9 @@ class FairMQOutputStream : public OutputStream FairMQOutputStream(); // Ensures there is sufficient space available to write nbytes - Status Reserve(int64_t nbytes); + arrow::Status Reserve(int64_t nbytes); - std::shared_ptr buffer_; + std::shared_ptr buffer_; bool is_open_; int64_t capacity_; int64_t position_; diff --git a/Framework/Core/src/FairMQResizableBuffer.cxx b/Framework/Core/src/FairMQResizableBuffer.cxx index 9fe1cc882b6ae..592dfcb4376e9 100644 --- a/Framework/Core/src/FairMQResizableBuffer.cxx +++ b/Framework/Core/src/FairMQResizableBuffer.cxx @@ -16,6 +16,8 @@ #include #include +using arrow::Status; + namespace arrow::io::internal { void CloseFromDestructor(FileInterface* file); @@ -28,15 +30,15 @@ static constexpr int64_t kBufferMinimumSize = 256; FairMQOutputStream::FairMQOutputStream() : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {} -FairMQOutputStream::FairMQOutputStream(const std::shared_ptr& buffer) +FairMQOutputStream::FairMQOutputStream(const std::shared_ptr& buffer) : buffer_(buffer), is_open_(true), capacity_(buffer->size()), position_(0), mutable_data_(buffer->mutable_data()) {} -Result> FairMQOutputStream::Create( - int64_t initial_capacity, MemoryPool* pool) +arrow::Result> FairMQOutputStream::Create( + int64_t initial_capacity, arrow::MemoryPool* pool) { // ctor is private, so cannot use make_shared auto ptr = std::shared_ptr(new FairMQOutputStream); @@ -44,7 +46,7 @@ Result> FairMQOutputStream::Create( return ptr; } -Status FairMQOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool) +Status FairMQOutputStream::Reset(int64_t initial_capacity, arrow::MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool)); is_open_ = true; @@ -67,7 +69,7 @@ Status FairMQOutputStream::Close() bool FairMQOutputStream::closed() const { return !is_open_; } -Result> FairMQOutputStream::Finish() +arrow::Result> FairMQOutputStream::Finish() { RETURN_NOT_OK(Close()); buffer_->ZeroPadding(); @@ -75,7 +77,7 @@ Result> FairMQOutputStream::Finish() return std::move(buffer_); } -Result FairMQOutputStream::Tell() const { return position_; } +arrow::Result FairMQOutputStream::Tell() const { return position_; } Status FairMQOutputStream::Write(const void* data, int64_t nbytes) {