diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 3b13e30581f70..3a7699fb6876d 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -33,6 +33,7 @@ #include "Framework/AnalysisSupportHelpers.h" #include "Framework/ServiceRegistryRef.h" #include "Framework/ServiceRegistryHelpers.h" +#include "Framework/Signpost.h" #include "CommonMessageBackendsHelpers.h" #include @@ -46,6 +47,8 @@ #include #include +O2_DECLARE_DYNAMIC_LOG(rate_limiting); + namespace o2::framework { @@ -132,6 +135,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto &allDeviceMetrics = sm.deviceMetricsInfos; auto &specs = sm.deviceSpecs; auto &infos = sm.deviceInfos; + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm); static auto stateMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "rate-limit-state"); static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-created"); @@ -298,14 +302,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (size_t di = 0; di < specs.size(); di++) { if (availableSharedMemory < possibleOffer) { if (lowSharedMemoryCount == 0) { - LOGP(detail, "We do not have enough shared memory ({}MB) to offer {}MB. Total offerings {}", availableSharedMemory, possibleOffer, offeredSharedMemory); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough", + "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu", + availableSharedMemory, possibleOffer, offeredSharedMemory); } lowSharedMemoryCount++; enoughSharedMemoryCount = 0; break; } else { if (enoughSharedMemoryCount == 0) { - LOGP(detail, "We are back in a state where we enough shared memory: {}MB", availableSharedMemory); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough", + "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory); } enoughSharedMemoryCount++; lowSharedMemoryCount = 0; @@ -323,7 +330,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() continue; } possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory); - LOGP(detail, "Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].id); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s", + possibleOffer, availableSharedMemory, specs[candidate].id.c_str()); manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data()); availableSharedMemory -= possibleOffer; offeredSharedMemory += possibleOffer; @@ -341,12 +350,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static int64_t lastShmOfferConsumed = 0; static int64_t lastUnusedOfferedMemory = 0; if (shmOfferBytesConsumed != lastShmOfferConsumed) { - LOGP(detail, "Offer consumed so far {}", shmOfferBytesConsumed); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed); lastShmOfferConsumed = shmOfferBytesConsumed; } int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000); if (lastUnusedOfferedMemory != unusedOfferedMemory) { - LOGP(detail, "unusedOfferedMemory:{} = offered:{} - (expired:{} + consumed:{}) / 1000000", unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000", + unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); lastUnusedOfferedMemory = unusedOfferedMemory; } // availableSharedMemory is the amount of memory which we know is available to be offered. @@ -362,6 +374,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto* arrow = reinterpret_cast(service); auto totalBytes = 0; auto totalMessages = 0; + O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow); for (auto& input : ctx.inputs()) { if (input.header == nullptr) { continue; @@ -369,7 +382,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto const* dh = DataRefUtils::getHeader(input); auto payloadSize = DataRefUtils::getPayloadSize(input); if (dh->serialization != o2::header::gSerializationMethodArrow) { - LOGP(debug, "Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.", + dh->dataOrigin.str, dh->dataDescription.str); continue; } bool forwarded = false; @@ -380,15 +395,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } if (forwarded) { - LOGP(debug, "Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Message %{public}.4s/%{public}16.s is forwarded so we are not returning its memory.", + dh->dataOrigin.str, dh->dataDescription.str); continue; } - LOGP(debug, "Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, payloadSize / 1000000.); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.", + dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.); totalBytes += payloadSize; totalMessages += 1; } arrow->updateBytesDestroyed(totalBytes); - LOGP(debug, "{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back", + "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB", + totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.); arrow->updateMessagesDestroyed(totalMessages); auto& stats = ctx.services().get(); stats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast(arrow->bytesDestroyed())}); @@ -410,7 +431,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static bool once = false; // Until we guarantee this is called only once... if (!once) { - LOGP(info, "Rate limiting set up at {}MB distributed over {} readers", config->maxMemory, readers); + O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); + O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup", + "Rate limiting set up at %{bytes}llu MB distributed over %d readers", + config->maxMemory, readers); registry.registerService(ServiceRegistryHelpers::handleForService(config)); once = true; } }, diff --git a/Framework/Core/src/RateLimiter.cxx b/Framework/Core/src/RateLimiter.cxx index f381148223280..e0a320c8b9c9c 100644 --- a/Framework/Core/src/RateLimiter.cxx +++ b/Framework/Core/src/RateLimiter.cxx @@ -16,6 +16,8 @@ #include "Framework/DataTakingContext.h" #include "Framework/DeviceState.h" #include "Framework/DeviceContext.h" +#include "Framework/Signpost.h" + #include #include #include @@ -23,10 +25,13 @@ #include #include +O2_DECLARE_DYNAMIC_LOG(rate_limiting); + using namespace o2::framework; int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) { + O2_SIGNPOST_ID_GENERATE(sid, rate_limiting); if (!maxInFlight && !minSHM) { return 0; } @@ -45,9 +50,13 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) { if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast>(std::chrono::system_clock::now() - startTime).count() > MESSAGE_DELAY_TIME)) { if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) { - LOG(alarm) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting"; + O2_SIGNPOST_EVENT_EMIT_ALARM(rate_limiting, sid, "timeframe_ratelimit", + "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting", + maxInFlight, mSentTimeframes, mConsumedTimeframes); } else { - LOG(info) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting"; + O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit", + "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting", + maxInFlight, mSentTimeframes, mConsumedTimeframes); } waitMessage = true; timeoutForMessage = false; @@ -67,12 +76,19 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) } assert(msg->GetSize() == 8); mConsumedTimeframes = *(int64_t*)msg->GetData(); + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit", + "Received %llu as consumed timeframes", + mConsumedTimeframes); } if (waitMessage) { if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) { - LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish"; + O2_SIGNPOST_EVENT_EMIT_IMPORTANT(rate_limiting, sid, "timeframe_ratelimit", + "%lli / %d TF in flight, continue to publish", + (mSentTimeframes - mConsumedTimeframes), maxInFlight); } else { - LOG(info) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish"; + O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit", + "%lli / %d TF in flight, continue to publish", + (mSentTimeframes - mConsumedTimeframes), maxInFlight); } } diff --git a/Framework/Foundation/include/Framework/Signpost.h b/Framework/Foundation/include/Framework/Signpost.h index 53cc4d914a73b..781a2242375c8 100644 --- a/Framework/Foundation/include/Framework/Signpost.h +++ b/Framework/Foundation/include/Framework/Signpost.h @@ -562,6 +562,26 @@ void o2_debug_log_set_stacktrace(_o2_log_t* log, int stacktrace) O2_LOG_MACRO_RAW(critical, remove_engineering_type(format).data(), ##__VA_ARGS__); \ }) +// Similar to the above, however it will also print a normal alarm message regardless of the signpost being enabled or not. +#define O2_SIGNPOST_EVENT_EMIT_ALARM(log, id, name, format, ...) __extension__({ \ + if (O2_BUILTIN_UNLIKELY(O2_SIGNPOST_ENABLED_MAC(log))) { \ + O2_SIGNPOST_EVENT_EMIT_MAC(log, id, name, format, ##__VA_ARGS__); \ + } else if (O2_BUILTIN_UNLIKELY(private_o2_log_##log->stacktrace)) { \ + _o2_signpost_event_emit(private_o2_log_##log, id, name, remove_engineering_type(format).data(), ##__VA_ARGS__); \ + } \ + O2_LOG_MACRO_RAW(alarm, remove_engineering_type(format).data(), ##__VA_ARGS__); \ +}) + +// Similar to the above, however it will also print a normal alarm message regardless of the signpost being enabled or not. +#define O2_SIGNPOST_EVENT_EMIT_IMPORTANT(log, id, name, format, ...) __extension__({ \ + if (O2_BUILTIN_UNLIKELY(O2_SIGNPOST_ENABLED_MAC(log))) { \ + O2_SIGNPOST_EVENT_EMIT_MAC(log, id, name, format, ##__VA_ARGS__); \ + } else if (O2_BUILTIN_UNLIKELY(private_o2_log_##log->stacktrace)) { \ + _o2_signpost_event_emit(private_o2_log_##log, id, name, remove_engineering_type(format).data(), ##__VA_ARGS__); \ + } \ + O2_LOG_MACRO_RAW(important, remove_engineering_type(format).data(), ##__VA_ARGS__); \ +}) + #define O2_SIGNPOST_START(log, id, name, format, ...) \ if (O2_BUILTIN_UNLIKELY(O2_SIGNPOST_ENABLED_MAC(log))) { \ O2_SIGNPOST_START_MAC(log, id, name, format, ##__VA_ARGS__); \