Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions Framework/Core/src/CommonDataProcessors.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,16 @@
#include "Framework/RuntimeError.h"
#include "Framework/RateLimiter.h"
#include "Framework/PluginManager.h"
#include "Framework/Signpost.h"
#include <Monitoring/Monitoring.h>

#include <fairmq/Device.h>
#include <uv.h>
#include <fstream>
#include <functional>
#include <memory>
#include <string>

using namespace o2::framework::data_matcher;

// Special log to track callbacks we know about
O2_DECLARE_DYNAMIC_LOG(callbacks);

namespace o2::framework
{

Expand Down Expand Up @@ -150,46 +145,27 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpe

void retryMetricCallback(uv_async_t* async)
{
O2_SIGNPOST_ID_FROM_POINTER(cid, callbacks, async);
O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Attempting again propagating rate-limiting information.");

// Check if this is a source device
static size_t lastTimeslice = -1;
auto* services = (ServiceRegistryRef*)async->data;
auto& timesliceIndex = services->get<TimesliceIndex>();
auto* device = services->get<RawDeviceService>().device();
auto channel = device->GetChannels().find("metric-feedback");
auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
if (channel == device->GetChannels().end()) {
O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel.");
return;
}
fair::mq::MessagePtr payload(device->NewMessage());
payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr);
auto consumed = oldestPossingTimeslice;

size_t start = uv_hrtime();
int64_t result = channel->second[0].Send(payload, 100);
size_t stop = uv_hrtime();
// If the sending worked, we do not retry.
if (result <= 0) {
// Forcefully slow down in case FairMQ returns earlier than expected...
int64_t ellapsed = (stop - start) / 1000000;
if (ellapsed < 100) {
O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
"FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.",
result, ellapsed);
uv_sleep(100 - ellapsed);
} else {
O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
"FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.", result, ellapsed);
}
if (result != 0) {
// If the sending did not work, we keep trying until it actually works.
// This will schedule other tasks in the queue, so the processing of the
// data will still happen.
uv_async_send(async);
} else {
O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %llu bytes, Last timeslice now set to %zu.", result, consumed);
lastTimeslice = consumed;
}
}
Expand Down