Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions Framework/Core/src/ComputingQuotaEvaluator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
#include "Framework/DataProcessingStats.h"
#include "Framework/ServiceRegistryRef.h"
#include "Framework/DeviceState.h"
#include "Framework/DriverClient.h"
#include "Framework/Monitoring.h"
#include "Framework/Logger.h"
#include "Framework/Signpost.h"
#include <Monitoring/Monitoring.h>

#include <vector>
#include <uv.h>
#include <cassert>
#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/ranges.h>

#define LOGLEVEL debug

O2_DECLARE_DYNAMIC_LOG(quota);

namespace o2::framework
{
Expand Down Expand Up @@ -64,6 +64,8 @@ struct QuotaEvaluatorStats {

bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector, uint64_t now)
{
O2_SIGNPOST_ID_GENERATE(qid, quota);

auto selectOffer = [&offers = this->mOffers, &infos = this->mInfos, task](int ref, uint64_t now) {
auto& selected = offers[ref];
auto& info = infos[ref];
Expand All @@ -89,28 +91,36 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
// LOG(LOGLEVEL) << "No particular resource was requested, so we schedule task anyways";
return enough;
}
O2_SIGNPOST_ID_GENERATE(sid, quota);
if (enough) {
LOGP(LOGLEVEL, "{} offers were selected for a total of: cpu {}, memory {}, shared memory {}", result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
//LOG(LOGLEVEL) << " The following offers were selected for computation: {} " << fmt::join(result, ", ");
O2_SIGNPOST_START(quota, sid, "summary", "%zu offers were selected for a total of: cpu %d, memory %lli, shared memory %lli",
result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
for (auto& offer : result) {
// We pretend each offer id is a pointer, to have a unique id.
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(offer*8));
O2_SIGNPOST_START(quota, oid, "offers", "Offer %d has been selected.", offer);
}
dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1});
} else {
O2_SIGNPOST_START(quota, sid, "summary", "Not enough resources to select offers.");
dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_MISSING), DataProcessingStats::Op::Add, 1});
if (result.size()) {
dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT), DataProcessingStats::Op::Add, 1});
}
}
if (stats.invalidOffers.size()) {
// LOGP(LOGLEVEL, " The following offers were invalid: {}", fmt::join(stats.invalidOffers, ", "));
O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers were invalid: %s", fmt::format("{}", fmt::join(stats.invalidOffers, ", ")).c_str());
}
if (stats.otherUser.size()) {
// LOGP(LOGLEVEL, " The following offers were owned by other users: {}", fmt::join(stats.otherUser, ", "));
O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers were owned by other users: %s", fmt::format("{}", fmt::join(stats.otherUser, ", ")).c_str());
}
if (stats.expired.size()) {
// LOGP(LOGLEVEL, " The following offers are expired: {}", fmt::join(stats.expired, ", "));
O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers are expired: %s", fmt::format("{}", fmt::join(stats.expired, ", ")).c_str());
}
if (stats.unexpiring.size() > 1) {
// LOGP(LOGLEVEL, " The following offers will never expire: {}", fmt::join(stats.unexpiring, ", "));
O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers will never expire: %s", fmt::format("{}", fmt::join(stats.unexpiring, ", ")).c_str());
}
O2_SIGNPOST_END(quota, sid, "summary", "Done selecting offers.");

