Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ struct CompletionPolicyHelpers {
/// When any of the parts of the record have been received, consume them.
static CompletionPolicy consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher);

#if __has_include(<fairmq/shmem/Message.h>)
/// When any of the parts which has arrived has a refcount of 1.
static CompletionPolicy consumeWhenAnyZeroCount(const char* name, CompletionPolicy::Matcher matcher);
#endif

/// Default matcher applies for all devices
static CompletionPolicy consumeWhenAny(CompletionPolicy::Matcher matcher = [](auto const&) -> bool { return true; })
{
Expand Down
4 changes: 0 additions & 4 deletions Framework/Core/src/CompletionPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ std::vector<CompletionPolicy>
{
return {
CompletionPolicyHelpers::consumeWhenAllOrdered("internal-dpl-aod-writer"),
#if __has_include(<fairmq/shmem/Message.h>)
CompletionPolicyHelpers::consumeWhenAnyZeroCount("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }),
#else
CompletionPolicyHelpers::consumeWhenAny("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }),
#endif
CompletionPolicyHelpers::consumeWhenAll()};
}

Expand Down
4 changes: 0 additions & 4 deletions Framework/Core/src/CompletionPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
#include "Framework/TimingInfo.h"
#include "DecongestionService.h"
#include "Framework/Signpost.h"
#if __has_include(<fairmq/shmem/Message.h>)
#include <fairmq/shmem/Message.h>
#endif

#include <cassert>
#include <regex>
Expand Down Expand Up @@ -252,7 +250,6 @@ CompletionPolicy CompletionPolicyHelpers::consumeExistingWhenAny(const char* nam
}};
}

#if __has_include(<fairmq/shmem/Message.h>)
CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* name, CompletionPolicy::Matcher matcher)
{
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
Expand All @@ -265,7 +262,6 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* na
};
return CompletionPolicy{name, matcher, callback, false};
}
#endif

CompletionPolicy CompletionPolicyHelpers::consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher)
{
Expand Down
14 changes: 0 additions & 14 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@
#include <fairmq/Parts.h>
#include <fairmq/Socket.h>
#include <fairmq/ProgOptions.h>
#if __has_include(<fairmq/shmem/Message.h>)
#include <fairmq/shmem/Message.h>
#endif
#include <Configuration/ConfigurationInterface.h>
#include <Configuration/ConfigurationFactory.h>
#include <Monitoring/Monitoring.h>
Expand Down Expand Up @@ -1046,14 +1044,6 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
if (forwarded.matcher.lifetime != Lifetime::Condition) {
onlyConditions = false;
}
#if !__has_include(<fairmq/shmem/Message.h>)
if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
break;
}
#endif
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && deviceContext.processingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
Expand Down Expand Up @@ -2058,14 +2048,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
return currentSetOfInputs[i].getNumberOfPairs();
};
#if __has_include(<fairmq/shmem/Message.h>)
auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
return header.GetRefCount();
};
#else
std::function<int(size_t)> refCountGetter = nullptr;
#endif
return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
};

Expand Down
10 changes: 0 additions & 10 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <functional>
#if __has_include(<fairmq/shmem/Message.h>)
#include <fairmq/shmem/Message.h>
#endif
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <span>
Expand Down Expand Up @@ -215,14 +213,10 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
auto nPartsGetter = [&partial](size_t idx) {
return partial[idx].size();
};
#if __has_include(<fairmq/shmem/Message.h>)
auto refCountGetter = [&partial](size_t idx) -> int {
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
return header.GetRefCount();
};
#else
std::function<int(size_t)> refCountGetter = nullptr;
#endif
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
// Setup the input span

Expand Down Expand Up @@ -781,14 +775,10 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
auto nPartsGetter = [&partial](size_t idx) {
return partial[idx].size();
};
#if __has_include(<fairmq/shmem/Message.h>)
auto refCountGetter = [&partial](size_t idx) -> int {
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
return header.GetRefCount();
};
#else
std::function<int(size_t)> refCountGetter = nullptr;
#endif
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);

Expand Down