Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 91 additions & 46 deletions Framework/AnalysisSupport/src/AODWriterHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
#include "Framework/DataOutputDirector.h"
#include "Framework/TableTreeHelpers.h"
#include "Framework/Monitoring.h"
#include "Framework/Signpost.h"

#include <Monitoring/Monitoring.h>
#include <TDirectory.h>
#include <TFile.h>
#include <TFile.h>
#include <TTree.h>
#include <TMap.h>
#include <TObjString.h>
#include <arrow/table.h>
#include <chrono>
#include <ios>

O2_DECLARE_DYNAMIC_LOG(histogram_registry);

namespace o2::framework::writers
{
Expand All @@ -46,6 +52,7 @@ struct InputObjectRoute {
// A single deserialized output object together with the bookkeeping the
// writer keeps while merging the copies of it and writing the result.
struct InputObject {
  // ROOT class descriptor of the payload, as returned by ReadClass() on the
  // incoming buffer; stays null when no class info could be read.
  TClass* kind{nullptr};
  // Untyped pointer to the instance produced by ReadObjectAny(kind).
  void* obj{nullptr};
  // Name looked up in the task's object bindings: the container name carried
  // by the OutputObjHeader for HistogramRegistry sources, otherwise the same
  // value as `name`.
  std::string container;
  // Name of the object itself, taken from TNamed::GetName().
  std::string name;
  // Number of copies still expected before the object is written; seeded
  // from the header's mPipelineSize and decremented per received part.
  // -1 means "not yet set".
  int count{-1};
};
Expand Down Expand Up @@ -273,24 +280,30 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
if (!ref.header) {
LOG(error) << "Header not found";
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Header not found.");
return;
}
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
if (!datah) {
LOG(error) << "No data header in stack";
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No data header in stack");
return;
}

if (!ref.payload) {
LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Payload not found for %{public}s/%{public}s/%d",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);
return;
}

auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
if (!objh) {
LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No output object header in stack of %{public}s/%{public}s/%d.",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);
return;
}

Expand All @@ -300,48 +313,73 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
obj.kind = tm.ReadClass();
tm.SetBufferOffset(0);
tm.ResetMap();
O2_SIGNPOST_ID_GENERATE(did, histogram_registry);
O2_SIGNPOST_START(histogram_registry, did, "initialising root", "Starting deserialization of %{public}s/%{public}s/%d",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);
if (obj.kind == nullptr) {
LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
O2_SIGNPOST_END(histogram_registry, did, "initialising root", "Failed to deserialise");
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Cannot read class info from buffer of %{public}s/%{public}s/%d.",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);
return;
}
O2_SIGNPOST_END(histogram_registry, did, "initialising root", "Done init.");

auto policy = objh->mPolicy;
auto sourceType = objh->mSourceType;
auto hash = objh->mTaskHash;
O2_SIGNPOST_START(histogram_registry, did, "deserialization", "Starting deserialization of %{public}s/%{public}s/%d",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);

obj.obj = tm.ReadObjectAny(obj.kind);
auto* named = static_cast<TNamed*>(obj.obj);
obj.name = named->GetName();
O2_SIGNPOST_END(histogram_registry, did, "deserialization", "Done deserialization.");
// If we have a folder, we assume the first element of the path
// to be the name of the registry.
if (sourceType == HistogramRegistrySource) {
obj.container = objh->containerName;
} else {
obj.container = obj.name;
}
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
if (hpos == tskmap.end()) {
LOG(error) << "No task found for hash " << hash;
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No task found for hash %d.", hash);
return;
}
auto taskname = hpos->name;
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
if (opos == objmap.end()) {
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No object list found for task %{public}s (hash=%d).",
taskname.c_str(), hash);
return;
}
auto objects = opos->bindings;
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
if (std::find(objects.begin(), objects.end(), obj.container) == objects.end()) {
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No container %{public}s in map for task %{public}s.",
obj.container.c_str(), taskname.c_str());
return;
}
auto nameHash = runtime_hash(obj.name.c_str());
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
// If it's the first one, we just add it to the list.
O2_SIGNPOST_START(histogram_registry, did, "merging", "Starting merging of %{public}s/%{public}s/%d",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);
if (existing == inputObjects->end()) {
obj.count = objh->mPipelineSize;
inputObjects->push_back(std::make_pair(key, obj));
inputObjects->emplace_back(key, obj);
existing = inputObjects->end() - 1;
} else {
obj.count = existing->second.count;
// Otherwise, we merge it with the existing one.
auto merger = existing->second.kind->GetMerge();
if (!merger) {
LOG(error) << "Already one unmergeable object found for " << obj.name;
O2_SIGNPOST_END(histogram_registry, did, "merging", "Unabled to merge");
O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "merging", "Already one unmergeable object found for %{public}s", obj.name.c_str());
return;
}
TList coll;
Expand All @@ -353,15 +391,22 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
existing->second.count -= 1;

