diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index 46b35f54f21ba..ab0d094c18486 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -40,6 +40,8 @@ class FairMQDeviceProxy void bind(std::vector const& outputs, std::vector const& inputs, std::vector const& forwards, fair::mq::Device& device); + /// Retrieve the transport associated to a given route. + [[nodiscard]] OutputRoute const& getOutputRoute(RouteIndex routeIndex) const { return mOutputs.at(routeIndex.value); } /// Retrieve the transport associated to a given route. [[nodiscard]] fair::mq::TransportFactory* getOutputTransport(RouteIndex routeIndex) const; /// Retrieve the transport associated to a given route. diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index ca35089fdfaab..4b559ef26191e 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -9,6 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/CompilerBuiltins.h" +#include "Framework/Lifetime.h" #include "Framework/TableBuilder.h" #include "Framework/TableTreeHelpers.h" #include "Framework/DataAllocator.h" @@ -121,8 +122,12 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, dh.runNumber = timingInfo.runNumber; DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation}; - static_cast(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0; auto& proxy = mRegistry.get(); + auto lifetime = proxy.getOutputRoute(routeIndex).matcher.lifetime; + static_cast(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0; + // Messages associated to sporatic output we always keep, since they are most likely histograms / condition + // objects which need to be kept at the end of stream. + static_cast(dph).flagsDerivedHeader |= (lifetime == Lifetime::Sporadic) ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0; auto* transport = proxy.getOutputTransport(routeIndex); auto channelAlloc = o2::pmr::getTransportAllocator(transport);