diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index 449beb0cb8c0b..e67e484f7faf5 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -397,6 +397,11 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve } std::string missing = ""; bool showAlarm = false; + uint32_t runNumber = 0; + try { + runNumber = strtoul(device.fConfig->GetProperty("runNumber", "").c_str(), nullptr, 10); + } catch (...) { + } for (auto mi : unmatchedDescriptions) { auto& spec = routes[mi].matcher; missing += " " + DataSpecUtils::describe(spec); @@ -412,6 +417,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve dh.dataDescription = concrete.description; dh.subSpecification = *subSpec; dh.payloadSize = 0; + dh.runNumber = runNumber; dh.splitPayloadParts = 0; dh.splitPayloadIndex = 0; dh.payloadSerializationMethod = header::gSerializationMethodNone; @@ -504,7 +510,8 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPL LOG(error) << "unexpected nullptr found. Skipping message pair."; continue; } - const auto dh = o2::header::get(parts.At(msgidx)->GetData()); + auto* header = parts.At(msgidx)->GetData(); + const auto dh = o2::header::get(header); if (!dh) { LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing"; if (msgidx > 0) { @@ -512,7 +519,7 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPL } continue; } - auto dph = o2::header::get(parts.At(msgidx)->GetData()); + auto dph = o2::header::get(header); if (!dph) { LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing"; continue; @@ -527,7 +534,7 @@ InjectorFunction dplModelAdaptor(std::vector const& filterSpecs, DPL timingInfo.runNumber = dh->runNumber; timingInfo.tfCounter = dh->tfCounter; LOG(debug) << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize(); - if (dh->runNumber == 0 || dh->tfCounter == 0 || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) { + if (dh->runNumber == 0 || (dh->tfCounter == 0 && o2::header::get(header) == nullptr) || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) { LOG(error) << "INVALID runNumber / tfCounter: runNumber " << dh->runNumber << ", tfCounter " << dh->tfCounter << ", FMQ runNumber " << fmqRunNumber << " for msgidx " << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize(); @@ -623,6 +630,11 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::Serial auto timesliceId = std::make_shared(startTime); return [timesliceId, spec, step, method](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) { auto* device = services.get().device(); + uint32_t runNumber = 0; + try { + runNumber = strtoul(device->fConfig->GetProperty("runNumber", "").c_str(), nullptr, 10); + } catch (...) { + } // We iterate on all the parts and we send them two by two, // adding the appropriate O2 header. for (int i = 0; i < parts.Size(); ++i) { @@ -635,6 +647,7 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::Serial dh.dataDescription = matcher.description; dh.subSpecification = matcher.subSpec; dh.payloadSize = parts.At(i)->GetSize(); + dh.runNumber = runNumber; DataProcessingHeader dph{newTimesliceId, 0}; if (*timesliceId != newTimesliceId) { @@ -977,11 +990,18 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name, if (channelName != outputChannelName) { continue; } + + uint32_t runNumber = 0; + try { + runNumber = strtoul(device->fConfig->GetProperty("runNumber", "").c_str(), nullptr, 10); + } catch (...) { + } DataHeader dh; dh.dataOrigin = "DPL"; dh.dataDescription = "EOS"; dh.subSpecification = 0; dh.payloadSize = 0; + dh.runNumber = runNumber; dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; dh.tfCounter = 0; dh.firstTForbit = 0; @@ -1091,12 +1111,18 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name, if (!checkChannel(channelName)) { continue; } + uint32_t runNumber = 0; + try { + runNumber = strtoul(device->fConfig->GetProperty("runNumber", "").c_str(), nullptr, 10); + } catch (...) { + } DataHeader dh; dh.dataOrigin = "DPL"; dh.dataDescription = "EOS"; dh.subSpecification = 0; dh.payloadSize = 0; dh.payloadSerializationMethod = o2::header::gSerializationMethodNone; + dh.runNumber = runNumber; dh.tfCounter = 0; dh.firstTForbit = 0; SourceInfoHeader sih;