From bba108bc52d8bbc67a23af7de9b62e8430724746 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 9 May 2025 13:01:12 +0200 Subject: [PATCH] DPL: fix rate limiting handling On success, FairMQ returns a positive number of bytes, not 0. --- Framework/Core/src/CommonDataProcessors.cxx | 26 ++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index 737e1b7e635c8..c2431b3ab068d 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -30,9 +30,11 @@ #include "Framework/RuntimeError.h" #include "Framework/RateLimiter.h" #include "Framework/PluginManager.h" +#include "Framework/Signpost.h" #include #include +#include #include #include #include @@ -40,6 +42,9 @@ using namespace o2::framework::data_matcher; +// Special log to track callbacks we know about +O2_DECLARE_DYNAMIC_LOG(callbacks); + namespace o2::framework { @@ -145,6 +150,10 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vectordata; auto& timesliceIndex = services->get(); @@ -152,20 +161,35 @@ void retryMetricCallback(uv_async_t* async) auto channel = device->GetChannels().find("metric-feedback"); auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; if (channel == device->GetChannels().end()) { + O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel."); return; } fair::mq::MessagePtr payload(device->NewMessage()); payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr); auto consumed = oldestPossingTimeslice; + size_t start = uv_hrtime(); int64_t result = channel->second[0].Send(payload, 100); + size_t stop = uv_hrtime(); // If the sending worked, we do not retry. - if (result != 0) { + if (result <= 0) { + // Forcefully slow down in case FairMQ returns earlier than expected... + int64_t ellapsed = (stop - start) / 1000000; + if (ellapsed < 100) { + O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", + "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.", + result, ellapsed); + uv_sleep(100 - ellapsed); + } else { + O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", + "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.", result, ellapsed); + } // If the sending did not work, we keep trying until it actually works. // This will schedule other tasks in the queue, so the processing of the // data will still happen. uv_async_send(async); } else { + O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %llu bytes, Last timeslice now set to %zu.", result, consumed); lastTimeslice = consumed; } }