Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions DataFormats/Headers/include/Headers/DataHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ struct BaseHeader {
uint32_t flags;
struct {
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
flagsReserved : 15, // reserved for future use
flagsReserved : 14, // reserved for future use. MUST be filled with 0s.
flagsDisabled : 1, // header should be ignored if this is 1
flagsDerivedHeader : 16; // reserved for usage by the derived header
};
};
Expand Down Expand Up @@ -467,15 +468,20 @@ auto get(const std::byte* buffer, size_t /*len*/ = 0)
// otherwise, we keep the code related to the exception outside the header file.
// Note: Can not check on size because the O2 data model requires variable size headers
// to be supported.
if (current->sanityCheck(HeaderValueType::sVersion)) {
if (current->sanityCheck(HeaderValueType::sVersion) && current->flagsDisabled == 0) {
// If the first header matches and it's enabled, we return it
// otherwise we look for more.
return reinterpret_cast<HeaderConstPtrType>(current);
}
}
auto* prev = current;
while ((current = current->next())) {
prev = current;
if (current->description == HeaderValueType::sHeaderType) {
if (current->sanityCheck(HeaderValueType::sVersion)) {
// This is needed to allow disabling some headers from being picked up
// even if they are matching. This is handy to have a quick
// way to disable a sub headers without having to drop them.
if (current->sanityCheck(HeaderValueType::sVersion) && current->flagsDisabled == 0) {
return reinterpret_cast<HeaderConstPtrType>(current);
}
}
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ add_executable(o2-test-framework-core
test/test_FairMQOptionsRetriever.cxx
test/test_FairMQResizableBuffer.cxx
test/test_FairMQ.cxx
test/test_ForwardInputs.cxx
test/test_FrameworkDataFlowToDDS.cxx
test/test_FrameworkDataFlowToO2Control.cxx
test/test_Graphviz.cxx
Expand Down
11 changes: 10 additions & 1 deletion Framework/Core/include/Framework/DataProcessingHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#define O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_

#include <cstddef>
#include "Framework/TimesliceSlot.h"
#include "Framework/TimesliceIndex.h"
#include <fairmq/FwdDecls.h>
#include <vector>

namespace o2::framework
{
Expand All @@ -23,6 +27,9 @@ struct OutputChannelSpec;
struct OutputChannelState;
struct ProcessingPolicies;
struct DeviceSpec;
struct FairMQDeviceProxy;
struct MessageSet;
struct ChannelIndex;
enum struct StreamingState;
enum struct TransitionHandlingState;

Expand All @@ -45,7 +52,9 @@ struct DataProcessingHelpers {
static bool hasOnlyGenerated(DeviceSpec const& spec);
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
/// Helper to route messages for forwarding
static std::vector<fair::mq::Parts> routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true);
};

} // namespace o2::framework
#endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_
132 changes: 2 additions & 130 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -550,76 +550,6 @@ void on_signal_callback(uv_signal_t* handle, int signum)
O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
}

static auto toBeForwardedHeader = [](void* header) -> bool {
// If is now possible that the record is not complete when
// we forward it, because of a custom completion policy.
// this means that we need to skip the empty entries in the
// record for being forwarded.
if (header == nullptr) {
return false;
}
auto sih = o2::header::get<SourceInfoHeader*>(header);
if (sih) {
return false;
}

auto dih = o2::header::get<DomainInfoHeader*>(header);
if (dih) {
return false;
}

auto dh = o2::header::get<DataHeader*>(header);
if (!dh) {
return false;
}
auto dph = o2::header::get<DataProcessingHeader*>(header);
if (!dph) {
return false;
}
return true;
};

static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
FairMQDeviceProxy& proxy,
std::unique_ptr<fair::mq::Message>& header,
std::unique_ptr<fair::mq::Message>& payload,
size_t total,
bool consume) {
if (header.get() == nullptr) {
// Missing an header is not an error anymore.
// it simply means that we did not receive the
// given input, but we were asked to
// consume existing, so we skip it.
return false;
}
if (payload.get() == nullptr && consume == true) {
// If the payload is not there, it means we already
// processed it with ConsumeExisiting. Therefore we
// need to do something only if this is the last consume.
header.reset(nullptr);
return false;
}

auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
if (fdph == nullptr) {
LOG(error) << "Data is missing DataProcessingHeader";
return false;
}
auto fdh = o2::header::get<DataHeader*>(header->GetData());
if (fdh == nullptr) {
LOG(error) << "Data is missing DataHeader";
return false;
}

// We need to find the forward route only for the first
// part of a split payload. All the others will use the same.
// but always check if we have a sequence of multiple payloads
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
}
return cachedForwardingChoices.empty() == false;
};

struct DecongestionContext {
ServiceRegistryRef ref;
TimesliceIndex::OldestOutputInfo oldestTimeslice;
Expand Down Expand Up @@ -660,67 +590,9 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts;
forwardedParts.resize(proxy.getNumForwards());
std::vector<ChannelIndex> cachedForwardingChoices{};
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto& messageSet = currentSetOfInputs[ii];
// In case the messageSet is empty, there is nothing to be done.
if (messageSet.size() == 0) {
continue;
}
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
continue;
}
cachedForwardingChoices.clear();

