diff --git a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx index f66723419c24e..51b585d0714bb 100644 --- a/Framework/AnalysisSupport/src/RNTuplePlugin.cxx +++ b/Framework/AnalysisSupport/src/RNTuplePlugin.cxx @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -51,10 +52,6 @@ class RNTupleFileSystem : public VirtualRootFileSystemBase public: ~RNTupleFileSystem() override; - std::shared_ptr GetSubFilesystem(arrow::dataset::FileSource source) override - { - return std::dynamic_pointer_cast(shared_from_this()); - }; virtual ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource source) = 0; }; @@ -100,9 +97,28 @@ class RNTupleFileFragment : public arrow::dataset::FileFragment std::shared_ptr format, arrow::compute::Expression partition_expression, std::shared_ptr physical_schema) - : FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema)) + : FileFragment(source, format, partition_expression, physical_schema) { + auto fs = std::dynamic_pointer_cast(source.filesystem()); + if (!fs.get()) { + throw runtime_error_f("Do not know how to extract %s from %s", source.path().c_str(), fs->type_name().c_str()); + } + auto handler = fs->GetObjectHandler(source); + if (!handler->format->Equals(*format)) { + throw runtime_error_f("Format for %s does not match. Found %s, expected %s.", source.path().c_str(), + handler->format->type_name().c_str(), + format->type_name().c_str()); + } + mNTuple = handler->GetObjectAsOwner(); } + + ROOT::Experimental::RNTuple* GetRNTuple() + { + return mNTuple.get(); + } + + private: + std::unique_ptr mNTuple; }; class RNTupleFileFormat : public arrow::dataset::FileFormat @@ -133,11 +149,10 @@ class RNTupleFileFormat : public arrow::dataset::FileFormat arrow::Result IsSupported(const arrow::dataset::FileSource& source) const override { auto fs = std::dynamic_pointer_cast(source.filesystem()); - auto subFs = fs->GetSubFilesystem(source); - if (std::dynamic_pointer_cast(subFs)) { - return true; + if (!fs) { + return false; } - return false; + return fs->CheckSupport(source); } arrow::Result> Inspect(const arrow::dataset::FileSource& source) const override; @@ -493,11 +508,12 @@ arrow::Result> RNTupleFileFormat::Inspect(const a auto fs = std::dynamic_pointer_cast(source.filesystem()); // Actually get the TTree from the ROOT file. - auto ntupleFs = std::dynamic_pointer_cast(fs->GetSubFilesystem(source)); - if (!ntupleFs.get()) { - throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str()); + auto objectHandler = fs->GetObjectHandler(source); + if (objectHandler->format->type_name() != this->type_name()) { + throw runtime_error_f("Unexpected kind of filesystem %s to handle payload %s.\n", source.filesystem()->type_name().c_str(), source.path().c_str()); } - ROOT::Experimental::RNTuple* rntuple = ntupleFs->GetRNTuple(source); + // We know this is a RNTuple, so we can continue with the inspection. + auto rntuple = objectHandler->GetObjectAsOwner().release(); auto inspector = ROOT::Experimental::RNTupleInspector::Create(rntuple); @@ -526,11 +542,8 @@ arrow::Result RNTupleFileFormat::ScanBatchesAsync( std::vector> columns; std::vector> fields = dataset_schema->fields(); - auto containerFS = std::dynamic_pointer_cast(ntupleFragment->source().filesystem()); - auto fs = std::dynamic_pointer_cast(containerFS->GetSubFilesystem(ntupleFragment->source())); - int64_t rows = -1; - ROOT::Experimental::RNTuple* rntuple = fs->GetRNTuple(ntupleFragment->source()); + ROOT::Experimental::RNTuple* rntuple = ntupleFragment->GetRNTuple(); auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple); auto& model = reader->GetModel(); for (auto& physicalField : fields) { @@ -670,7 +683,7 @@ arrow::Result RNTupleFileFormat::ScanBatchesAsync( if (!result.ok()) { throw runtime_error("Cannot allocate offset buffer"); } - arrowOffsetBuffer = std::move(result).ValueUnsafe(); + arrowOffsetBuffer = result.MoveValueUnsafe(); // Offset bulk auto offsetBulk = model.CreateBulk(physicalField->name()); @@ -692,7 +705,7 @@ arrow::Result RNTupleFileFormat::ScanBatchesAsync( if (!result.ok()) { throw runtime_error("Cannot allocate values buffer"); } - arrowValuesBuffer = std::move(result).ValueUnsafe(); + arrowValuesBuffer = result.MoveValueUnsafe(); ptr = (uint8_t*)(arrowValuesBuffer->mutable_data()); // Calculate the size of the buffer here. for (size_t i = 0; i < total; i++) { @@ -811,9 +824,9 @@ arrow::Result> RNTupleFileFormat:: { std::shared_ptr format = std::make_shared(mTotCompressedSize, mTotUncompressedSize); - auto fragment = std::make_shared(std::move(source), std::move(format), - std::move(partition_expression), - std::move(physical_schema)); + auto fragment = std::make_shared(source, format, + partition_expression, + physical_schema); return std::dynamic_pointer_cast(fragment); } @@ -839,9 +852,6 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - .getSubFilesystem = [](void* handle) { - auto rntuple = (ROOT::Experimental::RNTuple*)handle; - return std::shared_ptr(new SingleRNTupleFileSystem(rntuple)); }, }; } }; diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index abc08526815cc..4b130a2144253 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -15,6 +15,7 @@ #include "Framework/Endian.h" #include #include +#include #include #include #include @@ -26,7 +27,6 @@ #include #include #include -#include O2_DECLARE_DYNAMIC_LOG(root_arrow_fs); @@ -48,11 +48,6 @@ class TTreeFileSystem : public VirtualRootFileSystemBase public: ~TTreeFileSystem() override; - std::shared_ptr GetSubFilesystem(arrow::dataset::FileSource source) override - { - return std::dynamic_pointer_cast(shared_from_this()); - }; - arrow::Result> OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) override; @@ -60,6 +55,55 @@ class TTreeFileSystem : public VirtualRootFileSystemBase virtual std::unique_ptr& GetTree(arrow::dataset::FileSource source) = 0; }; +class TTreeFileFormat : public arrow::dataset::FileFormat +{ + size_t& mTotCompressedSize; + size_t& mTotUncompressedSize; + + public: + TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize) + : FileFormat({}), + mTotCompressedSize(totalCompressedSize), + mTotUncompressedSize(totalUncompressedSize) + { + } + + ~TTreeFileFormat() override = default; + + std::string type_name() const override + { + return "ttree"; + } + + bool Equals(const FileFormat& other) const override + { + return other.type_name() == this->type_name(); + } + + arrow::Result IsSupported(const arrow::dataset::FileSource& source) const override + { + auto fs = std::dynamic_pointer_cast(source.filesystem()); + if (!fs) { + return false; + } + return fs->CheckSupport(source); + } + + arrow::Result> Inspect(const arrow::dataset::FileSource& source) const override; + /// \brief Create a FileFragment for a FileSource. + arrow::Result> MakeFragment( + arrow::dataset::FileSource source, arrow::compute::Expression partition_expression, + std::shared_ptr physical_schema) override; + + arrow::Result> MakeWriter(std::shared_ptr destination, std::shared_ptr schema, std::shared_ptr options, arrow::fs::FileLocator destination_locator) const override; + + std::shared_ptr DefaultWriteOptions() override; + + arrow::Result ScanBatchesAsync( + const std::shared_ptr& options, + const std::shared_ptr& fragment) const override; +}; + class SingleTreeFileSystem : public TTreeFileSystem { public: @@ -76,6 +120,11 @@ class SingleTreeFileSystem : public TTreeFileSystem return "ttree"; } + std::shared_ptr GetObjectHandler(arrow::dataset::FileSource source) override + { + return std::make_shared((void*)mTree.get(), std::make_shared(mTotCompressedSize, mTotUncompressedSize)); + } + std::unique_ptr& GetTree(arrow::dataset::FileSource) override { // Simply return the only TTree we have @@ -83,6 +132,8 @@ class SingleTreeFileSystem : public TTreeFileSystem } private: + size_t mTotUncompressedSize; + size_t mTotCompressedSize; std::unique_ptr mTree; }; @@ -103,66 +154,28 @@ class TTreeFileFragment : public arrow::dataset::FileFragment std::shared_ptr format, arrow::compute::Expression partition_expression, std::shared_ptr physical_schema) - : FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema)) - { - } - - std::unique_ptr& GetTree() + : FileFragment(source, format, std::move(partition_expression), physical_schema) { - auto topFs = std::dynamic_pointer_cast(source().filesystem()); - auto treeFs = std::dynamic_pointer_cast(topFs->GetSubFilesystem(source())); - return treeFs->GetTree(source()); - } -}; - -class TTreeFileFormat : public arrow::dataset::FileFormat -{ - size_t& mTotCompressedSize; - size_t& mTotUncompressedSize; - - public: - TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize) - : FileFormat({}), - mTotCompressedSize(totalCompressedSize), - mTotUncompressedSize(totalUncompressedSize) - { - } - - ~TTreeFileFormat() override = default; - - std::string type_name() const override - { - return "ttree"; - } - - bool Equals(const FileFormat& other) const override - { - return other.type_name() == this->type_name(); + auto rootFS = std::dynamic_pointer_cast(this->source().filesystem()); + if (rootFS.get() == nullptr) { + throw runtime_error_f("Unknown filesystem %s when reading %s.", + source.filesystem()->type_name().c_str(), source.path().c_str()); + } + auto objectHandler = rootFS->GetObjectHandler(source); + if (!objectHandler->format->Equals(*format)) { + throw runtime_error_f("Cannot read source %s with format %s to pupulate a TTreeFileFragment.", + source.path().c_str(), objectHandler->format->type_name().c_str()); + }; + mTree = objectHandler->GetObjectAsOwner(); } - arrow::Result IsSupported(const arrow::dataset::FileSource& source) const override + TTree* GetTree() { - auto fs = std::dynamic_pointer_cast(source.filesystem()); - auto subFs = fs->GetSubFilesystem(source); - if (std::dynamic_pointer_cast(subFs)) { - return true; - } - return false; + return mTree.get(); } - arrow::Result> Inspect(const arrow::dataset::FileSource& source) const override; - /// \brief Create a FileFragment for a FileSource. - arrow::Result> MakeFragment( - arrow::dataset::FileSource source, arrow::compute::Expression partition_expression, - std::shared_ptr physical_schema) override; - - arrow::Result> MakeWriter(std::shared_ptr destination, std::shared_ptr schema, std::shared_ptr options, arrow::fs::FileLocator destination_locator) const override; - - std::shared_ptr DefaultWriteOptions() override; - - arrow::Result ScanBatchesAsync( - const std::shared_ptr& options, - const std::shared_ptr& fragment) const override; + private: + std::unique_ptr mTree; }; // An arrow outputstream which allows to write to a TTree. Eventually @@ -250,9 +263,6 @@ struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin { return new RootArrowFactory{ .options = [context]() { return context->format->DefaultWriteOptions(); }, .format = [context]() { return context->format; }, - .getSubFilesystem = [](void* handle) { - auto tree = (TTree*)handle; - return std::shared_ptr(new SingleTreeFileSystem(tree)); }, }; } }; @@ -269,16 +279,16 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( { // This is the schema we want to read auto dataset_schema = options->dataset_schema; + auto treeFragment = std::dynamic_pointer_cast(fragment); + if (treeFragment.get() == nullptr) { + return {arrow::Status::NotImplemented("Not a ttree fragment")}; + } - auto generator = [pool = options->pool, fragment, dataset_schema, &totalCompressedSize = mTotCompressedSize, + auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize, &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future> { std::vector> columns; std::vector> fields = dataset_schema->fields(); - auto physical_schema = *fragment->ReadPhysicalSchema(); - - auto fs = std::dynamic_pointer_cast(fragment->source().filesystem()); - // Actually get the TTree from the ROOT file. - auto treeFs = std::dynamic_pointer_cast(fs->GetSubFilesystem(fragment->source())); + auto physical_schema = *treeFragment->ReadPhysicalSchema(); if (dataset_schema->num_fields() > physical_schema->num_fields()) { throw runtime_error_f("One TTree must have all the fields requested in a table"); @@ -301,7 +311,7 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( } } - auto& tree = treeFs->GetTree(fragment->source()); + auto* tree = treeFragment->GetTree(); tree->SetCacheSize(25000000); auto branches = tree->GetListOfBranches(); for (auto& mapping : mappings) { @@ -586,12 +596,19 @@ struct RootTransientIndexType : arrow::ExtensionType { arrow::Result> TTreeFileFormat::Inspect(const arrow::dataset::FileSource& source) const { auto fs = std::dynamic_pointer_cast(source.filesystem()); - // Actually get the TTree from the ROOT file. - auto treeFs = std::dynamic_pointer_cast(fs->GetSubFilesystem(source)); - if (!treeFs.get()) { + + if (!fs.get()) { + throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str()); + } + auto objectHandler = fs->GetObjectHandler(source); + + if (!objectHandler->format->Equals(*this)) { throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str()); } - auto& tree = treeFs->GetTree(source); + + // Notice that we abuse of the API here and do not release the TTree, + // so that it's still managed by ROOT. + auto tree = objectHandler->GetObjectAsOwner().release(); auto branches = tree->GetListOfBranches(); auto n = branches->GetEntries(); @@ -636,10 +653,9 @@ arrow::Result> TTreeFileFormat::Ma std::shared_ptr physical_schema) { - auto fragment = std::make_shared(std::move(source), std::dynamic_pointer_cast(shared_from_this()), - std::move(partition_expression), - std::move(physical_schema)); - return std::dynamic_pointer_cast(fragment); + return std::make_shared(source, std::dynamic_pointer_cast(shared_from_this()), + std::move(partition_expression), + physical_schema); } class TTreeFileWriter : public arrow::dataset::FileWriter diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h index feab713b445fe..441b43aeca331 100644 --- a/Framework/Core/include/Framework/RootArrowFilesystem.h +++ b/Framework/Core/include/Framework/RootArrowFilesystem.h @@ -12,11 +12,13 @@ #define O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_ #include +#include #include #include #include #include #include +#include class TFile; class TBufferFile; @@ -25,6 +27,27 @@ class TDirectoryFile; namespace o2::framework { +struct RootObjectHandler { + RootObjectHandler(void* p, std::shared_ptr f) + : payload(p), format(std::move(f)) + { + } + + ~RootObjectHandler() noexcept(false); + + template + std::unique_ptr GetObjectAsOwner() + { + auto* p = payload; + payload = nullptr; + return std::unique_ptr((T*)p); + } + std::shared_ptr format; + + private: + void* payload = nullptr; +}; + // This is to avoid having to implement a bunch of unimplemented methods // for all the possible virtual filesystem we can invent on top of ROOT // data structures. @@ -40,7 +63,8 @@ class VirtualRootFileSystemBase : public arrow::fs::FileSystem return this->type_name() == other.type_name(); } - virtual std::shared_ptr GetSubFilesystem(arrow::dataset::FileSource source) = 0; + virtual std::shared_ptr GetObjectHandler(arrow::dataset::FileSource source) = 0; + virtual bool CheckSupport(arrow::dataset::FileSource source) = 0; arrow::Status CreateDir(const std::string& path, bool recursive) override; @@ -72,7 +96,6 @@ class VirtualRootFileSystemBase : public arrow::fs::FileSystem struct RootArrowFactory final { std::function()> options = nullptr; std::function()> format = nullptr; - std::function(void*)> getSubFilesystem = nullptr; }; struct RootArrowFactoryPlugin { @@ -92,9 +115,10 @@ struct RootObjectReadingCapability { // Use a void * in order not to expose the kind of object to the // generic reading code. This is also where we load the plugin // which will be used for the actual creation. - std::function getHandle; - // Same as the above, but uses a TBufferFile as storage - std::function getBufferHandle; + std::function fs, std::string const& path)> getHandle; + // Wether or not this actually supports reading an object of the following class + std::function checkSupport; + // This must be implemented to load the actual RootArrowFactory plugin which // implements this capability. This way the detection of the file format // (via get handle) does not need to know about the actual code which performs @@ -125,7 +149,9 @@ class TFileFileSystem : public VirtualRootFileSystemBase return "TDirectoryFile"; } - std::shared_ptr GetSubFilesystem(arrow::dataset::FileSource source) override; + std::shared_ptr GetObjectHandler(arrow::dataset::FileSource source) override; + bool CheckSupport(arrow::dataset::FileSource source) override; + virtual std::shared_ptr GetSubFilesystem(arrow::dataset::FileSource source); arrow::Result> OpenOutputStream( const std::string& path, @@ -153,7 +179,12 @@ class TBufferFileFS : public VirtualRootFileSystemBase return "tbufferfile"; } - std::shared_ptr GetSubFilesystem(arrow::dataset::FileSource source) override; + bool CheckSupport(arrow::dataset::FileSource source) override; + std::shared_ptr GetObjectHandler(arrow::dataset::FileSource source) override; + TBufferFile* GetBuffer() + { + return mBuffer; + } private: TBufferFile* mBuffer; diff --git a/Framework/Core/src/Plugin.cxx b/Framework/Core/src/Plugin.cxx index 568908426c143..13b67e2a781ba 100644 --- a/Framework/Core/src/Plugin.cxx +++ b/Framework/Core/src/Plugin.cxx @@ -17,10 +17,14 @@ #include "Framework/Signpost.h" #include "Framework/VariantJSONHelpers.h" #include "Framework/PluginManager.h" +#include #include #include #include +#include +#include #include +#include #include O2_DECLARE_DYNAMIC_LOG(capabilities); @@ -177,14 +181,24 @@ struct ImplementationContext { std::vector implementations; }; -std::function getHandleByClass(char const* classname) +std::function, std::string const&)> getHandleByClass(char const* classname) { - return [c = TClass::GetClass(classname)](TDirectoryFile* file, std::string const& path) { return file->GetObjectChecked(path.c_str(), c); }; + return [c = TClass::GetClass(classname)](std::shared_ptr fs, std::string const& path) -> void* { + if (auto tfileFS = std::dynamic_pointer_cast(fs)) { + return tfileFS->GetFile()->GetObjectChecked(path.c_str(), c); + } else if (auto tbufferFS = std::dynamic_pointer_cast(fs)) { + tbufferFS->GetBuffer()->Reset(); + return tbufferFS->GetBuffer()->ReadObjectAny(c); + } + return nullptr; + }; } -std::function getBufferHandleByClass(char const* classname) +std::function matchClassByName(std::string_view classname) { - return [c = TClass::GetClass(classname)](TBufferFile* buffer, std::string const& path) { buffer->Reset(); return buffer->ReadObjectAny(c); }; + return [c = classname](char const* attempt) -> bool { + return c == attempt; + }; } void lazyLoadFactory(std::vector& implementations, char const* specs) @@ -218,7 +232,7 @@ struct RNTupleObjectReadingCapability : o2::framework::RootObjectReadingCapabili return "/" + s; } }, .getHandle = getHandleByClass("ROOT::Experimental::RNTuple"), - .getBufferHandle = getBufferHandleByClass("ROOT::Experimental::RNTuple"), + .checkSupport = matchClassByName("ROOT::Experimental::RNTuple"), .factory = [context]() -> RootArrowFactory& { lazyLoadFactory(context->implementations, "O2FrameworkAnalysisRNTupleSupport:RNTupleObjectReadingImplementation"); return context->implementations.back(); @@ -235,7 +249,7 @@ struct TTreeObjectReadingCapability : o2::framework::RootObjectReadingCapability .name = "ttree", .lfn2objectPath = [](std::string s) { return s; }, .getHandle = getHandleByClass("TTree"), - .getBufferHandle = getBufferHandleByClass("TTree"), + .checkSupport = matchClassByName("TTree"), .factory = [context]() -> RootArrowFactory& { lazyLoadFactory(context->implementations, "O2FrameworkAnalysisTTreeSupport:TTreeObjectReadingImplementation"); return context->implementations.back(); diff --git a/Framework/Core/src/RootArrowFilesystem.cxx b/Framework/Core/src/RootArrowFilesystem.cxx index 4a1286515508c..c563866e802bb 100644 --- a/Framework/Core/src/RootArrowFilesystem.cxx +++ b/Framework/Core/src/RootArrowFilesystem.cxx @@ -25,6 +25,7 @@ #include #include #include +#include template class std::shared_ptr; @@ -41,22 +42,40 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024); } -std::shared_ptr TFileFileSystem::GetSubFilesystem(arrow::dataset::FileSource source) +std::shared_ptr TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source) { // We use a plugin to create the actual objects inside the // file, so that we can support TTree and RNTuple at the same time // without having to depend on both. for (auto& capability : mObjectFactory.capabilities) { auto objectPath = capability.lfn2objectPath(source.path()); - void* handle = capability.getHandle(mFile, objectPath); + void* handle = capability.getHandle(shared_from_this(), objectPath); if (!handle) { continue; } + return std::make_shared(handle, capability.factory().format()); + } + throw runtime_error_f("Unable to get handler for object %s", source.path().c_str()); +} + +bool TFileFileSystem::CheckSupport(arrow::dataset::FileSource source) +{ + // We use a plugin to create the actual objects inside the + // file, so that we can support TTree and RNTuple at the same time + // without having to depend on both. + for (auto& capability : mObjectFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(source.path()); + + void* handle = capability.getHandle(shared_from_this(), objectPath); if (handle) { - return capability.factory().getSubFilesystem(handle); + return true; } } + return false; +} +std::shared_ptr TFileFileSystem::GetSubFilesystem(arrow::dataset::FileSource source) +{ auto directory = (TDirectoryFile*)mFile->GetObjectChecked(source.path().c_str(), TClass::GetClass()); if (directory) { return std::shared_ptr(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory)); @@ -233,19 +252,53 @@ arrow::Result TBufferFileFS::GetFileInfo(const std::string& return result; } -std::shared_ptr TBufferFileFS::GetSubFilesystem(arrow::dataset::FileSource source) +bool TBufferFileFS::CheckSupport(arrow::dataset::FileSource source) { // We use a plugin to create the actual objects inside the // file, so that we can support TTree and RNTuple at the same time // without having to depend on both. for (auto& capability : mObjectFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(source.path()); - void* handle = capability.getBufferHandle(mBuffer, source.path()); - if (handle) { - mFilesystem = capability.factory().getSubFilesystem(handle); - break; + mBuffer->SetBufferOffset(0); + mBuffer->InitMap(); + TClass* serializedClass = mBuffer->ReadClass(); + mBuffer->SetBufferOffset(0); + mBuffer->ResetMap(); + mBuffer->Reset(); + if (!serializedClass) { + continue; + } + + bool supports = capability.checkSupport(serializedClass->GetName()); + if (supports) { + return true; + } + } + return false; +} + +std::shared_ptr TBufferFileFS::GetObjectHandler(arrow::dataset::FileSource source) +{ + // We use a plugin to create the actual objects inside the + // file, so that we can support TTree and RNTuple at the same time + // without having to depend on both. + for (auto& capability : mObjectFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(source.path()); + void* handle = capability.getHandle(shared_from_this(), objectPath); + if (!handle) { + continue; } + return std::make_shared(handle, capability.factory().format()); } - return mFilesystem; + throw runtime_error_f("Unable to get handler for object %s", source.path().c_str()); } + +RootObjectHandler::~RootObjectHandler() noexcept(false) +{ + if (payload) { + throw runtime_error_f("Payload not owned"); + } +} + } // namespace o2::framework diff --git a/Framework/Core/test/test_Root2ArrowTable.cxx b/Framework/Core/test/test_Root2ArrowTable.cxx index ebc854d1d6dc0..438f388ec86b5 100644 --- a/Framework/Core/test/test_Root2ArrowTable.cxx +++ b/Framework/Core/test/test_Root2ArrowTable.cxx @@ -565,12 +565,23 @@ TEST_CASE("RootTree2Dataset") { REQUIRE(success.ok()); // Let's read it back... + auto tfileFs = std::dynamic_pointer_cast(outFs); + REQUIRE(tfileFs.get()); + REQUIRE(tfileFs->GetFile()); + REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); arrow::dataset::FileSource source2("/DF_3", outFs); - auto newTreeFS = outFs->GetSubFilesystem(source2); - REQUIRE(format->IsSupported(source) == true); - - auto schemaOptWritten = format->Inspect(source); + REQUIRE(format->IsSupported(source2) == true); + tfileFs = std::dynamic_pointer_cast(source2.filesystem()); + REQUIRE(tfileFs.get()); + REQUIRE(tfileFs->GetFile()); + REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); + + auto schemaOptWritten = format->Inspect(source2); + tfileFs = std::dynamic_pointer_cast(source2.filesystem()); + REQUIRE(tfileFs.get()); + REQUIRE(tfileFs->GetFile()); + REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"))); REQUIRE(schemaOptWritten.ok()); auto schemaWritten = *schemaOptWritten; @@ -585,7 +596,7 @@ TEST_CASE("RootTree2Dataset") std::shared_ptr schema = std::make_shared(fields); REQUIRE(validateSchema(schema)); - auto fragmentWritten = format->MakeFragment(source, {}, *physicalSchema); + auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema); REQUIRE(fragmentWritten.ok()); auto optionsWritten = std::make_shared(); options->dataset_schema = schema; @@ -610,7 +621,6 @@ TEST_CASE("RootTree2Dataset") // And now we can read back the RNTuple into a RecordBatch arrow::dataset::FileSource writtenRntupleSource("/rntuple", outFs); - auto newRNTupleFS = outFs->GetSubFilesystem(writtenRntupleSource); REQUIRE(rNtupleFormat->IsSupported(writtenRntupleSource) == true);