From 0d29a489116858f865100fea3c46f7c7b6ae0244 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 12 Nov 2025 10:14:54 +0100 Subject: [PATCH 1/2] DPL: signposts for rate limiting callbacks. --- Framework/Core/src/CommonDataProcessors.cxx | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index 4b5f317f58063..5d99fd3db7578 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -268,12 +268,16 @@ AlgorithmSpec CommonDataProcessors::wrapWithRateLimiting(AlgorithmSpec spec) return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void { auto& raw = pcx.services().get(); static RateLimiter limiter; + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &pcx); auto limit = std::stoi(raw.device()->fConfig->GetValue("timeframes-rate-limit")); - LOG(detail) << "Rate limiting to " << limit << " timeframes in flight"; + O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback", + "Rate limiting to %d timeframes in flight", limit); limiter.check(pcx, limit, 2000); - LOG(detail) << "Rate limiting passed. Invoking old callback"; + O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback", + "Rate limiting passed. Invoking old callback."); original(pcx); - LOG(detail) << "Rate limited callback done"; + O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback", + "Rate limited callback done."); }); } From 9c60ce9d44e1dcb1a3962c56574ac207166f53e4 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 12 Nov 2025 10:14:54 +0100 Subject: [PATCH 2/2] DPL Analysis: improve default value for timeframes rate limiting --- Framework/Core/src/ArrowSupport.cxx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 932c1fdacacfb..da00c8db42280 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -33,6 +33,7 @@ #include "Framework/ServiceRegistryRef.h" #include "Framework/ServiceRegistryHelpers.h" #include "Framework/Signpost.h" +#include "Framework/DefaultsHelpers.h" #include "CommonMessageBackendsHelpers.h" #include @@ -65,7 +66,7 @@ enum struct RateLimitingState { struct RateLimitConfig { int64_t maxMemory = 2000; - int64_t maxTimeframes = 1; + int64_t maxTimeframes = 1000; }; struct MetricIndices { @@ -524,7 +525,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) { config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as()); } else { - config->maxTimeframes = readers; + config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(); } static bool once = false; // Until we guarantee this is called only once...