for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
auto& messageSet = currentSetOfInputs[ii];
auto& header = messageSet.header(pi);
auto& payload = messageSet.payload(pi);
auto total = messageSet.getNumberOfPayloads(pi);
auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy);

if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
continue;
}

// In case of more than one forward route, we need to copy the message.
// This will eventually use the same mamory if running with the same backend.
if (cachedForwardingChoices.size() > 1) {
copy = true;
}
auto* dh = o2::header::get<DataHeader*>(header->GetData());
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());

if (copy) {
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
auto&& newHeader = header->GetTransport()->CreateMessage();
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
newHeader->Copy(*header);
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));

for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
auto&& newPayload = header->GetTransport()->CreateMessage();
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
}
}
} else {
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
}
}
}
}
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
if (forwardedParts[fi].Size() == 0) {
Expand Down
129 changes: 129 additions & 0 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "MemoryResources/MemoryResources.h"
#include "Framework/FairMQDeviceProxy.h"
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"
#include "Headers/Stack.h"
#include "Framework/Logger.h"
#include "Framework/SendingPolicy.h"
Expand All @@ -31,6 +32,8 @@
#include "Framework/ControlService.h"
#include "Framework/DataProcessingContext.h"
#include "Framework/DeviceStateEnums.h"
#include "Headers/DataHeader.h"
#include "Framework/DataProcessingHeader.h"

#include <fairmq/Device.h>
#include <fairmq/Channel.h>
Expand All @@ -41,6 +44,7 @@
O2_DECLARE_DYNAMIC_LOG(device);
// Stream which keeps track of the calibration lifetime logic
O2_DECLARE_DYNAMIC_LOG(calibration);
O2_DECLARE_DYNAMIC_LOG(forwarding);

namespace o2::framework
{
Expand Down Expand Up @@ -217,4 +221,129 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
}
}

auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot,
std::vector<MessageSet>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
{
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts;
forwardedParts.resize(proxy.getNumForwards());
std::vector<ChannelIndex> forwardingChoices{};
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto& messageSet = currentSetOfInputs[ii];

for (size_t pi = 0; pi < messageSet.size(); ++pi) {
auto& header = messageSet.header(pi);

// If is now possible that the record is not complete when
// we forward it, because of a custom completion policy.
// this means that we need to skip the empty entries in the
// record for being forwarded.
if (header->GetData() == nullptr) {
continue;
}

auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());

if (dph == nullptr || dh == nullptr) {
// Complain only if this is not an out-of-band message
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
if (dih == nullptr || sih == nullptr) {
LOGP(error, "Data is missing {}{}{}",
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
}
continue;
}

auto& payload = messageSet.payload(pi);

if (payload.get() == nullptr && consume == true) {
// If the payload is not there, it means we already
// processed it with ConsumeExisiting. Therefore we
// need to do something only if this is the last consume.
header.reset(nullptr);
continue;
}

// We need to find the forward route only for the first
// part of a split payload. All the others will use the same.
// Therefore, we reset and recompute the forwarding choice:
//
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
// which is actually always created and handled together
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
// we will already use the same choice in the for loop below.
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
forwardingChoices.clear();
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
}

if (forwardingChoices.empty()) {
// Nothing to forward go to the next messageset
continue;
}

// In case of more than one forward route, we need to copy the message.
// This will eventually use the same memory if running with the same backend.
if (copyByDefault || forwardingChoices.size()) {
for (auto& choice : forwardingChoices) {
auto&& newHeader = header->GetTransport()->CreateMessage();
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
newHeader->Copy(*header);
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
if (dih) {
const_cast<DomainInfoHeader*>(dih)->flagsDisabled = 1;
}
auto sih = o2::header::get<SourceInfoHeader*>(newHeader->GetData());
if (sih) {
const_cast<SourceInfoHeader*>(sih)->flagsDisabled = 1;
}
forwardedParts[choice.value].AddPart(std::move(newHeader));

for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
auto&& newPayload = header->GetTransport()->CreateMessage();
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
forwardedParts[choice.value].AddPart(std::move(newPayload));
}
}
} else {
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
auto dih = o2::header::get<DomainInfoHeader*>(messageSet.header(pi)->GetData());
auto sih = o2::header::get<SourceInfoHeader*>(messageSet.header(pi)->GetData());
// We need to copy the header if it has extra timeframe accounting
// information attached to it, so that we can disable it without having
// a race condition in shared memory.
if (dih || sih) {
auto&& newHeader = header->GetTransport()->CreateMessage();
newHeader->Copy(*header);
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
if (dih) {
const_cast<DomainInfoHeader*>(dih)->flagsDisabled = 1;
}
auto sih = o2::header::get<SourceInfoHeader*>(newHeader->GetData());
if (sih) {
const_cast<SourceInfoHeader*>(sih)->flagsDisabled = 1;
}
forwardedParts[forwardingChoices.back().value].AddPart(std::move(newHeader));
} else {
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
}
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
}
}
}
}
return forwardedParts;
};

} // namespace o2::framework
Loading