diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 9c19de85739ce..f8a9705e4eb62 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -10,10 +10,12 @@ // or submit itself to any jurisdiction. #include "AODJAlienReaderHelpers.h" +#include #include "Framework/TableTreeHelpers.h" #include "Framework/AnalysisHelpers.h" #include "Framework/DataProcessingStats.h" #include "Framework/RootTableBuilderHelpers.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AlgorithmSpec.h" #include "Framework/ConfigParamRegistry.h" #include "Framework/ControlService.h" @@ -41,6 +43,8 @@ #include #include #include +#include +#include using namespace o2; using namespace o2::aod; @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // Origin file name for derived output map auto o2 = Output(TFFileNameHeader); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - std::string currentFilename(fileAndFolder.file->GetName()); - if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') { + auto rootFS = std::dynamic_pointer_cast(fileAndFolder.filesystem()); + auto* f = dynamic_cast(rootFS->GetFile()); + std::string currentFilename(f->GetFile()->GetName()); + if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') { // This is not an absolute local path. Make it absolute. static std::string pwd = gSystem->pwd() + std::string("/"); - currentFilename = pwd + std::string(fileAndFolder.file->GetName()); + currentFilename = pwd + std::string(f->GetName()); } outputs.make(o2) = currentFilename; } @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - if (!fileAndFolder.file) { + + // In case the filesource is empty, move to the next one. + if (fileAndFolder.path().empty()) { fcnt += 1; ntf = 0; if (didir->atEnd(fcnt)) { diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index 172ecd66c0e64..1daab029b3e8e 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -11,6 +11,8 @@ #include "DataInputDirector.h" #include "Framework/DataDescriptorQueryBuilder.h" #include "Framework/Logger.h" +#include "Framework/PluginManager.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AnalysisDataModelHelpers.h" #include "Framework/Output.h" #include "Headers/DataHeader.h" @@ -26,8 +28,12 @@ #include "TGrid.h" #include "TObjString.h" #include "TMap.h" +#include "TFile.h" +#include +#include #include +#include #if __has_include() #include @@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName) return fileNameHolder; } -DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport), - mMonitoring(monitoring), - mAllowedParentLevel(allowedParentLevel), - mParentFileReplacement(std::move(parentFileReplacement)), - mLevel(level) +DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) + : mAlienSupport(alienSupport), + mMonitoring(monitoring), + mAllowedParentLevel(allowedParentLevel), + mParentFileReplacement(std::move(parentFileReplacement)), + mLevel(level) { + std::vector capabilitiesSpecs = { + "O2Framework:RNTupleObjectReadingCapability", + "O2Framework:TTreeObjectReadingCapability", + }; + + std::vector plugins; + for (auto spec : capabilitiesSpecs) { + auto morePlugins = PluginManager::parsePluginSpecString(spec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + } + + PluginManager::loadFromPlugin(plugins, mFactory.capabilities); } void DataInputDescriptor::printOut() @@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter) // open file auto filename = mfilenames[counter]->fileName; - if (mcurrentFile) { - if (mcurrentFile->GetName() == filename) { + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + if (rootFS.get()) { + if (rootFS->GetFile()->GetName() == filename) { return true; } closeInputFile(); } - mcurrentFile = TFile::Open(filename.c_str()); - if (!mcurrentFile) { + + mCurrentFilesystem = std::make_shared(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory); + if (!mCurrentFilesystem.get()) { throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename)); } - mcurrentFile->SetReadaheadSize(50 * 1024 * 1024); + rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); // get the parent file map if exists - mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) + mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) if (mParentFileMap && !mParentFileReplacement.empty()) { auto pos = mParentFileReplacement.find(';'); if (pos == std::string::npos) { @@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter) // get the directory names if (mfilenames[counter]->numberOfTimeFrames <= 0) { - std::regex TFRegex = std::regex("DF_[0-9]+"); - TList* keyList = mcurrentFile->GetListOfKeys(); + const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$"); + TList* keyList = rootFS->GetFile()->GetListOfKeys(); + std::vector finalList; // extract TF numbers and sort accordingly + // We use an extra seen set to make sure we preserve the order in which + // we instert things in the final list and to make sure we do not have duplicates. + // Multiple folder numbers can happen if we use a flat structure /DF_- + std::unordered_set seen; for (auto key : *keyList) { - if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) { - auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3)); - mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + std::smatch matchResult; + std::string keyName = ((TObjString*)key)->GetString().Data(); + bool match = std::regex_match(keyName, matchResult, TFRegex); + if (match) { + auto folderNumber = std::stoul(matchResult[1].str()); + if (seen.find(folderNumber) == seen.end()) { + seen.insert(folderNumber); + mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + } } } + if (mParentFileMap != nullptr) { // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(), @@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter) std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end()); } - for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) { - auto folderName = "DF_" + std::to_string(folderNumber); - mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName); - mfilenames[counter]->alreadyRead.emplace_back(false); - } - mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size(); + mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false); + mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size(); } mCurrentFileID = counter; @@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF) return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF]; } -FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF) +arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF) { - FileAndFolder fileAndFolder; - // open file if (!setFile(counter)) { - return fileAndFolder; + return {}; } // no TF left if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) { - return fileAndFolder; + return {}; } - fileAndFolder.file = mcurrentFile; - fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; - mfilenames[counter]->alreadyRead[numTF] = true; - return fileAndFolder; + return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem}; } DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename) @@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, // This file has no parent map return nullptr; } - auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; + auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]); auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str()); + // The current DF is not found in the parent map (this should not happen and is a fatal error) + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); if (!parentFileName) { - // The current DF is not found in the parent map (this should not happen and is a fatal error) - throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName())); return nullptr; } if (mParentFile) { // Is this still the corresponding to the correct file? - if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) { + auto parentRootFS = std::dynamic_pointer_cast(mParentFile->mCurrentFilesystem); + if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) { return mParentFile; } else { mParentFile->closeInputFile(); @@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, } if (mLevel == mAllowedParentLevel) { - throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), + rootFS->GetFile()->GetName())); } LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str()); @@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics() if (wait_time < 0) { wait_time = 0; } - std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(), - mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(), + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + auto f = dynamic_cast(rootFS->GetFile()); + std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(), + f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(), ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel)); #if __has_include() - auto alienFile = dynamic_cast(mcurrentFile); + auto alienFile = dynamic_cast(f); if (alienFile) { monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed()); } @@ -285,7 +316,7 @@ void DataInputDescriptor::printFileStatistics() void DataInputDescriptor::closeInputFile() { - if (mcurrentFile) { + if (mCurrentFilesystem.get()) { if (mParentFile) { mParentFile->closeInputFile(); delete mParentFile; @@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile() mParentFileMap = nullptr; printFileStatistics(); - mcurrentFile->Close(); - delete mcurrentFile; - mcurrentFile = nullptr; + mCurrentFilesystem.reset(); } } @@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles() int DataInputDescriptor::findDFNumber(int file, std::string dfName) { - auto dfList = mfilenames[file]->listOfTimeFrameKeys; - auto it = std::find(dfList.begin(), dfList.end(), dfName); + auto dfList = mfilenames[file]->listOfTimeFrameNumbers; + auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; }); if (it == dfList.end()) { return -1; } @@ -358,40 +387,76 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh { auto ioStart = uv_hrtime(); - auto fileAndFolder = getFileFolder(counter, numTF); - if (!fileAndFolder.file) { + auto folder = getFileFolder(counter, numTF); + if (!folder.filesystem()) { return false; } - auto fullpath = fileAndFolder.folderName + "/" + treename; - auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str()); + auto rootFS = std::dynamic_pointer_cast(folder.filesystem()); + + if (!rootFS) { + throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)")); + } + // FIXME: Ugly. We should detect the format from the treename, good enough for now. + std::shared_ptr format; + FragmentToBatch::StreamerCreator creator = nullptr; + + auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()}; + + for (auto& capability : mFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(fullpath.path()); + void* handle = capability.getHandle(rootFS, objectPath); + if (handle) { + format = capability.factory().format(); + creator = capability.factory().deferredOutputStreamer; + break; + } + } + + if (!format) { + throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path())); + } + + auto schemaOpt = format->Inspect(fullpath); + auto physicalSchema = schemaOpt; + std::vector> fields; + for (auto& original : (*schemaOpt)->fields()) { + if (original->name().ends_with("_size")) { + continue; + } + fields.push_back(original); + } + auto datasetSchema = std::make_shared(fields); + + auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema); - if (!tree) { - LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str()); + if (!fragment.ok()) { + LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path()); auto parentFile = getParentFile(counter, numTF, treename); if (parentFile != nullptr) { - int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName); + int parentNumTF = parentFile->findDFNumber(0, folder.path()); if (parentNumTF == -1) { - throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName())); + auto parentRootFS = std::dynamic_pointer_cast(parentFile->mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName())); } // first argument is 0 as the parent file object contains only 1 file return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed); } - throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName())); + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName())); } // create table output auto o = Output(dh); - auto t2t = outputs.make(o); - // add branches to read - // fill the table - t2t->setLabel(tree->GetName()); - totalSizeCompressed += tree->GetZipBytes(); - totalSizeUncompressed += tree->GetTotBytes(); - t2t->addAllColumns(tree); - t2t->fill(tree); - delete tree; + // FIXME: This should allow me to create a memory pool + // which I can then use to scan the dataset. + auto f2b = outputs.make(o, creator, *fragment); + + //// add branches to read + //// fill the table + f2b->setLabel(treename.c_str()); + f2b->fill(datasetSchema, format); mIOTime += (uv_hrtime() - ioStart); @@ -693,7 +758,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade return result; } -FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) +arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) { auto didesc = getDataInputDescriptor(dh); // if NOT match then use defaultDataInputDescriptor diff --git a/Framework/AnalysisSupport/src/DataInputDirector.h b/Framework/AnalysisSupport/src/DataInputDirector.h index eca0ef195d111..9bab29db3ff24 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.h +++ b/Framework/AnalysisSupport/src/DataInputDirector.h @@ -15,6 +15,10 @@ #include "Framework/DataDescriptorMatcher.h" #include "Framework/DataAllocator.h" +#include "Framework/RootArrowFilesystem.h" + +#include +#include #include #include "rapidjson/fwd.h" @@ -31,16 +35,10 @@ struct FileNameHolder { std::string fileName; int numberOfTimeFrames = 0; std::vector listOfTimeFrameNumbers; - std::vector listOfTimeFrameKeys; std::vector alreadyRead; }; FileNameHolder* makeFileNameHolder(std::string fileName); -struct FileAndFolder { - TFile* file = nullptr; - std::string folderName = ""; -}; - class DataInputDescriptor { /// Holds information concerning the reading of an aod table. @@ -52,7 +50,6 @@ class DataInputDescriptor std::string treename = ""; std::unique_ptr matcher; - DataInputDescriptor() = default; DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = ""); void printOut(); @@ -78,7 +75,7 @@ class DataInputDescriptor int findDFNumber(int file, std::string dfName); uint64_t getTimeFrameNumber(int counter, int numTF); - FileAndFolder getFileFolder(int counter, int numTF); + arrow::dataset::FileSource getFileFolder(int counter, int numTF); DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename); int getTimeFramesInFile(int counter); int getReadTimeFramesInFile(int counter); @@ -90,6 +87,7 @@ class DataInputDescriptor bool isAlienSupportOn() { return mAlienSupport; } private: + o2::framework::RootObjectReadingFactory mFactory; std::string minputfilesFile = ""; std::string* minputfilesFilePtr = nullptr; std::string mFilenameRegex = ""; @@ -98,7 +96,7 @@ class DataInputDescriptor std::string mParentFileReplacement; std::vector mfilenames; std::vector* mdefaultFilenamesPtr = nullptr; - TFile* mcurrentFile = nullptr; + std::shared_ptr mCurrentFilesystem; int mCurrentFileID = -1; bool mAlienSupport = false; @@ -127,7 +125,6 @@ class DataInputDirector ~DataInputDirector(); void reset(); - void createDefaultDataInputDescriptor(); void printOut(); bool atEnd(int counter); @@ -140,10 +137,11 @@ class DataInputDirector // getters DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh); int getNumberInputDescriptors() { return mdataInputDescriptors.size(); } + void createDefaultDataInputDescriptor(); bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); - FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF); + arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF); int getTimeFramesInFile(header::DataHeader dh, int counter); uint64_t getTotalSizeCompressed(); diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index e3a39761e8049..033adc461c600 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -85,20 +85,48 @@ std::vector getListOfTables(std::unique_ptr& f) { std::vector r; TList* keyList = f->GetListOfKeys(); + // We should handle two cases, one where the list of tables in a TDirectory, + // the other one where the dataframe number is just a prefix + std::string first = ""; for (auto key : *keyList) { - if (!std::string_view(key->GetName()).starts_with("DF_")) { + if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) { continue; } - auto* d = (TDirectory*)f->Get(key->GetName()); - TList* branchList = d->GetListOfKeys(); - for (auto b : *branchList) { - r.emplace_back(b->GetName()); + auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory")); + // Objects are in a folder, list it. + if (d) { + TList* branchList = d->GetListOfKeys(); + for (auto b : *branchList) { + r.emplace_back(b->GetName()); + } + break; + } + + void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple")); + if (v) { + std::string s = key->GetName(); + size_t pos = s.find('-'); + // Check if '-' is found + // Skip metaData and parentFiles + if (pos == std::string::npos) { + continue; + } + std::string t = s.substr(pos + 1); + // If we find a duplicate table name, it means we are in the next DF and we can stop. + if (t == first) { + break; + } + if (first.empty()) { + first = t; + } + // Create a new string starting after the '-' + r.emplace_back(t); } - break; } return r; } + auto readMetadata(std::unique_ptr& currentFile) -> std::vector { // Get the metadata, if any diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index 881f7d6edc117..f36f309404699 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -633,6 +633,10 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi}); opsCount += 2; } else { + if (physicalFieldIdx > 1) { + O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(), + physical_schema->field(physicalFieldIdx - 1)->name().c_str()); + } mappings.push_back({physicalFieldIdx, -1, fi}); opsCount++; } diff --git a/Framework/Core/include/Framework/TableTreeHelpers.h b/Framework/Core/include/Framework/TableTreeHelpers.h index c6a769e579fb7..92725d186ee33 100644 --- a/Framework/Core/include/Framework/TableTreeHelpers.h +++ b/Framework/Core/include/Framework/TableTreeHelpers.h @@ -11,6 +11,8 @@ #ifndef O2_FRAMEWORK_TABLETREEHELPERS_H_ #define O2_FRAMEWORK_TABLETREEHELPERS_H_ +#include +#include #include #include "TFile.h" #include "TTreeReader.h" @@ -146,15 +148,25 @@ class TreeToTable class FragmentToBatch { public: - FragmentToBatch(arrow::MemoryPool* pool = arrow::default_memory_pool()); + // The function to be used to create the required stream. + using StreamerCreator = std::function(std::shared_ptr, const std::shared_ptr& buffer)>; + + FragmentToBatch(StreamerCreator, std::shared_ptr, arrow::MemoryPool* pool = arrow::default_memory_pool()); void setLabel(const char* label); - void fill(std::shared_ptr, std::shared_ptr dataSetSchema, std::shared_ptr); + void fill(std::shared_ptr dataSetSchema, std::shared_ptr); std::shared_ptr finalize(); + std::shared_ptr streamer(std::shared_ptr buffer) + { + return mCreator(mFragment, buffer); + } + private: + std::shared_ptr mFragment; arrow::MemoryPool* mArrowMemoryPool = nullptr; std::string mTableLabel; std::shared_ptr mRecordBatch; + StreamerCreator mCreator; }; // ----------------------------------------------------------------------------- diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index c310892c4c490..b735eee1f3308 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -211,34 +211,6 @@ void doWriteTable(std::shared_ptr b, arrow::Table* table) } } -void doWriteBatch(std::shared_ptr b, arrow::RecordBatch* batch) -{ - auto mock = std::make_shared(); - int64_t expectedSize = 0; - auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema()); - arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch); - - expectedSize = mock->Tell().ValueOrDie(); - auto reserve = b->Reserve(expectedSize); - if (reserve.ok() == false) { - throw std::runtime_error("Unable to reserve memory for table"); - } - - auto stream = std::make_shared(b); - // This is a copy maybe we can finally get rid of it by having using the - // dataset API? - auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema()); - if (outBatch.ok() == false) { - throw ::std::runtime_error("Unable to create batch writer"); - } - - outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch); - - if (outStatus.ok() == false) { - throw std::runtime_error("Unable to Write batch"); - } -} - void DataAllocator::adopt(const Output& spec, LifetimeHolder& tb) { auto& timingInfo = mRegistry.get(); @@ -318,16 +290,35 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder& f // Serialization happens in here, so that we can // get rid of the intermediate tree 2 table object, saving memory. auto batch = source.finalize(); - doWriteBatch(buffer, batch.get()); + auto mock = std::make_shared(); + int64_t expectedSize = 0; + auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema()); + arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch); + + expectedSize = mock->Tell().ValueOrDie(); + auto reserve = buffer->Reserve(expectedSize); + if (reserve.ok() == false) { + throw std::runtime_error("Unable to reserve memory for table"); + } + + auto deferredWriterStream = source.streamer(buffer); + + auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema()); + if (outBatch.ok() == false) { + throw ::std::runtime_error("Unable to create batch writer"); + } + + outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch); + + if (outStatus.ok() == false) { + throw std::runtime_error("Unable to Write batch"); + } // deletion happens in the caller }; - /// To finalise this we write the table to the buffer. - /// FIXME: most likely not a great idea. We should probably write to the buffer - /// directly in the TableBuilder, incrementally. auto finalizer = [](std::shared_ptr b) -> void { // This is empty because we already serialised the object when - // the LifetimeHolder goes out of scope. + // the LifetimeHolder goes out of scope. See code above. }; context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex); diff --git a/Framework/Core/src/TableTreeHelpers.cxx b/Framework/Core/src/TableTreeHelpers.cxx index 2f23c07aea451..84d4ff171bc39 100644 --- a/Framework/Core/src/TableTreeHelpers.cxx +++ b/Framework/Core/src/TableTreeHelpers.cxx @@ -13,7 +13,6 @@ #include "Framework/Endian.h" #include "Framework/Signpost.h" -#include "arrow/type_traits.h" #include #include #include @@ -533,7 +532,7 @@ void TreeToTable::setLabel(const char* label) mTableLabel = label; } -void TreeToTable::fill(TTree*tree) +void TreeToTable::fill(TTree* tree) { std::vector> columns; std::vector> fields; @@ -569,8 +568,10 @@ std::shared_ptr TreeToTable::finalize() return mTable; } -FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool) - : mArrowMemoryPool{pool} +FragmentToBatch::FragmentToBatch(StreamerCreator creator, std::shared_ptr fragment, arrow::MemoryPool* pool) + : mFragment{std::move(fragment)}, + mArrowMemoryPool{pool}, + mCreator{std::move(creator)} { } @@ -579,13 +580,14 @@ void FragmentToBatch::setLabel(const char* label) mTableLabel = label; } -void FragmentToBatch::fill(std::shared_ptr fragment, std::shared_ptr schema, std::shared_ptr format) +void FragmentToBatch::fill(std::shared_ptr schema, std::shared_ptr format) { auto options = std::make_shared(); options->dataset_schema = schema; - auto scanner = format->ScanBatchesAsync(options, fragment); + auto scanner = format->ScanBatchesAsync(options, mFragment); auto batch = (*scanner)(); mRecordBatch = *batch.result(); + // Notice that up to here the buffer was not yet filled. } std::shared_ptr FragmentToBatch::finalize() diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index efac16f6da4f0..2ec268130267b 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -40,7 +40,7 @@ struct EtaAndClsHistogramsSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - void process(aod::Tracks const& tracks) + void process(aod::Tracks const& tracks, aod::FT0s const&) { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { @@ -54,7 +54,7 @@ struct EtaAndClsHistogramsIUSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - void process(aod::TracksIU const& tracks) + void process(aod::TracksIU const& tracks, aod::FT0s const&) { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) {