Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 144 additions & 105 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@
#include "Framework/AODReaderHelpers.h"
#include "Framework/ArrowContext.h"
#include "Framework/ArrowTableSlicingCache.h"
#include "Framework/SliceCache.h"
#include "Framework/DataProcessor.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/ServiceRegistry.h"
#include "Framework/ConfigContext.h"
#include "Framework/CommonDataProcessors.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DataSpecViews.h"
#include "Framework/DeviceSpec.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/Tracing.h"
#include "Framework/DeviceMetricsInfo.h"
#include "Framework/DeviceMetricsHelper.h"
#include "Framework/DeviceInfo.h"
Expand All @@ -41,7 +37,6 @@
#include "CommonMessageBackendsHelpers.h"
#include <Monitoring/Monitoring.h>
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"

#include <RtypesCore.h>
#include <fairmq/ProgOptions.h>
Expand Down Expand Up @@ -108,6 +103,135 @@ uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
return registry.get<RateLimitConfig>().maxMemory;
}

struct ResourceState {
int64_t available;
int64_t offered = 0;
int64_t lastDeviceOffered = 0;
};
struct ResourceStats {
int64_t enoughCount; /// How many times the resources were enough
int64_t lowCount; /// How many times the resources were not enough
};
struct ResourceSpec {
char const* name;
char const* unit;
char const* api; /// The callback to give resources to a device
int64_t maxAvailable; /// Maximum available quantity for a resource
int64_t maxQuantum; /// Largest offer which can be given
int64_t minQuantum; /// Smallest offer which can be given
int64_t metricOfferScaleFactor; /// The scale factor between the metric accounting and offers accounting
};

auto offerResources(ResourceState& resourceState,
ResourceSpec const& resourceSpec,
ResourceStats& resourceStats,
std::vector<DeviceSpec> const& specs,
std::vector<DeviceInfo> const& infos,
DevicesManager& manager,
int64_t offerConsumedCurrentValue,
int64_t offerExpiredCurrentValue,
int64_t acquiredResourceCurrentValue,
int64_t disposedResourceCurrentValue,
size_t timestamp,
DeviceMetricsInfo& driverMetrics,
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
void* signpostId) -> void
{
O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
/// We loop over the devices, starting from where we stopped last time
/// offering the minimum offer to each one
int64_t lastCandidate = -1;
int64_t possibleOffer = resourceSpec.minQuantum;

for (size_t di = 0; di < specs.size(); di++) {
if (resourceState.available < possibleOffer) {
if (resourceStats.lowCount == 0) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
"We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
resourceSpec.name, resourceState.available, resourceSpec.unit,
possibleOffer, resourceSpec.unit,
resourceState.offered, resourceSpec.unit);
}
resourceStats.lowCount++;
resourceStats.enoughCount = 0;
break;
} else {
if (resourceStats.enoughCount == 0) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
"We are back in a state where we enough %{public}s: %llu %{public}s",
resourceSpec.name,
resourceState.available,
resourceSpec.unit);
}
resourceStats.lowCount = 0;
resourceStats.enoughCount++;
}
size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();

auto& info = infos[candidate];
// Do not bother for inactive devices
// FIXME: there is probably a race condition if the device died and we did not
// took notice yet...
if (info.active == false || info.readyToQuit) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Device %s is inactive not offering %{public}s to it.",
specs[candidate].name.c_str(), resourceSpec.name);
continue;
}
if (specs[candidate].name != "internal-dpl-aod-reader") {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Device %s is not a reader. Not offering %{public}s to it.",
specs[candidate].name.c_str(),
resourceSpec.name);
continue;
}
possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offering %llu %{public}s out of %llu to %{public}s",
possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
resourceState.available -= possibleOffer;
resourceState.offered += possibleOffer;
lastCandidate = candidate;
}
// We had at least a valid candidate, so
// next time we offer to the next device.
if (lastCandidate >= 0) {
resourceState.lastDeviceOffered = lastCandidate + 1;
}

// unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
// not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
// and the amount which we know was given back.
static int64_t lastShmOfferConsumed = 0;
static int64_t lastUnusedOfferedMemory = 0;
if (offerConsumedCurrentValue != lastShmOfferConsumed) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offer consumed so far %llu", offerConsumedCurrentValue);
lastShmOfferConsumed = offerConsumedCurrentValue;
}
int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
if (lastUnusedOfferedMemory != unusedOfferedMemory) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
unusedOfferedMemory, resourceState.offered,
offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
resourceSpec.metricOfferScaleFactor);
lastUnusedOfferedMemory = unusedOfferedMemory;
}
// availableSharedMemory is the amount of memory which we know is available to be offered.
// We subtract the amount which we know was already offered but it's unused and we then balance how
// much was created with how much was destroyed.
resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory;
availableResourceMetric(driverMetrics, resourceState.available, timestamp);
unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory, timestamp);

offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
};

o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
{
using o2::monitoring::Metric;
Expand Down Expand Up @@ -138,7 +262,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto &allDeviceMetrics = sm.deviceMetricsInfos;
auto &specs = sm.deviceSpecs;
auto &infos = sm.deviceInfos;
O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm);