return enough;
};
Expand Down Expand Up @@ -139,16 +149,18 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
if (offer.runtime < 0) {
stats.unexpiring.push_back(i);
} else if (offer.runtime + info.received < now) {
LOGP(LOGLEVEL, "Offer {} expired since {} milliseconds and holds {}MB", i, now - offer.runtime - info.received, offer.sharedMemory / 1000000);
O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d expired since %llu milliseconds and holds %llu MB",
i, now - offer.runtime - info.received, offer.sharedMemory / 1000000);
mExpiredOffers.push_back(ComputingQuotaOfferRef{i});
stats.expired.push_back(i);
continue;
} else {
LOGP(LOGLEVEL, "Offer {} still valid for {} milliseconds, providing {}MB", i, offer.runtime + info.received - now, offer.sharedMemory / 1000000);
O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d still valid for %llu milliseconds, providing %llu MB",
i, offer.runtime + info.received - now, offer.sharedMemory / 1000000);
if (minValidity == 0) {
minValidity = offer.runtime + info.received - now;
}
minValidity = std::min(minValidity,(int64_t)(offer.runtime + info.received - now));
minValidity = std::min(minValidity, (int64_t)(offer.runtime + info.received - now));
}
/// We then check if the offer is suitable
assert(offer.sharedMemory >= 0);
Expand Down Expand Up @@ -177,11 +189,10 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
}

if (minValidity != 0) {
LOGP(LOGLEVEL, "Next offer to expire in {} milliseconds", minValidity);
O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Next offer to expire in %llu milliseconds", minValidity);
uv_timer_start(mTimer, [](uv_timer_t* handle) {
LOGP(LOGLEVEL, "Offer should be expired by now, checking again");
},
minValidity + 100, 0);
O2_SIGNPOST_ID_GENERATE(tid, quota);
O2_SIGNPOST_EVENT_EMIT(quota, tid, "select", "Offer should be expired by now, checking again."); }, minValidity + 100, 0);
}
// If we get here it means we never got enough offers, so we return false.
return summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats);
Expand Down Expand Up @@ -213,6 +224,8 @@ void ComputingQuotaEvaluator::dispose(int taskId)
continue;
}
if (offer.sharedMemory <= 0) {
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi*8));
O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed.", oi);
offer.valid = false;
offer.score = OfferScore::Unneeded;
}
Expand Down Expand Up @@ -242,34 +255,37 @@ void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pen
void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> expirator)
{
static int nothingToDoCount = mExpiredOffers.size();
O2_SIGNPOST_ID_GENERATE(qid, quota);
if (mExpiredOffers.size()) {
LOGP(LOGLEVEL, "Handling {} expired offers", mExpiredOffers.size());
O2_SIGNPOST_EVENT_EMIT(quota, qid, "handleExpired", "Handling %zu expired offers", mExpiredOffers.size());
nothingToDoCount = 0;
} else {
if (nothingToDoCount == 0) {
nothingToDoCount++;
LOGP(LOGLEVEL, "No expired offers");
O2_SIGNPOST_EVENT_EMIT(quota, qid, "handleExpired", "No expired offers");
}
}
/// Whenever an offer is expired, we give back the resources
/// to the driver.
for (auto& ref : mExpiredOffers) {
auto& offer = mOffers[ref.index];
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(ref.index*8));
if (offer.sharedMemory < 0) {
LOGP(LOGLEVEL, "Offer {} does not have any more memory. Marking it as invalid.", ref.index);
O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d does not have any more memory. Marking it as invalid.", ref.index);
offer.valid = false;
offer.score = OfferScore::Unneeded;
continue;
}
// FIXME: offers should go through the driver client, not the monitoring
// api.
LOGP(LOGLEVEL, "Offer {} expired. Giving back {}MB and {} cores", ref.index, offer.sharedMemory / 1000000, offer.cpu);
O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d expired. Giving back %llu MB and %d cores",
ref.index, offer.sharedMemory / 1000000, offer.cpu);
assert(offer.sharedMemory >= 0);
mStats.totalExpiredBytes += offer.sharedMemory;
mStats.totalExpiredOffers++;
expirator(offer, mStats);
//driverClient.tell("expired shmem {}", offer.sharedMemory);
//driverClient.tell("expired cpu {}", offer.cpu);
// driverClient.tell("expired shmem {}", offer.sharedMemory);
// driverClient.tell("expired cpu {}", offer.cpu);
offer.sharedMemory = -1;
offer.valid = false;
offer.score = OfferScore::Unneeded;
Expand Down
Loading