if (existing->second.count != 0) {
O2_SIGNPOST_END(histogram_registry, did, "merging", "Done partial merging.");
O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Pipeline lanes still missing.");
return;
}
O2_SIGNPOST_END(histogram_registry, did, "merging", "Done merging.");
// Write the object here.
auto route = existing->first;
auto entry = existing->second;
auto file = ROOTfileNames.find(route.policy);
if (file == ROOTfileNames.end()) {
O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Not matching any file.");
return;
}
O2_SIGNPOST_START(histogram_registry, did, "writing", "Starting writing of %{public}s/%{public}s/%d",
datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
datah->subSpecification);
auto filename = file->second;
if (f[route.policy] == nullptr) {
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
Expand All @@ -375,53 +420,53 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
currentFile = filename;
}

// translate the list-structure created by the registry into a directory structure within the file
std::function<void(TList*, TDirectory*)> writeListToFile;
writeListToFile = [&](TList* list, TDirectory* parentDir) {
TIter next(list);
TObject* object = nullptr;
while ((object = next())) {
if (object->InheritsFrom(TList::Class())) {
writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
// FIXME: handle folders
f[route.policy]->cd("/");
auto* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
// The name contains a path...
int objSize = 0;
if (sourceType == HistogramRegistrySource) {
TDirectory* currentFolder = currentDir;
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Toplevel folder is %{public}s.",
currentDir->GetName());
std::string objName = entry.name;
auto lastSlash = entry.name.rfind('/');

if (lastSlash != std::string::npos) {
auto dirname = entry.name.substr(0, lastSlash);
objName = entry.name.substr(lastSlash + 1);
currentFolder = currentDir->GetDirectory(dirname.c_str());
if (!currentFolder) {
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Creating folder %{public}s",
dirname.c_str());
currentFolder = currentDir->mkdir(dirname.c_str(), "", kTRUE);
} else {
int objSize = parentDir->WriteObjectAny(object, object->Class(), object->GetName());
static int maxSizeWritten = 0;
if (objSize > maxSizeWritten) {
auto& monitoring = pc.services().get<Monitoring>();
maxSizeWritten = objSize;
monitoring.send(Metric{fmt::format("{}/{}:{}", object->ClassName(), object->GetName(), objSize), "aod-largest-object-written"}.addTag(tags::Key::Subsystem, tags::Value::DPL));
}
auto* written = list->Remove(object);
delete written;
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Folder %{public}s already there.",
currentFolder->GetName());
}
}
};

TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
auto* outputList = static_cast<TList*>(entry.obj);
outputList->SetOwner(false);

// if registry should live in dedicated folder a TNamed object is appended to the list
if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
delete outputList->Last();
outputList->RemoveLast();
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
}

