Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
numTF,
watchdog,
maxRate,
didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device, DataProcessingStats& dpstats) {
// Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
// the TF to read is numTF
assert(device.inputTimesliceId < device.maxInputTimeslices);
Expand Down Expand Up @@ -302,6 +302,10 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
}
}
totalDFSent++;

// Use the new API for sending TIMESLICE_NUMBER_STARTED
dpstats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_STARTED, DataProcessingStats::Op::Add, 1});
dpstats.processCommandQueue();
monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/CommonDataProcessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct CommonDataProcessors {
/// and simply discards them. Rate limiting goes through the DPL driver
static DataProcessorSpec getScheduledDummySink(std::vector<InputSpec> const& danglingInputs);
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec);
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec);
};

} // namespace o2::framework
Expand Down
8 changes: 6 additions & 2 deletions Framework/Core/include/Framework/DataProcessingStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ enum struct ProcessingStatsId : short {
CPU_USAGE_FRACTION,
ARROW_BYTES_CREATED,
ARROW_BYTES_DESTROYED,
ARROW_BYTES_EXPIRED,
ARROW_MESSAGES_CREATED,
ARROW_MESSAGES_DESTROYED,
ARROW_BYTES_EXPIRED,
TIMESLICE_OFFER_NUMBER_CONSUMED,
TIMESLICE_NUMBER_STARTED,
TIMESLICE_NUMBER_EXPIRED,
TIMESLICE_NUMBER_DONE,
RESOURCE_OFFER_EXPIRED,
SHM_OFFER_BYTES_CONSUMED,
TIMESLICE_OFFER_NUMBER_CONSUMED,
RESOURCES_MISSING,
RESOURCES_INSUFFICIENT,
RESOURCES_SATISFACTORY,
Expand Down Expand Up @@ -172,9 +174,11 @@ struct DataProcessingStats {
};

void registerMetric(MetricSpec const& spec);

// Update some stats as specified by the @cmd cmd
void updateStats(CommandSpec cmd);

char const* findMetricNameById(ProcessingStatsId id) const;
/// This will process the queue of commands required to update the stats.
/// It is meant to be called periodically by a single thread.
void processCommandQueue();
Expand Down
51 changes: 48 additions & 3 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ struct MetricIndices {
size_t timeframesRead = -1;
size_t timeframesConsumed = -1;
size_t timeframesExpired = -1;
// Timeslices counting
size_t timeslicesStarted = -1;
size_t timeslicesExpired = -1;
size_t timeslicesDone = -1;
};

std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
Expand All @@ -95,7 +99,11 @@ std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>&
.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
.timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")});
.timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes"),
.timeslicesStarted = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-started"),
.timeslicesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-expired"),
.timeslicesDone = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-done"),
});
}
return results;
}
Expand Down Expand Up @@ -230,6 +238,19 @@ auto offerResources(ResourceState& resourceState,
offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
};

auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
int64_t& totalMetricValue, size_t& lastTimestamp) {
assert(index < deviceMetrics.metrics.size());
changed |= deviceMetrics.changed[index];
MetricInfo info = deviceMetrics.metrics[index];
assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
auto value = (int64_t)data[(info.pos - 1) % data.size()];
totalMetricValue += value;
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
};

o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
{
using o2::monitoring::Metric;
Expand Down Expand Up @@ -257,11 +278,22 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
int64_t totalTimeframesRead = 0;
int64_t totalTimeframesConsumed = 0;
int64_t totalTimeframesExpired = 0;
int64_t totalTimeslicesStarted = 0;
int64_t totalTimeslicesDone = 0;
int64_t totalTimeslicesExpired = 0;
auto &driverMetrics = sm.driverMetricsInfo;
auto &allDeviceMetrics = sm.deviceMetricsInfos;
auto &specs = sm.deviceSpecs;
auto &infos = sm.deviceInfos;

// Aggregated driver metrics for timeslice rate limiting
auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto {
return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name);
};
auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto {
return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, name);
};

static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
Expand All @@ -280,6 +312,12 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");

static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");

static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
Expand Down Expand Up @@ -406,6 +444,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
}
processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp);
processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp);
processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp);
}
static uint64_t unchangedCount = 0;
if (changed) {
Expand All @@ -418,6 +459,10 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
} else {
unchangedCount++;
Expand Down Expand Up @@ -458,8 +503,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
};

offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
(void*)&sm);

Expand Down
40 changes: 37 additions & 3 deletions Framework/Core/src/CommonDataProcessors.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ using namespace o2::framework::data_matcher;
// Special log to track callbacks we know about
O2_DECLARE_DYNAMIC_LOG(callbacks);
O2_DECLARE_DYNAMIC_LOG(rate_limiting);
O2_DECLARE_DYNAMIC_LOG(quota);

namespace o2::framework
{
Expand Down Expand Up @@ -212,7 +213,7 @@ DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> cons
auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
auto& stats = services.get<DataProcessingStats>();
stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.processCommandQueue();
};
callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
Expand Down Expand Up @@ -247,7 +248,7 @@ DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector<InputS
O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes (domain info updated) to be set to %zu.", oldestPossingTimeslice);
stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.processCommandQueue();
};
callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
Expand All @@ -257,7 +258,8 @@ DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector<InputS
auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes (processing) to be set to %zu.", oldestPossingTimeslice);
stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.updateStats({(int)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
stats.processCommandQueue();
});
})},
.labels = {{"resilient"}}};
Expand All @@ -281,4 +283,36 @@ AlgorithmSpec CommonDataProcessors::wrapWithRateLimiting(AlgorithmSpec spec)
});
}

