diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx index 2b1b4f880d1ee..40d2189ea96d0 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -269,145 +269,149 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) callbacks.set(endofdatacb); return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { - auto const& ref = pc.inputs().get("x"); - if (!ref.header) { - LOG(error) << "Header not found"; - return; - } - auto datah = o2::header::get(ref.header); - if (!datah) { - LOG(error) << "No data header in stack"; - return; - } + auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) { + if (!ref.header) { + LOG(error) << "Header not found"; + return; + } + auto datah = o2::header::get(ref.header); + if (!datah) { + LOG(error) << "No data header in stack"; + return; + } - if (!ref.payload) { - LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as(), datah->dataDescription.as(), datah->subSpecification); - return; - } + if (!ref.payload) { + LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as(), datah->dataDescription.as(), datah->subSpecification); + return; + } - auto objh = o2::header::get(ref.header); - if (!objh) { - LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as(), datah->dataDescription.as(), datah->subSpecification); - return; - } + auto objh = o2::header::get(ref.header); + if (!objh) { + LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as(), datah->dataDescription.as(), datah->subSpecification); + return; + } - InputObject obj; - FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); - tm.InitMap(); - obj.kind = tm.ReadClass(); - tm.SetBufferOffset(0); - tm.ResetMap(); - if (obj.kind == nullptr) { - LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as(), datah->dataDescription.as(), datah->subSpecification); - return; - } + InputObject obj; + FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); + tm.InitMap(); + obj.kind = tm.ReadClass(); + tm.SetBufferOffset(0); + tm.ResetMap(); + if (obj.kind == nullptr) { + LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as(), datah->dataDescription.as(), datah->subSpecification); + return; + } - auto policy = objh->mPolicy; - auto sourceType = objh->mSourceType; - auto hash = objh->mTaskHash; + auto policy = objh->mPolicy; + auto sourceType = objh->mSourceType; + auto hash = objh->mTaskHash; - obj.obj = tm.ReadObjectAny(obj.kind); - auto* named = static_cast(obj.obj); - obj.name = named->GetName(); - auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); - if (hpos == tskmap.end()) { - LOG(error) << "No task found for hash " << hash; - return; - } - auto taskname = hpos->name; - auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); - if (opos == objmap.end()) { - LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; - return; - } - auto objects = opos->bindings; - if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { - LOG(error) << "No object " << obj.name << " in map for task " << taskname; - return; - } - auto nameHash = runtime_hash(obj.name.c_str()); - InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; - auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); - // If it's the first one, we just add it to the list. - if (existing == inputObjects->end()) { - obj.count = objh->mPipelineSize; - inputObjects->push_back(std::make_pair(key, obj)); - existing = inputObjects->end() - 1; - } else { - obj.count = existing->second.count; - // Otherwise, we merge it with the existing one. - auto merger = existing->second.kind->GetMerge(); - if (!merger) { - LOG(error) << "Already one unmergeable object found for " << obj.name; + obj.obj = tm.ReadObjectAny(obj.kind); + auto* named = static_cast(obj.obj); + obj.name = named->GetName(); + auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); + if (hpos == tskmap.end()) { + LOG(error) << "No task found for hash " << hash; return; } - TList coll; - coll.Add(static_cast(obj.obj)); - merger(existing->second.obj, &coll, nullptr); - } - // We expect as many objects as the pipeline size, for - // a given object name and task hash. - existing->second.count -= 1; - - if (existing->second.count != 0) { - return; - } - // Write the object here. - auto route = existing->first; - auto entry = existing->second; - auto file = ROOTfileNames.find(route.policy); - if (file == ROOTfileNames.end()) { - return; - } - auto filename = file->second; - if (f[route.policy] == nullptr) { - f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); - } - auto nextDirectory = route.directory; - if ((nextDirectory != currentDirectory) || (filename != currentFile)) { - if (!f[route.policy]->FindKey(nextDirectory.c_str())) { - f[route.policy]->mkdir(nextDirectory.c_str()); + auto taskname = hpos->name; + auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); + if (opos == objmap.end()) { + LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; + return; } - currentDirectory = nextDirectory; - currentFile = filename; - } + auto objects = opos->bindings; + if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { + LOG(error) << "No object " << obj.name << " in map for task " << taskname; + return; + } + auto nameHash = runtime_hash(obj.name.c_str()); + InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; + auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); + // If it's the first one, we just add it to the list. + if (existing == inputObjects->end()) { + obj.count = objh->mPipelineSize; + inputObjects->push_back(std::make_pair(key, obj)); + existing = inputObjects->end() - 1; + } else { + obj.count = existing->second.count; + // Otherwise, we merge it with the existing one. + auto merger = existing->second.kind->GetMerge(); + if (!merger) { + LOG(error) << "Already one unmergeable object found for " << obj.name; + return; + } + TList coll; + coll.Add(static_cast(obj.obj)); + merger(existing->second.obj, &coll, nullptr); + } + // We expect as many objects as the pipeline size, for + // a given object name and task hash. + existing->second.count -= 1; - // translate the list-structure created by the registry into a directory structure within the file - std::function writeListToFile; - writeListToFile = [&](TList* list, TDirectory* parentDir) { - TIter next(list); - TObject* object = nullptr; - while ((object = next())) { - if (object->InheritsFrom(TList::Class())) { - writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); - } else { - parentDir->WriteObjectAny(object, object->Class(), object->GetName()); - auto* written = list->Remove(object); - delete written; + if (existing->second.count != 0) { + return; + } + // Write the object here. + auto route = existing->first; + auto entry = existing->second; + auto file = ROOTfileNames.find(route.policy); + if (file == ROOTfileNames.end()) { + return; + } + auto filename = file->second; + if (f[route.policy] == nullptr) { + f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); + } + auto nextDirectory = route.directory; + if ((nextDirectory != currentDirectory) || (filename != currentFile)) { + if (!f[route.policy]->FindKey(nextDirectory.c_str())) { + f[route.policy]->mkdir(nextDirectory.c_str()); } + currentDirectory = nextDirectory; + currentFile = filename; } - }; - TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); - if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { - auto* outputList = static_cast(entry.obj); - outputList->SetOwner(false); + // translate the list-structure created by the registry into a directory structure within the file + std::function writeListToFile; + writeListToFile = [&](TList* list, TDirectory* parentDir) { + TIter next(list); + TObject* object = nullptr; + while ((object = next())) { + if (object->InheritsFrom(TList::Class())) { + writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); + } else { + parentDir->WriteObjectAny(object, object->Class(), object->GetName()); + auto* written = list->Remove(object); + delete written; + } + } + }; + + TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); + if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { + auto* outputList = static_cast(entry.obj); + outputList->SetOwner(false); + + // if registry should live in dedicated folder a TNamed object is appended to the list + if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { + delete outputList->Last(); + outputList->RemoveLast(); + currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); + } - // if registry should live in dedicated folder a TNamed object is appended to the list - if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { - delete outputList->Last(); - outputList->RemoveLast(); - currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); + writeListToFile(outputList, currentDir); + outputList->SetOwner(); + delete outputList; + entry.obj = nullptr; + } else { + currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); + delete (TObject*)entry.obj; + entry.obj = nullptr; } - - writeListToFile(outputList, currentDir); - outputList->SetOwner(); - delete outputList; - entry.obj = nullptr; - } else { - currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); - delete (TObject*)entry.obj; - entry.obj = nullptr; + }; + for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) { + mergePart(pc.inputs().get("x", pi)); } }; }};