From b4c5882544e0ed86a57f2e6ae166f45d9d4e301d Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 20 Feb 2025 12:30:06 +0100 Subject: [PATCH] DPL Analysis: Improve debugging of AOD / Histograms Writer --- .../AnalysisSupport/src/AODWriterHelpers.cxx | 81 +++++++++++++------ 1 file changed, 56 insertions(+), 25 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx index fa10d4661f537..af7037d077fb6 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -21,14 +21,18 @@ #include "Framework/TableConsumer.h" #include "Framework/DataOutputDirector.h" #include "Framework/TableTreeHelpers.h" +#include "Framework/Signpost.h" -#include #include #include +#include #include #include #include +O2_DECLARE_DYNAMIC_LOG(histograms); +O2_DECLARE_DYNAMIC_LOG(derived_data); + namespace o2::framework::writers { @@ -102,12 +106,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) // this functor is called once per time frame return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { - LOGP(debug, "======== getGlobalAODSink::processing =========="); - LOGP(debug, " processing data set with {} entries", pc.inputs().size()); + O2_SIGNPOST_ID_GENERATE(hid, histograms); + O2_SIGNPOST_START(derived_data, hid, "getGlobalAODSink", "Processing dataset with %zu entries.", pc.inputs().size()); // return immediately if pc.inputs() is empty. This should never happen! if (pc.inputs().size() == 0) { - LOGP(info, "No inputs available!"); + O2_SIGNPOST_EVENT_EMIT(derived_data, hid, "getGlobalAODSink", "Processing dataset with %zu entries.", pc.inputs().size()); + O2_SIGNPOST_END(derived_data, hid, "getGlobalAODSink", "Done processing."); return; } @@ -135,7 +140,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) // loop over the DataRefs which are contained in pc.inputs() for (const auto& ref : pc.inputs()) { if (!ref.spec) { - LOGP(debug, "Invalid input will be skipped!"); + O2_SIGNPOST_EVENT_EMIT_ERROR(derived_data, hid, "getGlobalAODSink", "Invalid input will be skipped!"); continue; } @@ -178,17 +183,18 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) // get the TableConsumer and corresponding arrow table auto msg = pc.inputs().get(ref.spec->binding); if (msg.header == nullptr) { - LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); + O2_SIGNPOST_EVENT_EMIT_ERROR(derived_data, hid, "getGlobalAODSink", "No header for message %{public}s:%{public}s", + ref.spec->binding.c_str(), DataSpecUtils::describe(*ref.spec).c_str()); continue; } auto s = pc.inputs().get(ref.spec->binding); auto table = s->asArrowTable(); if (!table->Validate().ok()) { - LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); + O2_SIGNPOST_EVENT_EMIT_WARN(derived_data, hid, "getGlobalAODSink", "The table \"%{public}s\" is not valid and will not be saved!", tableName.c_str()); continue; } if (table->schema()->fields().empty()) { - LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); + O2_SIGNPOST_EVENT_EMIT(derived_data, hid, "getGlobalAODSink", "The table \"%{public}s\" is empty but will be saved anyway!", tableName.c_str()); } // loop over all DataOutputDescriptors @@ -203,7 +209,8 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) // update metadata if (fileAndFolder.file->FindObjectAny("metaData")) { - LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); + O2_SIGNPOST_EVENT_EMIT(derived_data, hid, "getGlobalAODSink", "Metadata: target file %{public}s already has metadata, preserving it", + fileAndFolder.file->GetName()); } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { TMap aodMetaDataMap; for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { @@ -227,6 +234,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) ta2tr.process(); } } + O2_SIGNPOST_END(derived_data, hid, "getGlobalAODSink", "Done processing."); }; } @@ -252,9 +260,11 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) static std::string currentFile = ""; auto endofdatacb = [inputObjects](EndOfStreamContext& context) { - LOG(debug) << "Writing merged objects and histograms to file"; + O2_SIGNPOST_ID_GENERATE(hid, histograms); + O2_SIGNPOST_START(histograms, hid, "getOutputObjHistWriter", "Writing merged objects and histograms to file"); if (inputObjects->empty()) { - LOG(error) << "Output object map is empty!"; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Output object map is empty!"); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Writing completed with error."); context.services().get().readyToQuit(QuitRequest::Me); return; } @@ -263,30 +273,39 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) f[i]->Close(); } } - LOG(debug) << "All outputs merged in their respective target files"; + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Writing completed correctly."); context.services().get().readyToQuit(QuitRequest::Me); }; callbacks.set(endofdatacb); return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { + O2_SIGNPOST_ID_GENERATE(hid, histograms); + O2_SIGNPOST_START(histograms, hid, "getOutputObjHistWriter", "Processing dataset with %zu entries.", pc.inputs().size()); auto const& ref = pc.inputs().get("x"); if (!ref.header) { - LOG(error) << "Header not found"; - return; - } - if (!ref.payload) { - LOG(error) << "Payload not found"; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Header not found."); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } auto datah = o2::header::get(ref.header); if (!datah) { - LOG(error) << "No data header in stack"; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No data header in stack."); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); + return; + } + + if (!ref.payload) { + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Payload not found for %{public}s.", + datah->dataDescription.as().c_str()); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } auto objh = o2::header::get(ref.header); if (!objh) { - LOG(error) << "No output object header in stack"; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No output object in stack %{public}s.", + datah->dataDescription.as().c_str()); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } @@ -297,7 +316,8 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) tm.SetBufferOffset(0); tm.ResetMap(); if (obj.kind == nullptr) { - LOG(error) << "Cannot read class info from buffer."; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Cannot read class info from buffer."); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } @@ -310,18 +330,23 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) obj.name = named->GetName(); auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); if (hpos == tskmap.end()) { - LOG(error) << "No task found for hash " << hash; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No task found for hash %d", hash); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } auto taskname = hpos->name; auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); if (opos == objmap.end()) { - LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No object list found for task %{public}s (hash=%d)", + taskname.c_str(), hash); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } auto objects = opos->bindings; if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { - LOG(error) << "No object " << obj.name << " in map for task " << taskname; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No object %{public}s in map for task %{public}s", + obj.name.c_str(), taskname.c_str()); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } auto nameHash = runtime_hash(obj.name.c_str()); @@ -330,14 +355,16 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) // If it's the first one, we just add it to the list. if (existing == inputObjects->end()) { obj.count = objh->mPipelineSize; - inputObjects->push_back(std::make_pair(key, obj)); + inputObjects->emplace_back(key, obj); existing = inputObjects->end() - 1; } else { obj.count = existing->second.count; // Otherwise, we merge it with the existing one. auto merger = existing->second.kind->GetMerge(); if (!merger) { - LOG(error) << "Already one unmergeable object found for " << obj.name; + O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Already one unmergeable object found for %{public}s", + obj.name.c_str()); + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error."); return; } TList coll; @@ -349,13 +376,16 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) existing->second.count -= 1; if (existing->second.count != 0) { + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Still waiting for %d histograms to arrive.", existing->second.count); return; } + O2_SIGNPOST_EVENT_EMIT(histograms, hid, "getOutputObjHistWriter", "All histogramsa are there. Writing to disk."); // Write the object here. auto route = existing->first; auto entry = existing->second; auto file = ROOTfileNames.find(route.policy); if (file == ROOTfileNames.end()) { + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Could not find where to write object."); return; } auto filename = file->second; @@ -408,6 +438,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) delete (TObject*)entry.obj; entry.obj = nullptr; } + O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Done processing histogram."); }; }}; }