From 0a68680c7b69d535c384dfc2e970d572d729fe80 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 27 Jun 2025 09:21:21 +0200 Subject: [PATCH] Back out "DPL: fix rate limiting handling (#14255)" Original commit changeset: 1bcf367115a9 --- Framework/Core/src/CommonDataProcessors.cxx | 26 +-------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index c2431b3ab068d..737e1b7e635c8 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -30,11 +30,9 @@ #include "Framework/RuntimeError.h" #include "Framework/RateLimiter.h" #include "Framework/PluginManager.h" -#include "Framework/Signpost.h" #include #include -#include #include #include #include @@ -42,9 +40,6 @@ using namespace o2::framework::data_matcher; -// Special log to track callbacks we know about -O2_DECLARE_DYNAMIC_LOG(callbacks); - namespace o2::framework { @@ -150,10 +145,6 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vectordata; auto& timesliceIndex = services->get(); @@ -161,35 +152,20 @@ void retryMetricCallback(uv_async_t* async) auto channel = device->GetChannels().find("metric-feedback"); auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value; if (channel == device->GetChannels().end()) { - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel."); return; } fair::mq::MessagePtr payload(device->NewMessage()); payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr); auto consumed = oldestPossingTimeslice; - size_t start = uv_hrtime(); int64_t result = channel->second[0].Send(payload, 100); - size_t stop = uv_hrtime(); // If the sending worked, we do not retry. - if (result <= 0) { - // Forcefully slow down in case FairMQ returns earlier than expected... - int64_t ellapsed = (stop - start) / 1000000; - if (ellapsed < 100) { - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", - "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.", - result, ellapsed); - uv_sleep(100 - ellapsed); - } else { - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", - "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.", result, ellapsed); - } + if (result != 0) { // If the sending did not work, we keep trying until it actually works. // This will schedule other tasks in the queue, so the processing of the // data will still happen. uv_async_send(async); } else { - O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %llu bytes, Last timeslice now set to %zu.", result, consumed); lastTimeslice = consumed; } }