From c1a0bf60ade4cc946010d93dfe3c7f8d53af2b6e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 2 Jul 2025 14:54:42 +0200 Subject: [PATCH] DPL: improve debugging for rate limiting --- Framework/Core/src/ArrowSupport.cxx | 44 ++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 3b13e30581f70..3a7699fb6876d 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -33,6 +33,7 @@ #include "Framework/AnalysisSupportHelpers.h" #include "Framework/ServiceRegistryRef.h" #include "Framework/ServiceRegistryHelpers.h" +#include "Framework/Signpost.h" #include "CommonMessageBackendsHelpers.h" #include @@ -46,6 +47,8 @@ #include #include +O2_DECLARE_DYNAMIC_LOG(rate_limiting); + namespace o2::framework { @@ -132,6 +135,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto &allDeviceMetrics = sm.deviceMetricsInfos; auto &specs = sm.deviceSpecs; auto &infos = sm.deviceInfos; + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm); static auto stateMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "rate-limit-state"); static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-created"); @@ -298,14 +302,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (size_t di = 0; di < specs.size(); di++) { if (availableSharedMemory < possibleOffer) { if (lowSharedMemoryCount == 0) { - LOGP(detail, "We do not have enough shared memory ({}MB) to offer {}MB. Total offerings {}", availableSharedMemory, possibleOffer, offeredSharedMemory); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough", + "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu", + availableSharedMemory, possibleOffer, offeredSharedMemory); } lowSharedMemoryCount++; enoughSharedMemoryCount = 0; break; } else { if (enoughSharedMemoryCount == 0) { - LOGP(detail, "We are back in a state where we enough shared memory: {}MB", availableSharedMemory); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough", + "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory); } enoughSharedMemoryCount++; lowSharedMemoryCount = 0; @@ -323,7 +330,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() continue; } possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory); - LOGP(detail, "Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].id); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s", + possibleOffer, availableSharedMemory, specs[candidate].id.c_str()); manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data()); availableSharedMemory -= possibleOffer; offeredSharedMemory += possibleOffer; @@ -341,12 +350,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static int64_t lastShmOfferConsumed = 0; static int64_t lastUnusedOfferedMemory = 0; if (shmOfferBytesConsumed != lastShmOfferConsumed) { - LOGP(detail, "Offer consumed so far {}", shmOfferBytesConsumed); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed); lastShmOfferConsumed = shmOfferBytesConsumed; } int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000); if (lastUnusedOfferedMemory != unusedOfferedMemory) { - LOGP(detail, "unusedOfferedMemory:{} = offered:{} - (expired:{} + consumed:{}) / 1000000", unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000", + unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); lastUnusedOfferedMemory = unusedOfferedMemory; } // availableSharedMemory is the amount of memory which we know is available to be offered. @@ -362,6 +374,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto* arrow = reinterpret_cast(service); auto totalBytes = 0; auto totalMessages = 0; + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow); for (auto& input : ctx.inputs()) { if (input.header == nullptr) { continue; @@ -369,7 +382,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto const* dh = DataRefUtils::getHeader(input); auto payloadSize = DataRefUtils::getPayloadSize(input); if (dh->serialization != o2::header::gSerializationMethodArrow) { - LOGP(debug, "Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.", + dh->dataOrigin.str, dh->dataDescription.str); continue; } bool forwarded = false; @@ -380,15 +395,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } if (forwarded) { - LOGP(debug, "Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Message %{public}.4s/%{public}16.s is forwarded so we are not returning its memory.", + dh->dataOrigin.str, dh->dataDescription.str); continue; } - LOGP(debug, "Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, payloadSize / 1000000.); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.", + dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.); totalBytes += payloadSize; totalMessages += 1; } arrow->updateBytesDestroyed(totalBytes); - LOGP(debug, "{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back", + "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB", + totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.); arrow->updateMessagesDestroyed(totalMessages); auto& stats = ctx.services().get(); stats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast(arrow->bytesDestroyed())}); @@ -410,7 +431,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static bool once = false; // Until we guarantee this is called only once... if (!once) { - LOGP(info, "Rate limiting set up at {}MB distributed over {} readers", config->maxMemory, readers); + O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); + O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup", + "Rate limiting set up at %{bytes}llu MB distributed over %d readers", + config->maxMemory, readers); registry.registerService(ServiceRegistryHelpers::handleForService(config)); once = true; } },