// The wrapped algorithm consumes 1 timeslice every time is invoked
AlgorithmSpec CommonDataProcessors::wrapWithTimesliceConsumption(AlgorithmSpec spec)
{
return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void {
original(pcx);

auto disposeResources = [](int taskId,
std::array<ComputingQuotaOffer, 32>& offers,
ComputingQuotaStats& stats,
std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> accountDisposed) {
ComputingQuotaOffer disposed;
disposed.sharedMemory = 0;
// When invoked, we have processed one timeslice by construction.
int64_t timeslicesProcessed = 1;
for (auto& offer : offers) {
if (offer.user != taskId) {
continue;
}
int64_t toRemove = std::min((int64_t)timeslicesProcessed, offer.timeslices);
offer.timeslices -= toRemove;
timeslicesProcessed -= toRemove;
disposed.timeslices += toRemove;
if (timeslicesProcessed <= 0) {
break;
}
}
return accountDisposed(disposed, stats);
};
pcx.services().get<DeviceState>().offerConsumers.emplace_back(disposeResources);
});
}

} // namespace o2::framework
20 changes: 18 additions & 2 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,14 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
.minPublishInterval = 0,
.maxRefreshLatency = 10000,
.sendInitialValue = true},
MetricSpec{.name = "timeslice-offer-number-consumed",
.enabled = arrowAndResourceLimitingMetrics,
.metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED),
.kind = Kind::UInt64,
.scope = Scope::DPL,
.minPublishInterval = 0,
.maxRefreshLatency = 10000,
.sendInitialValue = true},
MetricSpec{.name = "timeslices-expired",
.enabled = arrowAndResourceLimitingMetrics,
.metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED),
Expand All @@ -1088,9 +1096,17 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
.minPublishInterval = 0,
.maxRefreshLatency = 10000,
.sendInitialValue = true},
MetricSpec{.name = "timeslices-consumed",
MetricSpec{.name = "timeslices-started",
.enabled = arrowAndResourceLimitingMetrics,
.metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED),
.metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_STARTED),
.kind = Kind::UInt64,
.scope = Scope::DPL,
.minPublishInterval = 0,
.maxRefreshLatency = 10000,
.sendInitialValue = true},
MetricSpec{.name = "timeslices-done",
.enabled = arrowAndResourceLimitingMetrics,
.metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_DONE),
.kind = Kind::UInt64,
.scope = Scope::DPL,
.minPublishInterval = 0,
Expand Down
15 changes: 9 additions & 6 deletions Framework/Core/src/ComputingQuotaEvaluator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ void ComputingQuotaEvaluator::dispose(int taskId)
void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pending, uint64_t now)
{
O2_SIGNPOST_ID_GENERATE(oid, quota);
O2_SIGNPOST_START(quota, oid, "updateOffers", "Starting to processe received offers");
O2_SIGNPOST_START(quota, oid, "updateOffers", "Starting to process %zu received offers", pending.size());
int lastValid = -1;
for (size_t oi = 0; oi < mOffers.size(); oi++) {
auto& storeOffer = mOffers[oi];
Expand Down Expand Up @@ -283,7 +283,9 @@ void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pen
lastValidOffer.runtime = std::max(lastValidOffer.runtime, stillPending.runtime);
}
pending.clear();
O2_SIGNPOST_END(quota, oid, "updateOffers", "Remaining offers cohalesced to %d", lastValid);
auto& updatedOffer = mOffers[lastValid];
O2_SIGNPOST_END(quota, oid, "updateOffers", "Remaining offers cohalesced to %d. New values: Cpu%d, Shared Memory %lli, Timeslices %lli",
lastValid, updatedOffer.cpu, updatedOffer.sharedMemory, updatedOffer.timeslices);
}

void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> expirator)
Expand All @@ -304,8 +306,8 @@ void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOff
for (auto& ref : mExpiredOffers) {
auto& offer = mOffers[ref.index];
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(ref.index * 8));
if (offer.sharedMemory < 0) {
O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d does not have any more memory. Marking it as invalid.", ref.index);
if (offer.sharedMemory < 0 && offer.timeslices < 0) {
O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d does not have any more resources. Marking it as invalid.", ref.index);
offer.valid = false;
offer.score = OfferScore::Unneeded;
continue;
Expand All @@ -314,13 +316,14 @@ void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOff
// api.
O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d expired. Giving back %llu MB, %d cores and %llu timeslices",
ref.index, offer.sharedMemory / 1000000, offer.cpu, offer.timeslices);
assert(offer.sharedMemory >= 0);
mStats.totalExpiredBytes += offer.sharedMemory;
mStats.totalExpiredBytes += std::max<int64_t>(offer.sharedMemory, 0);
mStats.totalExpiredTimeslices += std::max<int64_t>(offer.timeslices, 0);
mStats.totalExpiredOffers++;
expirator(offer, mStats);
// driverClient.tell("expired shmem {}", offer.sharedMemory);
// driverClient.tell("expired cpu {}", offer.cpu);
offer.sharedMemory = -1;
offer.timeslices = -1;
offer.valid = false;
offer.score = OfferScore::Unneeded;
}
Expand Down
Loading
Loading