Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions Framework/CCDBSupport/src/CCDBFetcherHelper.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,20 @@ void CCDBFetcherHelper::initialiseHelper(CCDBFetcherHelper& helper, ConfigParamR
auto defHost = options.get<std::string>("condition-backend");
auto checkRate = options.get<int>("condition-tf-per-query");
auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
std::string extraCond{};
if (helper.useTFSlice) {
extraCond = ". Use TFSlice";
if (helper.useTFSlice > 0) {
extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
}
}
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
extraCond);
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
auto remapString = options.get<std::string>("condition-remap");
ParserResult result = parseRemappings(remapString.c_str());
Expand Down Expand Up @@ -205,12 +215,21 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
// If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
// when online.
bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt);
checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
if (isOnline || cacheExpired) {
if (!helper->useTFSlice) {
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
} else {
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
}
}
}
} else {
checkValidity = true; // never skip check if the cache is empty
}

O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());

const auto& api = helper->getAPI(path);
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
Expand All @@ -226,6 +245,7 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
LOGP(detail, "******** Default entry used for {} ********", path);
}
helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
if (etag.empty()) {
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
Expand Down
2 changes: 2 additions & 0 deletions Framework/CCDBSupport/src/CCDBFetcherHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct CCDBFetcherHelper {
size_t minSize = -1ULL;
size_t maxSize = 0;
int lastCheckedTF = 0;
int lastCheckedSlice = 0;
};

struct RemapMatcher {
Expand Down Expand Up @@ -94,6 +95,7 @@ struct CCDBFetcherHelper {
int queryPeriodGlo = 1;
int queryPeriodFactor = 1;
int64_t timeToleranceMS = 5000;
int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TCcounter|<=useTFSlice

o2::ccdb::CcdbApi& getAPI(const std::string& path);
static void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options);
Expand Down
41 changes: 32 additions & 9 deletions Framework/CCDBSupport/src/CCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct CCDBFetcherHelper {
size_t minSize = -1ULL;
size_t maxSize = 0;
int lastCheckedTF = 0;
int lastCheckedSlice = 0;
};

struct RemapMatcher {
Expand All @@ -60,6 +61,7 @@ struct CCDBFetcherHelper {
int queryPeriodGlo = 1;
int queryPeriodFactor = 1;
int64_t timeToleranceMS = 5000;
int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TCcounter|<=useTFSlice

o2::ccdb::CcdbApi& getAPI(const std::string& path)
{
Expand Down Expand Up @@ -165,10 +167,20 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti
auto defHost = options.get<std::string>("condition-backend");
auto checkRate = options.get<int>("condition-tf-per-query");
auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
std::string extraCond{};
if (helper.useTFSlice) {
extraCond = ". Use TFSlice";
if (helper.useTFSlice > 0) {
extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
}
}
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
extraCond);
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
auto remapString = options.get<std::string>("condition-remap");
CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str());
Expand Down Expand Up @@ -276,7 +288,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
metadata[key] = value;
} else if (meta.name == "ccdb-query-rate") {
chRate = meta.defaultValue.get<int>() * helper->queryPeriodFactor;
chRate = std::max(1, meta.defaultValue.get<int>()) * helper->queryPeriodFactor;
}
}
const auto url2uuid = helper->mapURL2UUID.find(path);
Expand All @@ -289,12 +301,21 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
// If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
// when online.
bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
if (isOnline || cacheExpired) {
if (!helper->useTFSlice) {
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
} else {
checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
}
}
}
} else {
checkValidity = true; // never skip check if the cache is empty
}

O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %d of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());

const auto& api = helper->getAPI(path);
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
Expand All @@ -310,6 +331,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
LOGP(detail, "******** Default entry used for {} ********", path);
}
helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
if (etag.empty()) {
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
Expand Down Expand Up @@ -382,21 +404,22 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
std::map<std::string, std::string> metadata;
std::map<std::string, std::string> headers;
std::string etag;
bool checkValidity = std::abs(int(timingInfo.tfCounter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
int32_t counter = helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter;
bool checkValidity = std::abs(int(counter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
const auto url2uuid = helper->mapURL2UUID.find(path);
if (url2uuid != helper->mapURL2UUID.end()) {
etag = url2uuid->second.etag;
} else {
checkValidity = true; // never skip check if the cache is empty
}
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tfID %d of %{public}s",
checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tf%{public}s %d of %{public}s",
checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", counter, path.data());
Output output{"CTP", "OrbitReset", 0};
Long64_t newOrbitResetTime = orbitResetTime;
auto&& v = allocator.makeVector<char>(output);
const auto& api = helper->getAPI(path);
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
helper->lastCheckedTFCounterOrbReset = timingInfo.tfCounter;
helper->lastCheckedTFCounterOrbReset = counter;
api.loadFileToMemory(v, path, metadata, timingInfo.creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
LOGP(fatal, "Unable to find CCDB object {}/{}", path, timingInfo.creation);
Expand Down
6 changes: 4 additions & 2 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
{"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
{"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
{"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}},
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
{"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
{"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
Expand All @@ -195,7 +196,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
{"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
{"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
{"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}},
{"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
{"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
{"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
Expand Down