Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Framework/AnalysisSupportHelpers.h"
#include "Framework/ServiceRegistryRef.h"
#include "Framework/ServiceRegistryHelpers.h"
#include "Framework/Signpost.h"

#include "CommonMessageBackendsHelpers.h"
#include <Monitoring/Monitoring.h>
Expand All @@ -46,6 +47,8 @@
#include <boost/program_options/variables_map.hpp>
#include <csignal>

O2_DECLARE_DYNAMIC_LOG(rate_limiting);

namespace o2::framework
{

Expand Down Expand Up @@ -132,6 +135,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto &allDeviceMetrics = sm.deviceMetricsInfos;
auto &specs = sm.deviceSpecs;
auto &infos = sm.deviceInfos;
O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm);

static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
Expand Down Expand Up @@ -298,14 +302,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
for (size_t di = 0; di < specs.size(); di++) {
if (availableSharedMemory < possibleOffer) {
if (lowSharedMemoryCount == 0) {
LOGP(detail, "We do not have enough shared memory ({}MB) to offer {}MB. Total offerings {}", availableSharedMemory, possibleOffer, offeredSharedMemory);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
"We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu",
availableSharedMemory, possibleOffer, offeredSharedMemory);
}
lowSharedMemoryCount++;
enoughSharedMemoryCount = 0;
break;
} else {
if (enoughSharedMemoryCount == 0) {
LOGP(detail, "We are back in a state where we enough shared memory: {}MB", availableSharedMemory);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
"We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory);
}
enoughSharedMemoryCount++;
lowSharedMemoryCount = 0;
Expand All @@ -323,7 +330,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
continue;
}
possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
LOGP(detail, "Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offering %{bytes}llu MB out of %{bytes}llu to %{public}s",
possibleOffer, availableSharedMemory, specs[candidate].id.c_str());
manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
availableSharedMemory -= possibleOffer;
offeredSharedMemory += possibleOffer;
Expand All @@ -341,12 +350,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static int64_t lastShmOfferConsumed = 0;
static int64_t lastUnusedOfferedMemory = 0;
if (shmOfferBytesConsumed != lastShmOfferConsumed) {
LOGP(detail, "Offer consumed so far {}", shmOfferBytesConsumed);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offer consumed so far %{bytes}llu", shmOfferBytesConsumed);
lastShmOfferConsumed = shmOfferBytesConsumed;
}
int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
if (lastUnusedOfferedMemory != unusedOfferedMemory) {
LOGP(detail, "unusedOfferedMemory:{} = offered:{} - (expired:{} + consumed:{}) / 1000000", unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000",
unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
lastUnusedOfferedMemory = unusedOfferedMemory;
}
// availableSharedMemory is the amount of memory which we know is available to be offered.
Expand All @@ -362,14 +374,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto* arrow = reinterpret_cast<ArrowContext*>(service);
auto totalBytes = 0;
auto totalMessages = 0;
O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
for (auto& input : ctx.inputs()) {
if (input.header == nullptr) {
continue;
}
auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
auto payloadSize = DataRefUtils::getPayloadSize(input);
if (dh->serialization != o2::header::gSerializationMethodArrow) {
LOGP(debug, "Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
dh->dataOrigin.str, dh->dataDescription.str);
continue;
}
bool forwarded = false;
Expand All @@ -380,15 +395,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
}
}
if (forwarded) {
LOGP(debug, "Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Message %{public}.4s/%{public}16.s is forwarded so we are not returning its memory.",
dh->dataOrigin.str, dh->dataDescription.str);
continue;
}
LOGP(debug, "Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, payloadSize / 1000000.);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
totalBytes += payloadSize;
totalMessages += 1;
}
arrow->updateBytesDestroyed(totalBytes);
LOGP(debug, "{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
"%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
arrow->updateMessagesDestroyed(totalMessages);
auto& stats = ctx.services().get<DataProcessingStats>();
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
Expand All @@ -410,7 +431,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static bool once = false;
// Until we guarantee this is called only once...
if (!once) {
LOGP(info, "Rate limiting set up at {}MB distributed over {} readers", config->maxMemory, readers);
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
"Rate limiting set up at %{bytes}llu MB distributed over %d readers",
config->maxMemory, readers);
registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
once = true;
} },
Expand Down
24 changes: 20 additions & 4 deletions Framework/Core/src/RateLimiter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@
#include "Framework/DataTakingContext.h"
#include "Framework/DeviceState.h"
#include "Framework/DeviceContext.h"
#include "Framework/Signpost.h"

