From 238bc3eadcf377a9ed26bf8b6ed7fadd4fe8ed3b Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 4 Jul 2025 09:30:00 +0200 Subject: [PATCH] DPL: add a poller to check for the metric-feedback channel In the AOD reader it might happen that the memory rate limiting prevents proper handling of the messages coming on the metric-feedback channel. This obviates to the issue by adding an explicit poller for such channel. While doing so, I also had to change the rate limiter to account for the extra polling information, to avoid having a busy loop. --- Framework/Core/src/DataProcessingDevice.cxx | 9 ++++- Framework/Core/src/RateLimiter.cxx | 45 +++++++++++++++------ 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index ba3fc2cd1bedd..d55af431ff239 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -851,8 +851,13 @@ void DataProcessingDevice::initPollers() continue; } - if (channelName.rfind("from_", 0) != 0) { - LOGP(detail, "{} is not a DPL socket. Not polling.", channelName); + if (channelName.rfind("from_", 0) != 0 && channelName != "metric-feedback") { + LOGP(detail, "{} is not a DPL input socket. Not polling.", channelName); + continue; + } + + if (channelName == "metric-feedback" && spec.name.rfind("internal-dpl-aod-reader", 0) != 0) { + LOGP(detail, "{} is not a DPL input socket. Not polling.", channelName); continue; } diff --git a/Framework/Core/src/RateLimiter.cxx b/Framework/Core/src/RateLimiter.cxx index e0a320c8b9c9c..98609c15675a7 100644 --- a/Framework/Core/src/RateLimiter.cxx +++ b/Framework/Core/src/RateLimiter.cxx @@ -18,10 +18,11 @@ #include "Framework/DeviceContext.h" #include "Framework/Signpost.h" +#include #include -#include #include #include +#include #include #include @@ -32,15 +33,31 @@ using namespace o2::framework; int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) { O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); + // Feature is not enabled. Nothing to do. if (!maxInFlight && !minSHM) { return 0; } - auto device = ctx.services().get().device(); - auto& deviceState = ctx.services().get(); - if (maxInFlight && device->GetChannels().count("metric-feedback")) { + // Let's check for the metric channel. + InputChannelInfo* channelInfo = nullptr; + for (auto& info : ctx.services().get().inputChannelInfos) { + if (info.channel->GetName() == "metric-feedback") { + channelInfo = &info; + } + } + if (!channelInfo) { + return 0; + } + // No new data on channel. Nothing to do. + if (!channelInfo->readPolled) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit", + "Socket not polled. No need to check."); + + return 0; + } + if (maxInFlight && channelInfo) { auto& dtc = ctx.services().get(); - const auto& device = ctx.services().get().device(); const auto& deviceContext = ctx.services().get(); + auto& proxy = ctx.services().get(); bool timeout = deviceContext.exitTransitionTimeout; bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; bool waitMessage = false; @@ -55,17 +72,17 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) maxInFlight, mSentTimeframes, mConsumedTimeframes); } else { O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit", - "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting", - maxInFlight, mSentTimeframes, mConsumedTimeframes); + "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting", + maxInFlight, mSentTimeframes, mConsumedTimeframes); } waitMessage = true; timeoutForMessage = false; } - auto msg = device->NewMessageFor("metric-feedback", 0, 0); + auto msg = channelInfo->channel->NewMessage(0, fair::mq::Alignment{64}); int64_t count = 0; do { - count = device->Receive(msg, "metric-feedback", 0, recvTimeout); - if (timeout && count <= 0 && device->NewStatePending()) { + count = channelInfo->channel->Receive(msg, recvTimeout); + if (timeout && count <= 0 && proxy.newStateRequested()) { return 1; } } while (count <= 0 && recvTimeout > 0 && !timeoutForMessage); @@ -76,6 +93,8 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) } assert(msg->GetSize() == 8); mConsumedTimeframes = *(int64_t*)msg->GetData(); + // We reset the read polled. + channelInfo->readPolled = false; O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit", "Received %llu as consumed timeframes", mConsumedTimeframes); @@ -87,8 +106,8 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) (mSentTimeframes - mConsumedTimeframes), maxInFlight); } else { O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit", - "%lli / %d TF in flight, continue to publish", - (mSentTimeframes - mConsumedTimeframes), maxInFlight); + "%lli / %d TF in flight, continue to publish", + (mSentTimeframes - mConsumedTimeframes), maxInFlight); } } @@ -133,6 +152,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) float elapsed = std::chrono::duration_cast>(curTime - mLastTime).count(); if (elapsed < mSmothDelay) { LOG(debug) << "TF Throttling: Elapsed " << elapsed << " --> Waiting for " << mSmothDelay - elapsed; + auto& deviceState = ctx.services().get(); uv_run(deviceState.loop, UV_RUN_NOWAIT); std::this_thread::sleep_for(std::chrono::microseconds((size_t)((mSmothDelay - elapsed) * 1.e6f))); } @@ -144,6 +164,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) if (minSHM) { int waitMessage = 0; auto& runningWorkflow = ctx.services().get(); + auto* device = ctx.services().get().device(); while (true) { long freeMemory = -1; try {