writeListToFile(outputList, currentDir);
outputList->SetOwner();
delete outputList;
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Writing %{public}s of kind %{public}s in %{public}s",
entry.name.c_str(), entry.kind->GetName(), currentDir->GetName());
objSize = currentFolder->WriteObjectAny(entry.obj, entry.kind, objName.c_str());
O2_SIGNPOST_END(histogram_registry, did, "writing", "End writing %{public}s", entry.name.c_str());
delete (TObject*)entry.obj;
entry.obj = nullptr;
} else {
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Writing %{public}s of kind %{public}s in %{public}s",
entry.name.c_str(), entry.kind->GetName(), currentDir->GetName());
objSize = currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
O2_SIGNPOST_END(histogram_registry, did, "writing", "End writing %{public}s", entry.name.c_str());
delete (TObject*)entry.obj;
entry.obj = nullptr;
}
O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Done merging object of %d bytes.", objSize);
};
O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
mergePart(pc.inputs().get("x", pi));
}
O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done histograms in multipart message.");
};
}};
}
Expand Down
6 changes: 5 additions & 1 deletion Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#ifndef FRAMEWORK_ANALYSISMANAGERS_H
#define FRAMEWORK_ANALYSISMANAGERS_H
#include "DataAllocator.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/GroupedCombinations.h"
Expand Down Expand Up @@ -247,7 +248,10 @@ template <is_histogram_registry T>
/// End-of-stream hook for a HistogramRegistry: sends every object held by
/// @a hr to the outputs, one snapshot per object, then empties the registry.
///
/// @param context end-of-stream context providing the services and outputs
/// @param hr      registry whose entries are sent and subsequently cleaned
/// @return always true
bool postRunOutput(EndOfStreamContext& context, T& hr)
{
  auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
  // Each entry is snapshotted individually through apply(); the registry is
  // no longer shipped as one TList, so no getListOfHistograms() call here.
  // deviceSpec is captured by reference (like context) to avoid copying the
  // whole DeviceSpec into the closure; nothing in the closure is mutated,
  // so `mutable` is not needed either.
  auto sendHistos = [&deviceSpec, &context](HistogramRegistry const& self, TNamed* obj) {
    context.outputs().snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
  };
  hr.apply(sendHistos);
  hr.clean();
  return true;
}
Expand Down
14 changes: 5 additions & 9 deletions Framework/Core/include/Framework/HistogramRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,15 @@ class HistogramRegistry
template <typename T>
std::shared_ptr<T> operator()(const HistName& histName);

// Apply @a callback on every single entry in the registry
void apply(std::function<void(HistogramRegistry const&, TNamed* named)> callback) const;
// return the OutputSpec associated to the HistogramRegistry
OutputSpec const spec();

OutputRef ref(uint16_t idx, uint16_t pipelineSize);
OutputRef ref(uint16_t idx, uint16_t pipelineSize) const;

void setHash(uint32_t hash);

/// returns the list of histograms, properly sorted for writing.
TList* getListOfHistograms();

/// deletes all the histograms from the registry
void clean();

Expand Down Expand Up @@ -220,16 +219,13 @@ class HistogramRegistry

// helper function to find the histogram position in the registry
template <typename T>
uint32_t getHistIndex(const T& histName);
uint32_t getHistIndex(const T& histName) const;

// Returns @a i bitwise-AND-ed with REGISTRY_BITMASK.
constexpr uint32_t imask(uint32_t i) const
{
  return REGISTRY_BITMASK & i;
}

// helper function to create resp. find the subList defined by path
TList* getSubList(TList* list, std::deque<std::string>& path);

// helper function to split user defined path/to/hist/name string
std::deque<std::string> splitPath(const std::string& pathAndNameUser);

Expand Down Expand Up @@ -431,7 +427,7 @@ std::shared_ptr<T> HistogramRegistry::operator()(const HistName& histName)
}

template <typename T>
uint32_t HistogramRegistry::getHistIndex(const T& histName)
uint32_t HistogramRegistry::getHistIndex(const T& histName) const
{
if (O2_BUILTIN_LIKELY(histName.hash == mRegistryKey[histName.idx])) {
return histName.idx;
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/OutputObjHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct OutputObjHeader : public BaseHeader {
uint32_t mTaskHash;
uint16_t mPipelineIndex = 0;
uint16_t mPipelineSize = 1;
// Name of the actual container for the object, e.g. the HistogramRegistry name
char containerName[64] = {0};

constexpr OutputObjHeader()
: BaseHeader(sizeof(OutputObjHeader), sHeaderType, sSerializationMethod, sVersion),
Expand Down
Loading