#include <fairmq/Device.h>
#include <uv.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Common.h>
#include <chrono>
#include <thread>

O2_DECLARE_DYNAMIC_LOG(rate_limiting);

using namespace o2::framework;

int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
{
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
if (!maxInFlight && !minSHM) {
return 0;
}
Expand All @@ -45,9 +50,13 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) {
if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::system_clock::now() - startTime).count() > MESSAGE_DELAY_TIME)) {
if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
LOG(alarm) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting";
O2_SIGNPOST_EVENT_EMIT_ALARM(rate_limiting, sid, "timeframe_ratelimit",
"Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
maxInFlight, mSentTimeframes, mConsumedTimeframes);
} else {
LOG(info) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting";
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
"Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
maxInFlight, mSentTimeframes, mConsumedTimeframes);
}
waitMessage = true;
timeoutForMessage = false;
Expand All @@ -67,12 +76,19 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
}
assert(msg->GetSize() == 8);
mConsumedTimeframes = *(int64_t*)msg->GetData();
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit",
"Received %llu as consumed timeframes",
mConsumedTimeframes);
}
if (waitMessage) {
if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
O2_SIGNPOST_EVENT_EMIT_IMPORTANT(rate_limiting, sid, "timeframe_ratelimit",
"%lli / %d TF in flight, continue to publish",
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
} else {
LOG(info) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
"%lli / %d TF in flight, continue to publish",
(mSentTimeframes - mConsumedTimeframes), maxInFlight);
}
}

Expand Down
20 changes: 20 additions & 0 deletions Framework/Foundation/include/Framework/Signpost.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,26 @@ void o2_debug_log_set_stacktrace(_o2_log_t* log, int stacktrace)
O2_LOG_MACRO_RAW(critical, remove_engineering_type(format).data(), ##__VA_ARGS__); \
})

// Similar to the above, however it will also print a normal alarm message regardless of the signpost being enabled or not.
#define O2_SIGNPOST_EVENT_EMIT_ALARM(log, id, name, format, ...) __extension__({ \
if (O2_BUILTIN_UNLIKELY(O2_SIGNPOST_ENABLED_MAC(log))) { \
O2_SIGNPOST_EVENT_EMIT_MAC(log, id, name, format, ##__VA_ARGS__); \
} else if (O2_BUILTIN_UNLIKELY(private_o2_log_##log->stacktrace)) { \
_o2_signpost_event_emit(private_o2_log_##log, id, name, remove_engineering_type(format).data(), ##__VA_ARGS__); \
} \
O2_LOG_MACRO_RAW(alarm, remove_engineering_type(format).data(), ##__VA_ARGS__); \
})

// Similar to the above, however it will also print a normal alarm message regardless of the signpost being enabled or not.
#define O2_SIGNPOST_EVENT_EMIT_IMPORTANT(log, id, name, format, ...) __extension__({ \
if (O2_BUILTIN_UNLIKELY(O2_SIGNPOST_ENABLED_MAC(log))) { \
O2_SIGNPOST_EVENT_EMIT_MAC(log, id, name, format, ##__VA_ARGS__); \
} else if (O2_BUILTIN_UNLIKELY(private_o2_log_##log->stacktrace)) { \
_o2_signpost_event_emit(private_o2_log_##log, id, name, remove_engineering_type(format).data(), ##__VA_ARGS__); \
} \
O2_LOG_MACRO_RAW(important, remove_engineering_type(format).data(), ##__VA_ARGS__); \
})

#define O2_SIGNPOST_START(log, id, name, format, ...) \
if (O2_BUILTIN_UNLIKELY(O2_SIGNPOST_ENABLED_MAC(log))) { \
O2_SIGNPOST_START_MAC(log, id, name, format, ##__VA_ARGS__); \
Expand Down
Loading