static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
Expand Down Expand Up @@ -288,112 +411,28 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
return;
}
struct ResourceState {
int64_t available;
int64_t offered = 0;
int64_t lastDeviceOffered = 0;
};
struct ResourceStats {
int64_t enoughCount;
int64_t lowCount;
};
struct ResourceSpec{
int64_t maxAvailable;
int64_t maxQuantum;
int64_t minQuantum;
};
static const ResourceSpec resourceSpec{
static const ResourceSpec shmResourceSpec{
.name = "shared memory",
.unit = "MB",
.api = "/shm-offer {}",
.maxAvailable = (int64_t)calculateAvailableSharedMemory(registry),
.maxQuantum = 100,
.minQuantum = 50,
.metricOfferScaleFactor = 1000000,
};
static ResourceState resourceState{
.available = resourceSpec.maxAvailable,
static ResourceState shmResourceState{
.available = shmResourceSpec.maxAvailable,
};
static ResourceStats resourceStats{
.enoughCount = resourceState.available - resourceSpec.minQuantum > 0 ? 1 : 0,
.lowCount = resourceState.available - resourceSpec.minQuantum > 0 ? 0 : 1
static ResourceStats shmResourceStats{
.enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
};

/// We loop over the devices, starting from where we stopped last time
/// offering MIN_QUANTUM_SHARED_MEMORY of shared memory to each reader.
int64_t lastCandidate = -1;
int64_t possibleOffer = resourceSpec.minQuantum;

for (size_t di = 0; di < specs.size(); di++) {
if (resourceState.available < possibleOffer) {
if (resourceStats.lowCount == 0) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
"We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu",
resourceState.available, possibleOffer, resourceState.offered);
}
resourceStats.lowCount++;
resourceStats.enoughCount = 0;
break;
} else {
if (resourceStats.enoughCount == 0) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
"We are back in a state where we enough shared memory: %{bytes}llu MB", resourceState.available);
}
resourceStats.lowCount = 0;
resourceStats.enoughCount++;
}
size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();

auto& info = infos[candidate];
// Do not bother for inactive devices
// FIXME: there is probably a race condition if the device died and we did not
// took notice yet...
if (info.active == false || info.readyToQuit) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Device %s is inactive not offering memory to it.", specs[candidate].name.c_str());
continue;
}
if (specs[candidate].name != "internal-dpl-aod-reader") {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Device %s is not a reader. Not offering memory to it.", specs[candidate].name.c_str());
continue;
}
possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offering %{bytes}llu MB out of %{bytes}llu to %{public}s",
possibleOffer, resourceState.available, specs[candidate].id.c_str());
manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
resourceState.available -= possibleOffer;
resourceState.offered += possibleOffer;
lastCandidate = candidate;
}
// We had at least a valid candidate, so
// next time we offer to the next device.
if (lastCandidate >= 0) {
resourceState.lastDeviceOffered = lastCandidate + 1;
}

// unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
// not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
// and the amount which we know was given back.
static int64_t lastShmOfferConsumed = 0;
static int64_t lastUnusedOfferedMemory = 0;
if (shmOfferBytesConsumed != lastShmOfferConsumed) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"Offer consumed so far %{bytes}llu", shmOfferBytesConsumed);
lastShmOfferConsumed = shmOfferBytesConsumed;
}
int unusedOfferedMemory = (resourceState.offered - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
if (lastUnusedOfferedMemory != unusedOfferedMemory) {
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
"unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000",
unusedOfferedMemory, resourceState.offered, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
lastUnusedOfferedMemory = unusedOfferedMemory;
}
// availableSharedMemory is the amount of memory which we know is available to be offered.
// We subtract the amount which we know was already offered but it's unused and we then balance how
// much was created with how much was destroyed.
resourceState.available = resourceSpec.maxAvailable + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
availableSharedMemoryMetric(driverMetrics, resourceState.available, timestamp);
unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);

offeredSharedMemoryMetric(driverMetrics, resourceState.offered, timestamp); },
offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
(void*)&sm); },
.postDispatching = [](ProcessingContext& ctx, void* service) {
using DataHeader = o2::header::DataHeader;
auto* arrow = reinterpret_cast<ArrowContext*>(service);
Expand Down
6 changes: 3 additions & 3 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#endif
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <gsl/span>
#include <span>
#include <string>

using namespace o2::framework::data_matcher;
Expand Down Expand Up @@ -191,7 +191,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
continue;
}

auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<MessageSet const> {
auto offset = li * numInputTypes;
assert(cache.size() >= offset + numInputTypes);
auto const start = cache.data() + offset;
Expand Down Expand Up @@ -710,7 +710,7 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
//
// We use this to bail out early from the check as soon as we find something
// which we know is not complete.
auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<MessageSet const> {
auto offset = li * numInputTypes;
assert(cache.size() >= offset + numInputTypes);
auto const start = cache.data() + offset;
Expand Down