From e26ee5258f25b29f3a35c432a196093a57a6ad2e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 21 Aug 2025 09:14:53 +0200 Subject: [PATCH 1/2] WIP DPL Analysis: centralised CCDB support in analysis Thanks to the newly added binary view columns we can finally support proper CCDB integration in analysis. In order to do so, the user needs to create a TIMESTAMPED table, i.e. a table which is an extension of another one where the timestamp for each row is provided. The extra columns of such a timestamped table will be CCDB columns, where the iterator of each provides access to one specified CCDB object. --- Framework/CCDBSupport/CMakeLists.txt | 21 +- .../CCDBSupport/src/AnalysisCCDBHelpers.cxx | 202 +++++++++++++ .../CCDBSupport/src/AnalysisCCDBHelpers.h | 25 ++ .../CCDBSupport/src/CCDBFetcherHelper.cxx | 278 ++++++++++++++++++ Framework/CCDBSupport/src/CCDBFetcherHelper.h | 109 +++++++ Framework/CCDBSupport/src/Plugin.cxx | 9 + .../CCDBSupport/test/test_CCDBHelpers.cxx | 52 ++-- Framework/Core/include/Framework/ASoA.h | 91 ++++++ .../Core/include/Framework/AnalysisContext.h | 10 +- .../Core/include/Framework/AnalysisHelpers.h | 38 ++- .../Framework/AnalysisSupportHelpers.h | 5 + .../Core/include/Framework/AnalysisTask.h | 10 +- Framework/Core/src/ASoA.cxx | 63 ++++ Framework/Core/src/AnalysisSupportHelpers.cxx | 29 ++ Framework/Core/src/ArrowSupport.cxx | 29 ++ Framework/Core/src/WorkflowHelpers.cxx | 35 +++ Framework/TestWorkflows/CMakeLists.txt | 5 + .../TestWorkflows/src/o2TestAnalysisCCDB.cxx | 69 +++++ 18 files changed, 1039 insertions(+), 41 deletions(-) create mode 100644 Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx create mode 100644 Framework/CCDBSupport/src/AnalysisCCDBHelpers.h create mode 100644 Framework/CCDBSupport/src/CCDBFetcherHelper.cxx create mode 100644 Framework/CCDBSupport/src/CCDBFetcherHelper.h create mode 100644 Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx diff --git 
a/Framework/CCDBSupport/CMakeLists.txt b/Framework/CCDBSupport/CMakeLists.txt index e4310ac5e0ec5..ed898fb3114aa 100644 --- a/Framework/CCDBSupport/CMakeLists.txt +++ b/Framework/CCDBSupport/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright 2019-2020 CERN and copyright holders of ALICE O2. +# Copyright 2019-2025 CERN and copyright holders of ALICE O2. # See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. # All rights not expressly granted are reserved. # @@ -9,14 +9,21 @@ # granted to it by virtue of its status as an Intergovernmental Organization # or submit itself to any jurisdiction. o2_add_library(FrameworkCCDBSupport - SOURCES + SOURCES src/Plugin.cxx + src/CCDBFetcherHelper.cxx src/CCDBHelpers.cxx + src/AnalysisCCDBHelpers.cxx PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB) -o2_add_test(CCDBHelpers NAME test_Framework_test_CCDBHelpers - SOURCES test/test_CCDBHelpers.cxx - COMPONENT_NAME Framework - LABELS framework - PUBLIC_LINK_LIBRARIES O2::Framework O2::FrameworkCCDBSupport) +add_executable(o2-test-framework-ccdbsupport + test/test_CCDBHelpers.cxx) +target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Framework) +target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::FrameworkCCDBSupport) +target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Catch2) + +get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE) +set_property(TARGET o2-test-framework-ccdbsupport PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir}) + +add_test(NAME framework:ccdbsupport COMMAND o2-test-framework-ccdbsupport --skip-benchmarks) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx new file mode 100644 index 0000000000000..1f655384eabaf --- /dev/null +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -0,0 +1,202 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. 
+// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "AnalysisCCDBHelpers.h" +#include "CCDBFetcherHelper.h" +#include "Framework/DeviceSpec.h" +#include "Framework/TimingInfo.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/DataTakingContext.h" +#include "Framework/RawDeviceService.h" +#include "Framework/Output.h" +#include "Framework/Signpost.h" +#include "Framework/AnalysisContext.h" +#include "Framework/ConfigContext.h" +#include "Framework/ConfigContext.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +O2_DECLARE_DYNAMIC_LOG(ccdb); + +namespace o2::framework +{ +// Fill valid routes. Notice that for analysis the timestamps are associated to +// a TIM table and there might be multiple CCDB objects of the same kind for +// dataframe. +// For this reason rather than matching the Lifetime::Condition, we match the +// origin. 
+namespace +{ +void fillValidRoutes(CCDBFetcherHelper& helper, std::vector const& outputRoutes, std::unordered_map& bindings) +{ + for (auto& route : outputRoutes) { + auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher); + if (originMatcher.origin != header::DataOrigin{"TIM"}) { + continue; + } + auto specStr = DataSpecUtils::describe(route.matcher); + if (bindings.find(specStr) != bindings.end()) { + continue; + } + std::cout << specStr << " is at " << helper.routes.size() << std::endl; + bindings[specStr] = helper.routes.size(); + helper.routes.push_back(route); + LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher)); + for (auto& metadata : route.matcher.metadata) { + if (metadata.type == VariantType::String) { + LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString()); + } + } + } +} +} // namespace + +AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + std::vector> schemas; + + for (auto& input : ac.analysisCCDBInputs) { + auto schemaMetadata = std::make_shared(); // one metadata object per schema: Get() returns the first matching key, so sharing it would make every schema resolve to the first input + std::vector> fields; + std::cout << input << std::endl; + schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input)); + schemaMetadata->Append("outputBinding", input.binding); + + for (auto& m : input.metadata) { + // Save the list of input tables + if (m.name.starts_with("input:")) { + auto name = m.name.substr(6); + schemaMetadata->Append("sourceTable", name); + std::cout << "sourceTable:" << name << " " << m.defaultValue.asString() << std::endl; + continue; + } + // Ignore the non ccdb: entries + if (!m.name.starts_with("ccdb:")) { + continue; + } + // Create the schema of the output + std::cout << m.name << " " << m.defaultValue.asString() << std::endl; + + auto metadata = std::make_shared(); + metadata->Append("url", m.defaultValue.asString()); + std::cout << "url" << " " << m.defaultValue.asString() << std::endl; + auto columnName = 
m.name.substr(strlen("ccdb:")); + fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, metadata)); + } + schemas.emplace_back(std::make_shared(fields, schemaMetadata)); + } + return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) { + std::shared_ptr helper = std::make_shared(); + CCDBFetcherHelper::initialiseHelper(*helper, options); + std::unordered_map bindings; + fillValidRoutes(*helper, spec.outputs, bindings); + + return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) { + std::cout << " Also executed" << std::endl; + O2_SIGNPOST_ID_GENERATE(sid, ccdb); + O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice); + for (auto& schema : schemas) { + std::vector ops; + auto inputBinding = *schema->metadata()->Get("sourceTable"); + auto outRouteDesc = *schema->metadata()->Get("outputRoute"); + std::string outBinding = *schema->metadata()->Get("outputBinding"); + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s", + outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str()); + auto ref = inputs.get(inputBinding); + auto table = ref->asArrowTable(); + std::cout << table->ToString() << std::endl; + // FIXME: make the fTimestamp column configurable. 
+ auto timestampColumn = table->GetColumnByName("fTimestamp"); + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "There are %zu bindings available", bindings.size()); + for (auto& binding : bindings) { + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "* %{public}s: %d", + binding.first.c_str(), binding.second); + } + std::cout << " output route is " << outRouteDesc << std::endl; + std::cout << " output binding is " << outBinding << std::endl; + int outputRouteIndex = bindings.at(outRouteDesc); + std::cout << " Index for output is " << outputRouteIndex << std::endl; + auto& spec = helper->routes[outputRouteIndex].matcher; + std::vector> builders; + for (auto &_ : schema->fields()) { + builders.emplace_back(std::make_shared()); + } + + for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) { + std::shared_ptr chunk = timestampColumn->chunk(ci); + auto const* timestamps = chunk->data()->GetValuesSafe(1); + + for (int64_t ri = 0; ri < chunk->data()->length; ri++) { + ops.clear(); + int64_t timestamp = timestamps[ri]; + for (auto& field : schema->fields()) { + auto url = *field->metadata()->Get("url"); + std::cout << "Fetching " << field->name() << " from " << url << "/" << timestamp << std::endl; + // Time to actually populate the blob + ops.push_back({ + .spec = spec, + .url = url, + .timestamp = timestamp, + .runNumber = 1, + .runDependent = 0, + .queryRate = 0, + }); + } + auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator); + O2_SIGNPOST_START(ccdb, sid, "handlingResponses", + "Got %zu responses from server.", + responses.size()); + if (builders.size() != responses.size()) { + LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size()); + } + arrow::Status result; + for (size_t bi = 0; bi < responses.size(); bi++) { + auto &builder = builders[bi]; + auto &response = responses[bi]; + char const* address = reinterpret_cast(response.id.value); + 
std::cout << "Inserting view (" << (void*)address << "," << response.size << ")" << std::endl; + result &= builder->Append(std::string_view(address, response.size)); + } + if (!result.ok()) { + LOGP(fatal, "Error adding results from CCDB"); + } + O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses"); + } + } + arrow::ArrayVector arrays; + for (auto &builder : builders) { + arrays.push_back(*builder->Finish()); + } + auto outTable = arrow::Table::Make(schema, arrays); + auto concrete = DataSpecUtils::asConcreteDataMatcher(spec); + allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable); + } + + O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects"); + }); + }); +} + +} // namespace o2::framework diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h new file mode 100644 index 0000000000000..1871bb1906262 --- /dev/null +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h @@ -0,0 +1,25 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+#ifndef O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_ +#define O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_ + +#include "Framework/AlgorithmSpec.h" + +namespace o2::framework +{ + +struct AnalysisCCDBHelpers { + static AlgorithmSpec fetchFromCCDB(ConfigContext const&ctx); +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_ diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx new file mode 100644 index 0000000000000..902a57fe1588b --- /dev/null +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx @@ -0,0 +1,278 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "CCDBFetcherHelper.h" +#include "Framework/DataTakingContext.h" +#include "Framework/Signpost.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/ConfigParamRegistry.h" +#include +#include + +O2_DECLARE_DYNAMIC_LOG(ccdb); + +namespace o2::framework +{ + +o2::ccdb::CcdbApi& CCDBFetcherHelper::getAPI(const std::string& path) +{ + // find the first = sign in the string. If present drop everything after it + // and between it and the previous /. + auto pos = path.find('='); + if (pos == std::string::npos) { + auto entry = remappings.find(path); + return apis[entry == remappings.end() ? 
"" : entry->second]; + } + auto pos2 = path.rfind('/', pos); + if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) { + throw runtime_error_f("Malformed path %s", path.c_str()); + } + auto entry = remappings.find(path.substr(0, pos2)); + return apis[entry == remappings.end() ? "" : entry->second]; +} + +namespace { +bool isOnlineRun(DataTakingContext const& dtc) +{ + return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; +} +} + +void CCDBFetcherHelper::initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options) +{ + auto defHost = options.get("condition-backend"); + auto checkRate = options.get("condition-tf-per-query"); + auto checkMult = options.get("condition-tf-per-query-multiplier"); + helper.timeToleranceMS = options.get("condition-time-tolerance"); + helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits::max(); + helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1; + LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? 
std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor)); + LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace); + auto remapString = options.get("condition-remap"); + ParserResult result = parseRemappings(remapString.c_str()); + if (!result.error.empty()) { + throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str()); + } + helper.remappings = result.remappings; + helper.apis[""].init(defHost); // default backend + LOGP(info, "Initialised default CCDB host {}", defHost); + // + for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping + if (helper.apis.find(entry.second) == helper.apis.end()) { + helper.apis[entry.second].init(entry.second); + LOGP(info, "Initialised custom CCDB host {}", entry.second); + } + LOGP(info, "{} is remapped to {}", entry.first, entry.second); + } + helper.createdNotBefore = std::to_string(options.get("condition-not-before")); + helper.createdNotAfter = std::to_string(options.get("condition-not-after")); +} + +CCDBFetcherHelper::ParserResult CCDBFetcherHelper::parseRemappings(char const* str) +{ + std::unordered_map remappings; + std::string currentUrl = ""; + + enum ParsingStates { + IN_BEGIN, + IN_BEGIN_URL, + IN_BEGIN_TARGET, + IN_END_TARGET, + IN_END_URL + }; + ParsingStates state = IN_BEGIN; + + while (true) { + switch (state) { + case IN_BEGIN: { + if (*str == 0) { + return {remappings, ""}; + } + state = IN_BEGIN_URL; + } + case IN_BEGIN_URL: { + if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) { + return {remappings, "URL should start with either http:// or https:// or file://"}; + } + state = IN_END_URL; + } break; + case IN_END_URL: { + char const* c = strchr(str, '='); + if (c == nullptr) { + return {remappings, "Expecting at least one target path, missing `='?"}; + } + if ((c - str) == 0) { + 
return {remappings, "Empty url"}; + } + currentUrl = std::string_view(str, c - str); + state = IN_BEGIN_TARGET; + str = c + 1; + } break; + case IN_BEGIN_TARGET: { + if (*str == 0) { + return {remappings, "Empty target"}; + } + state = IN_END_TARGET; + } break; + case IN_END_TARGET: { + char const* c = strpbrk(str, ",;"); + if (c == nullptr) { + if (remappings.count(str)) { + return {remappings, fmt::format("Path {} requested more than once.", str)}; + } + remappings[std::string(str)] = currentUrl; + return {remappings, ""}; + } + if ((c - str) == 0) { + return {remappings, "Empty target"}; + } + auto key = std::string(str, c - str); + if (remappings.count(str)) { + return {remappings, fmt::format("Path {} requested more than once.", key)}; + } + remappings[key] = currentUrl; + if (*c == ';') { + state = IN_BEGIN_URL; + } else { + state = IN_BEGIN_TARGET; + } + str = c + 1; + } break; + } + } +} + +auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr const& helper, + std::vector const& ops, + TimingInfo& timingInfo, + DataTakingContext& dtc, + DataAllocator& allocator) -> std::vector +{ + int objCnt = -1; + // We use the timeslice, so that we hook into the same interval as the rest of the + // callback. 
+ static bool isOnline = isOnlineRun(dtc); + + auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice}; + O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects"); + std::vector responses; + for (auto& op : ops) { + int64_t timestampToUse = op.timestamp; + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(op.spec).data()); + objCnt++; + auto concrete = DataSpecUtils::asConcreteDataMatcher(op.spec); + Output output{concrete.origin, concrete.description, concrete.subSpec}; + auto&& v = allocator.makeVector(output); + std::map metadata; + std::map headers; + std::string path = op.url; + std::string etag = ""; + int chRate = helper->queryPeriodGlo; + bool checkValidity = false; + if (op.runDependent > 0) { + if (op.runDependent == 1) { + metadata["runNumber"] = std::format("{}", op.runNumber); + } else if (op.runDependent == 2) { + timestampToUse = op.runNumber; + } else { + LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", op.runDependent, + concrete.origin.as(), concrete.description.as(), int(concrete.subSpec)); + } + } + for (auto m : op.metadata) { + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", m.key.data(), m.value.data()); + metadata[m.key] = m.value; + } + if (op.queryRate != 0) { + chRate = op.queryRate * helper->queryPeriodFactor; + } + + const auto url2uuid = helper->mapURL2UUID.find(path); + if (url2uuid != helper->mapURL2UUID.end()) { + etag = url2uuid->second.etag; + // We check validity every chRate timeslices or if the cache is expired + uint64_t validUntil = url2uuid->second.cacheValidUntil; + // When the cache was populated. If the cache was populated after the timestamp, we need to check validity. 
+ uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt; + // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again + // when online. + bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt); + checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired); + } else { + checkValidity = true; // never skip check if the cache is empty + } + + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); + + const auto& api = helper->getAPI(path); + if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once + LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse); + api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore); + if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) { + LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse); + // FIXME: I should send a dummy message. 
+ continue; + } + // printing in case we find a default entry + if (headers.find("default") != headers.end()) { + LOGP(detail, "******** Default entry used for {} ********", path); + } + helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter; + if (etag.empty()) { + helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid + helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; + helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); + helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); + helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + auto size = v.size(); + api.appendFlatHeader(v, headers); + auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + helper->mapURL2DPLCache[path] = cacheId; + responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size); + continue; + } + if (v.size()) { // but should be overridden by fresh object + // somewhere here pruneFromCache should be called + helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid + helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; + helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 
0 : std::stoul(headers["Cache-Valid-Until"]); + helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); + helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); + helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + auto size = v.size(); + api.appendFlatHeader(v, headers); + auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + helper->mapURL2DPLCache[path] = cacheId; + responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); + // one could modify the adoptContainer to take optional old cacheID to clean: + // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); + continue; + } else { + // Only once the etag is actually used, we get the information on how long the object is valid + helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); + } + } + // cached object is fine + auto cacheId = helper->mapURL2DPLCache[path]; + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); + helper->mapURL2UUID[path].cacheHit++; + responses.emplace_back(Response{.id = cacheId, .size = helper->mapURL2UUID[path].size, .request = nullptr}); + allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB); + // the outputBuffer was not used, can we destroy it? 
+ } + O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects"); + return responses; +}; + +} // namespace o2::framework diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.h b/Framework/CCDBSupport/src/CCDBFetcherHelper.h new file mode 100644 index 0000000000000..721fadf73c582 --- /dev/null +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.h @@ -0,0 +1,109 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_CCDBFETCHERHELPER_H_ +#define O2_FRAMEWORK_CCDBFETCHERHELPER_H_ + +#include "Framework/OutputRoute.h" +#include "Framework/DataAllocator.h" +#include "CCDB/CcdbApi.h" +#include +#include + +namespace o2::framework +{ + +struct DataTakingContext; + +struct CCDBFetcherHelper { + struct CCDBCacheInfo { + std::string etag; + size_t cacheValidUntil = 0; + size_t cachePopulatedAt = 0; + size_t cacheMiss = 0; + size_t cacheHit = 0; + size_t size = 0L; + size_t minSize = -1ULL; + size_t maxSize = 0; + int lastCheckedTF = 0; + }; + + struct RemapMatcher { + std::string path; + }; + + struct RemapTarget { + std::string url; + }; + + struct ParserResult { + std::unordered_map remappings; + std::string error; + }; + + struct MetadataEntry { + std::string key; + std::string value; + }; + + // A fetch operation. 
+ struct FetchOp { + // Where to put the blob + OutputSpec &spec; + // The url to fetch + std::string url = ""; + // The timestamp to use + int64_t timestamp = 0; + // The run to use + int runNumber = 0; + // Whether or not the thing is run dependent + int runDependent = 0; + // Actual metadata + std::vector metadata = {}; + // Query rate + int queryRate = 0; + }; + + // Where the data has been fetched + struct Response { + // CacheId / Pointer to the actual data + DataAllocator::CacheId id; + // The size of the buffer + size_t size = 0; + // Presumably the FetchOp that produced this response (TODO confirm) + FetchOp *request = nullptr; + }; + + static ParserResult parseRemappings(char const*); + + std::unordered_map mapURL2UUID; + std::unordered_map mapURL2DPLCache; + std::string createdNotBefore = "0"; + std::string createdNotAfter = "3385078236000"; + std::unordered_map apis; + std::vector routes; + std::unordered_map remappings; + uint32_t lastCheckedTFCounterOrbReset = 0; // last checked TFcounter for bulk check + int queryPeriodGlo = 1; + int queryPeriodFactor = 1; + int64_t timeToleranceMS = 5000; + + o2::ccdb::CcdbApi& getAPI(const std::string& path); + static void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options); + static auto populateCacheWith(std::shared_ptr const& helper, + std::vector const &ops, + TimingInfo& timingInfo, + DataTakingContext& dtc, + DataAllocator& allocator) -> std::vector; +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_CCDBFETCHERHELPER_H_ diff --git a/Framework/CCDBSupport/src/Plugin.cxx b/Framework/CCDBSupport/src/Plugin.cxx index 18aabc07ae4a4..d9083f97a023e 100644 --- a/Framework/CCDBSupport/src/Plugin.cxx +++ b/Framework/CCDBSupport/src/Plugin.cxx @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. 
#include "Framework/Plugins.h" #include "Framework/AlgorithmSpec.h" +#include "AnalysisCCDBHelpers.h" #include "CCDBHelpers.h" struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin { @@ -19,6 +20,14 @@ struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin { } }; +struct AnalysisCCDBFetcherPlugin : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& ctx) final + { + return o2::framework::AnalysisCCDBHelpers::fetchFromCCDB(ctx); + } +}; + DEFINE_DPL_PLUGINS_BEGIN DEFINE_DPL_PLUGIN_INSTANCE(CCDBFetcherPlugin, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(AnalysisCCDBFetcherPlugin, CustomAlgorithm); DEFINE_DPL_PLUGINS_END diff --git a/Framework/CCDBSupport/test/test_CCDBHelpers.cxx b/Framework/CCDBSupport/test/test_CCDBHelpers.cxx index df21738ddb647..53e6b66a2b30c 100644 --- a/Framework/CCDBSupport/test/test_CCDBHelpers.cxx +++ b/Framework/CCDBSupport/test/test_CCDBHelpers.cxx @@ -9,43 +9,39 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. 
-#define BOOST_TEST_MODULE Test Framework CCDBHelpers -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include -#include "../src/CCDBHelpers.h" +#include +#include "../src/CCDBFetcherHelper.h" using namespace o2::framework; -BOOST_AUTO_TEST_CASE(TestSorting) +TEST_CASE("TestSorting") { - auto result = CCDBHelpers::parseRemappings(""); - BOOST_CHECK_EQUAL(result.error, ""); // not an error + auto result = CCDBFetcherHelper::parseRemappings(""); + CHECK(result.error == ""); // not an error - result = CCDBHelpers::parseRemappings("https"); - BOOST_CHECK_EQUAL(result.error, "URL should start with either http:// or https:// or file://"); + result = CCDBFetcherHelper::parseRemappings("https"); + CHECK(result.error == "URL should start with either http:// or https:// or file://"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000"); - BOOST_CHECK_EQUAL(result.error, "Expecting at least one target path, missing `='?"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000"); + CHECK(result.error == "Expecting at least one target path, missing `='?"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000="); - BOOST_CHECK_EQUAL(result.error, "Empty target"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000="); + CHECK(result.error == "Empty target"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar,"); - BOOST_CHECK_EQUAL(result.error, "Empty target"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar,"); + CHECK(result.error == "Empty target"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/bar;"); - BOOST_CHECK_EQUAL(result.error, "URL should start with either http:// or https:// or file://"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/bar;"); + CHECK(result.error == "URL should start with either http:// or https:// or file://"); - result = 
CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/barbar;file://user/test=/foo/barr"); - BOOST_CHECK_EQUAL(result.error, ""); - BOOST_CHECK_EQUAL(result.remappings.size(), 3); - BOOST_CHECK_EQUAL(result.remappings["/foo/bar"], "https://alice.cern.ch:8000"); - BOOST_CHECK_EQUAL(result.remappings["/foo/barbar"], "https://alice.cern.ch:8000"); - BOOST_CHECK_EQUAL(result.remappings["/foo/barr"], "file://user/test"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/barbar;file://user/test=/foo/barr"); + CHECK(result.error == ""); + CHECK(result.remappings.size() == 3); + CHECK(result.remappings["/foo/bar"] == "https://alice.cern.ch:8000"); + CHECK(result.remappings["/foo/barbar"] == "https://alice.cern.ch:8000"); + CHECK(result.remappings["/foo/barr"] == "file://user/test"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar;file://user/test=/foo/bar"); - BOOST_CHECK_EQUAL(result.remappings.size(), 1); - BOOST_CHECK_EQUAL(result.error, "Path /foo/bar requested more than once."); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar;file://user/test=/foo/bar"); + CHECK(result.remappings.size() == 1); + CHECK(result.error == "Path /foo/bar requested more than once."); } diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 6a49ed25e40d2..fd248850136a3 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -44,14 +44,18 @@ std::string cutString(std::string&& str); std::string strToUpper(std::string&& str); } // namespace o2::framework +struct TClass; + namespace o2::soa { void accessingInvalidIndexFor(const char* getter); void dereferenceWithWrongType(const char* getter, const char* target); void missingFilterDeclaration(int hash, int ai); void notBoundTable(const char* tableName); +void* extractCCDBPayload(char *payload, size_t size, TClass const* cl, const char* 
what); } // namespace o2::soa + namespace o2::soa { /// Generic identifier for a table type @@ -1274,6 +1278,11 @@ concept with_sources = requires { T::sources.size(); }; +template +concept with_ccdb_urls = requires { + T::ccdb_urls.size(); +}; + template concept with_base_table = not_void>::metadata::base_table_t>; @@ -2248,11 +2257,14 @@ ColumnGetterFunction getColumnGetterByLabel(const std:: namespace o2::aod { +// If you get an error about not satisfying is_origin_hash, you need to add +// an entry here. O2ORIGIN("AOD"); O2ORIGIN("AOD1"); O2ORIGIN("AOD2"); O2ORIGIN("DYN"); O2ORIGIN("IDX"); +O2ORIGIN("TIM"); O2ORIGIN("JOIN"); O2HASH("JOIN/0"); O2ORIGIN("CONC"); @@ -2313,6 +2325,48 @@ consteval static std::string_view namespace_prefix() }; \ [[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() } +#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ + struct _Name_ : o2::soa::Column, _Name_> { \ + static constexpr const char* mLabel = _Label_; \ + static constexpr const char* query = _CCDBQuery_; \ + static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \ + using base = o2::soa::Column, _Name_>; \ + using type = std::span; \ + using column_t = _Name_; \ + _Name_(arrow::ChunkedArray const* column) \ + : o2::soa::Column, _Name_>(o2::soa::ColumnIterator>(column)) \ + { \ + } \ + \ + _Name_() = default; \ + _Name_(_Name_ const& other) = default; \ + _Name_& operator=(_Name_ const& other) = default; \ + \ + decltype(auto) _Getter_() const \ + { \ + static std::byte* payload = nullptr; \ + static _ConcreteType_* deserialised = nullptr; \ + static TClass* c = TClass::GetClass(#_ConcreteType_); \ + auto span = *mColumnIterator; \ + if (payload != (std::byte*)span.data()) { \ + payload = (std::byte*)span.data(); \ + delete deserialised; \ + TBufferFile 
f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \ + deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \ + } \ + return *deserialised; \ + } \ + \ + decltype(auto) \ + get() const \ + { \ + return _Getter_(); \ + } \ + }; + +#define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \ + DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) + #define DECLARE_SOA_COLUMN(_Name_, _Getter_, _Type_) \ DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_) @@ -3188,6 +3242,43 @@ consteval auto getIndexTargets() using metadata = _Name_##Metadata; \ }; +// Declare a table where each row is associated with a timestamp column of a _TimestampSource_ +// table. +// +// The columns of this table have to be CCDB_COLUMNS so that for each timestamp, we get a row +// which points to the specified CCDB objects described by those columns. +#define DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, _Label_, _TimestampSource_, _TimestampColumn_, _Origin_, _Version_, _Desc_, ...) 
\ + O2HASH(_Desc_ "/" #_Version_); \ + template \ + using _Name_##TimestampFrom = soa::Table, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \ + using _Name_##Timestamp = _Name_##TimestampFrom>; \ + template > \ + struct _Name_##TimestampMetadataFrom : TableMetadata, __VA_ARGS__> { \ + using base_table_t = _TimestampSource_; \ + using extension_table_t = _Name_##TimestampFrom; \ + static constexpr const auto ccdb_urls = [](framework::pack) { \ + return std::array{Cs::query...}; \ + }(framework::pack<__VA_ARGS__>{}); \ + static constexpr const auto ccdb_bindings = [](framework::pack) { \ + return std::array{Cs::mLabel...}; \ + }(framework::pack<__VA_ARGS__>{}); \ + static constexpr auto sources = _TimestampSource_::originals; \ + static constexpr auto timestamp_column_label = _TimestampColumn_::mLabel; \ + /*static constexpr auto timestampColumn = _TimestampColumn_;*/ \ + }; \ + using _Name_##TimestampMetadata = _Name_##TimestampMetadataFrom>; \ + template <> \ + struct MetadataTrait> { \ + using metadata = _Name_##TimestampMetadata; \ + }; \ + template \ + using _Name_##From = o2::soa::JoinFull, _TimestampSource_, _Name_##TimestampFrom>; \ + using _Name_ = _Name_##From>; + +#define DECLARE_SOA_TIMESTAMPED_TABLE(_Name_, _TimestampSource_, _TimestampColumn_, _Version_, _Desc_, ...) \ + O2HASH(#_Name_ "Timestamped"); \ + DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, #_Name_ "Timestamped", _TimestampSource_, _TimestampColumn_, "TIM", _Version_, _Desc_, __VA_ARGS__) + #define DECLARE_SOA_INDEX_TABLE(_Name_, _Key_, _Description_, ...) 
\ DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, "IDX", 0, _Description_, false, __VA_ARGS__) diff --git a/Framework/Core/include/Framework/AnalysisContext.h b/Framework/Core/include/Framework/AnalysisContext.h index 0f62f952d0aaa..7d1544ed312a4 100644 --- a/Framework/Core/include/Framework/AnalysisContext.h +++ b/Framework/Core/include/Framework/AnalysisContext.h @@ -29,16 +29,24 @@ struct OutputObjectInfo { std::vector bindings; }; -// +// This will keep track of the inputs which have +// been requested and for which we will need to inject +// some source device. struct AnalysisContext { std::vector requestedAODs; std::vector providedAODs; std::vector requestedDYNs; std::vector providedDYNs; std::vector requestedIDXs; + std::vector providedTIMs; + std::vector requestedTIMs; std::vector providedOutputObjHist; std::vector spawnerInputs; + // These are the timestamped tables which are required to + // inject the CCDB objects. + std::vector analysisCCDBInputs; + // Needed to created the hist writer std::vector outTskMap; std::vector outObjHistMap; diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 6e9b1e211bb76..06caa7f3df0a4 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -11,6 +11,7 @@ #ifndef o2_framework_AnalysisHelpers_H_DEFINED #define o2_framework_AnalysisHelpers_H_DEFINED +#include "ConfigParamSpec.h" #include "Framework/ASoA.h" #include "Framework/DataAllocator.h" #include "Framework/IndexBuilderHelpers.h" @@ -49,6 +50,19 @@ inline constexpr auto getSources() }.template operator()(); } +template +inline constexpr auto getCCDBUrls() +{ + std::vector result; + for (size_t i = 0; i < T::ccdb_urls.size(); ++i) { + result.push_back({std::string{"ccdb:"} + std::string{T::ccdb_bindings[i]}, + framework::VariantType::String, + T::ccdb_urls[i], + {"\"\""}}); + } + return result; +} + template constexpr auto 
getInputMetadata() -> std::vector { @@ -67,18 +81,40 @@ constexpr auto getInputMetadata() -> std::vector { return {}; } + +template +constexpr auto getCCDBMetadata() -> std::vector +{ + std::vector results = getCCDBUrls(); + std::sort(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; }); + auto last = std::unique(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; }); + results.erase(last, results.end()); + return results; +} + +template +constexpr auto getCCDBMetadata() -> std::vector +{ + return {}; +} } // namespace template constexpr auto tableRef2InputSpec() { + std::vector metadata; + auto m = getInputMetadata>::metadata>(); + metadata.insert(metadata.end(), m.begin(), m.end()); + auto ccdbMetadata = getCCDBMetadata>::metadata>(); + metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end()); + return framework::InputSpec{ o2::aod::label(), o2::aod::origin(), o2::aod::description(o2::aod::signature()), R.version, framework::Lifetime::Timeframe, - getInputMetadata>::metadata>()}; + metadata}; } template diff --git a/Framework/Core/include/Framework/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h index 4ae601dc9e4a2..a4e80decf2bbe 100644 --- a/Framework/Core/include/Framework/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -39,6 +39,11 @@ struct AnalysisSupportHelpers { std::vector const& requestedSpecials, std::vector& requestedAODs, DataProcessorSpec& publisher); + static void addMissingOutputsToAnalysisCCDBFetcher(std::vector const& providedSpecials, + std::vector const& requestedSpecials, + std::vector& requestedAODs, + std::vector& requestedDYNs, + DataProcessorSpec& publisher); static void addMissingOutputsToBuilder(std::vector const& requestedSpecials, std::vector& requestedAODs, std::vector& 
requestedDYNs, diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index b3378543e6ebb..53f6bc0f862d6 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -65,7 +65,8 @@ concept is_enumeration = is_enumeration_v>; // Helper struct which builds a DataProcessorSpec from // the contents of an AnalysisTask... -namespace { +namespace +{ struct AnalysisDataProcessorBuilder { template static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled) @@ -417,7 +418,7 @@ struct AnalysisDataProcessorBuilder { std::invoke(processingFunction, task, g, std::get(at)...); } }; -} +} // namespace struct SetDefaultProcesses { std::vector> map; @@ -429,7 +430,8 @@ struct TaskName { std::string value; }; -namespace { +namespace +{ template auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args) { @@ -493,7 +495,7 @@ auto getTaskNameSetProcesses(std::string& outputName, A... 
args) return task; } -} +} // namespace /// Adaptor to make an AlgorithmSpec from a o2::framework::Task /// diff --git a/Framework/Core/src/ASoA.cxx b/Framework/Core/src/ASoA.cxx index 3a681ee931a2b..ba0d105b9d8cc 100644 --- a/Framework/Core/src/ASoA.cxx +++ b/Framework/Core/src/ASoA.cxx @@ -14,6 +14,12 @@ #include "Framework/RuntimeError.h" #include #include +#include +#include +#include +#include +#include +#include namespace o2::soa { @@ -149,6 +155,7 @@ arrow::ChunkedArray* getIndexFromLabel(arrow::Table* table, std::string_view lab return caseInsensitiveCompare(label, f->name()); }); if (field == table->schema()->fields().end()) { + std::cout << table->ToString() << std::endl; o2::framework::throw_error(o2::framework::runtime_error_f("Unable to find column with label %s", label)); } auto index = std::distance(table->schema()->fields().begin(), field); @@ -170,6 +177,62 @@ void missingOptionalPreslice(const char* label, const char* key) throw o2::framework::runtime_error_f(R"(Optional Preslice with missing binding used: table "%s" (or join based on it) does not have column "%s")", label, key); } +void* extractCCDBPayload(char *payload, size_t size, TClass const* cl, const char* what) +{ + Int_t previousErrorLevel = gErrorIgnoreLevel; + gErrorIgnoreLevel = kFatal; + // does it have a flattened headers map attached in the end? 
+ TMemFile file("name", (char*)payload, size, "READ"); + gErrorIgnoreLevel = previousErrorLevel; + if (file.IsZombie()) { + return nullptr; + } + + if (!cl) { + return nullptr; + } + auto object = file.GetObjectChecked(what, cl); + if (!object) { + // it could be that object was stored with previous convention + // where the classname was taken as key + std::string objectName(cl->GetName()); + objectName.erase(std::find_if(objectName.rbegin(), objectName.rend(), [](unsigned char ch) { + return !std::isspace(ch); + }).base(), + objectName.end()); + objectName.erase(objectName.begin(), std::find_if(objectName.begin(), objectName.end(), [](unsigned char ch) { + return !std::isspace(ch); + })); + + object = file.GetObjectChecked(objectName.c_str(), cl); + LOG(warn) << "Did not find object under expected name " << what; + if (!object) { + return nullptr; + } + LOG(warn) << "Found object under deprecated name " << cl->GetName(); + } + auto result = object; + // We need to handle some specific cases as ROOT ties them deeply + // to the file they are contained in + if (cl->InheritsFrom("TObject")) { + // make a clone + // detach from the file + auto tree = dynamic_cast((TObject*)object); + if (tree) { + tree->LoadBaskets(0x1L << 32); // make tree memory based + tree->SetDirectory(nullptr); + result = tree; + } else { + auto h = dynamic_cast((TObject*)object); + if (h) { + h->SetDirectory(nullptr); + result = h; + } + } + } + return result; +} + } // namespace o2::soa namespace o2::framework diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index eb17566fd6d31..e8c2d7acab5d2 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -207,6 +207,39 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c } } +// For every entry of requestedSpecials, declare the matching output on publisher +// and register the inputs described by its "input:" metadata entries; specs +// matching AODOrigins or the DYN origin are also added to requestedAODs / requestedDYNs. +// NOTE(review): providedSpecials is currently unused. +void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( + std::vector const& providedSpecials, + std::vector const& 
requestedSpecials, + std::vector& requestedAODs, + std::vector& requestedDYNs, + DataProcessorSpec& publisher) +{ + for (auto& input : requestedSpecials) { + auto concrete = DataSpecUtils::asConcreteDataMatcher(input); + publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); + // FIXME: good enough for now... + for (auto& i : input.metadata) { + if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { + auto value = i.defaultValue.get(); + auto spec = DataSpecUtils::fromMetadataString(value); + auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto const& x) { return x.binding == spec.binding; }); + if (j == publisher.inputs.end()) { + publisher.inputs.push_back(spec); + } + if (DataSpecUtils::partialMatch(spec, AODOrigins)) { + DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); + } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) { + DataSpecUtils::updateInputList(requestedDYNs, std::move(spec)); + } + } + } + } +} + // ============================================================================= DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx) { diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 3a7699fb6876d..cb700dca34779 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -19,6 +19,8 @@ #include "Framework/ServiceRegistry.h" #include "Framework/ConfigContext.h" #include "Framework/CommonDataProcessors.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/DataSpecViews.h" #include "Framework/DeviceSpec.h" #include "Framework/EndOfStreamContext.h" #include "Framework/Tracing.h" @@ -27,6 +29,7 @@ #include "Framework/DeviceInfo.h" #include "Framework/DevicesManager.h" #include "Framework/DeviceConfig.h" +#include "Framework/PluginManager.h" #include "Framework/ServiceMetricsInfo.h" #include 
"WorkflowHelpers.h" #include "Framework/WorkflowSpecNode.h" @@ -441,6 +444,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) { auto& workflow = node.specs; auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; }); + auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; }); auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; }); auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; }); auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; }); @@ -448,6 +452,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() ac.requestedAODs.clear(); ac.requestedDYNs.clear(); ac.providedDYNs.clear(); + ac.providedTIMs.clear(); + ac.requestedTIMs.clear(); auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; @@ -511,6 +517,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); } + if (analysisCCDB != workflow.end()) { + for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) { + d.inputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::update_input_list{ac.requestedTIMs}; + d.outputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::append_to{ac.providedTIMs}; + } + std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan); + std::sort(ac.providedTIMs.begin(), 
ac.providedTIMs.end(), outputSpecLessThan); + // Use ranges::to> in C++23... + ac.analysisCCDBInputs.clear(); + ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs}; + + // recreate inputs and outputs + analysisCCDB->outputs.clear(); + analysisCCDB->inputs.clear(); + // replace AlgorithmSpec + // FIXME: it should be made more generic, so it does not need replacement... + // FIXME how can I make the lookup depend on DYN tables as well?? + analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); + AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); + } + if (writer != workflow.end()) { workflow.erase(writer); } @@ -538,6 +565,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } + + // replace writer as some outputs may have become dangling and some are now consumed auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index b86a4f15e7306..dcbd1f6f51f7e 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -18,6 +18,7 @@ #include "Framework/ConfigContext.h" #include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" +#include "Framework/DataSpecViews.h" #include "Framework/DataAllocator.h" #include "Framework/ControlService.h" #include "Framework/RawDeviceService.h" @@ -184,6 +185,21 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}, {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}, }; + DataProcessorSpec analysisCCDBBackend{ + .name = "internal-dpl-aod-ccdb", + .inputs = {}, + .outputs = {}, + .algorithm = 
AlgorithmSpec::dummyAlgorithm(), + .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}}, + {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}}, + {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}}, + {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}}, + {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}}, + {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}}, + {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}}, + {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}}, + {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}, + {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}}; DataProcessorSpec transientStore{"internal-dpl-transient-store", {}, {}, @@ -357,6 +373,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext if (DataSpecUtils::partialMatch(input, header::DataOrigin{"IDX"})) { DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input}); } + if (DataSpecUtils::partialMatch(input, header::DataOrigin{"TIM"})) { + DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input}); + } } std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); }); @@ -366,6 +385,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& 
workflow, ConfigContext ac.providedAODs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"DYN"})) { ac.providedDYNs.emplace_back(output); + } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"TIM"})) { + ac.providedTIMs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) { ac.providedOutputObjHist.emplace_back(output); auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; }); @@ -384,7 +405,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); + std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan); std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); + std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan); DataProcessorSpec indexBuilder{ "internal-dpl-aod-index-builder", @@ -394,6 +417,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext {}}; AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder); + ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to(ac.analysisCCDBInputs); + AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, analysisCCDBBackend); + for (auto& input : ac.requestedDYNs) { if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { 
ac.spawnerInputs.emplace_back(input); @@ -568,6 +594,15 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } } + // add the Analysis CCDB backend which reads CCDB objects using a provided + // table + if (analysisCCDBBackend.outputs.empty() == false) { + // load the fetcher algorithm from the CCDB support plugin + auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); + analysisCCDBBackend.algorithm = algo; + extraSpecs.push_back(analysisCCDBBackend); + } + // add the timer if (timer.outputs.empty() == false) { extraSpecs.push_back(timer); diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index b147a4871bf26..f5d18183c3705 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -41,6 +41,11 @@ o2_add_dpl_workflow(analysis-histograms SOURCES src/o2TestHistograms.cxx COMPONENT_NAME TestWorkflows) +o2_add_dpl_workflow(analysis-ccdb + SOURCES src/o2TestAnalysisCCDB.cxx + PUBLIC_LINK_LIBRARIES O2::DataFormatsTOF + COMPONENT_NAME TestWorkflows) + o2_add_dpl_workflow(two-timers SOURCES src/o2TwoTimers.cxx COMPONENT_NAME TestWorkflows) diff --git a/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx b/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx new file mode 100644 index 0000000000000..f9684762539f7 --- /dev/null +++ b/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx @@ -0,0 +1,69 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+/// +/// \brief Test workflow reading the TOF LHCphase CCDB object for a set of timestamps through a timestamped table. +/// \author +/// \since + +#include "Framework/runDataProcessing.h" +#include "Framework/AnalysisTask.h" +#include "Framework/AnalysisDataModel.h" +#include "DataFormatsTOF/CalibLHCphaseTOF.h" +#include + +#include + +using namespace o2; +using namespace o2::framework; +using namespace o2::framework::expressions; + +namespace o2::aod +{ +namespace tofcalib +{ +DECLARE_SOA_CCDB_COLUMN(LHCphase, lhcPhase, o2::dataformats::CalibLHCphaseTOF, "TOF/Calib/LHCphase"); //! +} // namespace tofcalib + +DECLARE_SOA_TIMESTAMPED_TABLE(TOFCalibrationObjects, aod::Timestamps, o2::aod::timestamp::Timestamp, 1, "TOFCALIB", //! + tofcalib::LHCphase); +} // namespace o2::aod + +struct DummyTimestampsTable { + Produces timestamps; /// Table with SOR timestamps produced by the task + Service control; + + void process(Enumeration<0, 1>& e) + { + timestamps(1747442464000); // c2b3d801393540b7bddb949d600b199f, ecacb915-3d70-11f0-ac6f-808de0f5250c + timestamps(1747442764000); // 0262dbd9d50aa79c3d4dcd5ec3ca67c3, ed5471c5-3d70-11f0-b0a3-808de0f524ee + control->readyToQuit(QuitRequest::Me); + control->endOfStream(); + std::cout << "Executed " << std::endl; + } +}; + +struct SimpleCCDBConsumer { + void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps) + { + LOGP(info, "Looking at all the LHCphases associated to the timestamps"); + for (auto& object : ccdbObjectsForAllTimestamps) { + std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl; + } + } +}; + +WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) +{ + return WorkflowSpec{ + adaptAnalysisTask(cfgc), + adaptAnalysisTask(cfgc, TaskName{"simple-ccdb-cunsumer"}), + }; +} From 478251f87e69072a433e51511f242c164301674f Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 21 Aug 2025 09:14:53 +0200 Subject: [PATCH 2/2] DPL: adapt 
reconstruction CCDB Fetcher to use the new CCDBFetcherHelper --- Framework/CCDBSupport/src/CCDBHelpers.cxx | 323 +++------------------- 1 file changed, 39 insertions(+), 284 deletions(-) diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index 1428e22e86651..363cdb53ba577 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -1,4 +1,4 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. // See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. // All rights not expressly granted are reserved. // @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include "CCDBHelpers.h" +#include "CCDBFetcherHelper.h" #include "Framework/DeviceSpec.h" #include "Framework/Logger.h" #include "Framework/TimingInfo.h" @@ -22,171 +23,21 @@ #include "Framework/Signpost.h" #include #include +#include O2_DECLARE_DYNAMIC_LOG(ccdb); namespace o2::framework { -struct CCDBFetcherHelper { - struct CCDBCacheInfo { - std::string etag; - size_t cacheValidUntil = 0; - size_t cachePopulatedAt = 0; - size_t cacheMiss = 0; - size_t cacheHit = 0; - size_t minSize = -1ULL; - size_t maxSize = 0; - int lastCheckedTF = 0; - }; - - struct RemapMatcher { - std::string path; - }; - - struct RemapTarget { - std::string url; - }; - - std::unordered_map mapURL2UUID; - std::unordered_map mapURL2DPLCache; - std::string createdNotBefore = "0"; - std::string createdNotAfter = "3385078236000"; - std::unordered_map apis; - std::vector routes; - std::unordered_map remappings; - uint32_t lastCheckedTFCounterOrbReset = 0; // last checkecked TFcounter for bulk check - int queryPeriodGlo = 1; - int queryPeriodFactor = 1; - int64_t timeToleranceMS = 5000; - - o2::ccdb::CcdbApi& getAPI(const std::string& path) - { - // find the first = sign in the string. 
If present drop everything after it - // and between it and the previous /. - auto pos = path.find('='); - if (pos == std::string::npos) { - auto entry = remappings.find(path); - return apis[entry == remappings.end() ? "" : entry->second]; - } - auto pos2 = path.rfind('/', pos); - if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) { - throw runtime_error_f("Malformed path %s", path.c_str()); - } - auto entry = remappings.find(path.substr(0, pos2)); - return apis[entry == remappings.end() ? "" : entry->second]; - } -}; - -bool isPrefix(std::string_view prefix, std::string_view full) +// Fill valid routes. Here we consider only the routes which have +// Lifetime::Condition. Notice the way we do it for analysis will be +// different. +namespace { - return prefix == full.substr(0, prefix.size()); -} - -CCDBHelpers::ParserResult CCDBHelpers::parseRemappings(char const* str) -{ - std::unordered_map remappings; - std::string currentUrl = ""; - - enum ParsingStates { - IN_BEGIN, - IN_BEGIN_URL, - IN_BEGIN_TARGET, - IN_END_TARGET, - IN_END_URL - }; - ParsingStates state = IN_BEGIN; - - while (true) { - switch (state) { - case IN_BEGIN: { - if (*str == 0) { - return {remappings, ""}; - } - state = IN_BEGIN_URL; - } - case IN_BEGIN_URL: { - if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) { - return {remappings, "URL should start with either http:// or https:// or file://"}; - } - state = IN_END_URL; - } break; - case IN_END_URL: { - char const* c = strchr(str, '='); - if (c == nullptr) { - return {remappings, "Expecting at least one target path, missing `='?"}; - } - if ((c - str) == 0) { - return {remappings, "Empty url"}; - } - currentUrl = std::string_view(str, c - str); - state = IN_BEGIN_TARGET; - str = c + 1; - } break; - case IN_BEGIN_TARGET: { - if (*str == 0) { - return {remappings, "Empty target"}; - } - state = IN_END_TARGET; - } break; - case IN_END_TARGET: { - char const* c = 
strpbrk(str, ",;"); - if (c == nullptr) { - if (remappings.count(str)) { - return {remappings, fmt::format("Path {} requested more than once.", str)}; - } - remappings[std::string(str)] = currentUrl; - return {remappings, ""}; - } - if ((c - str) == 0) { - return {remappings, "Empty target"}; - } - auto key = std::string(str, c - str); - if (remappings.count(str)) { - return {remappings, fmt::format("Path {} requested more than once.", key)}; - } - remappings[key] = currentUrl; - if (*c == ';') { - state = IN_BEGIN_URL; - } else { - state = IN_BEGIN_TARGET; - } - str = c + 1; - } break; - } - } -} - -void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options, std::vector const& outputRoutes) +void fillValidRoutes(CCDBFetcherHelper& helper, std::vector const& outputRoutes) { std::unordered_map accountedSpecs; - auto defHost = options.get("condition-backend"); - auto checkRate = options.get("condition-tf-per-query"); - auto checkMult = options.get("condition-tf-per-query-multiplier"); - helper.timeToleranceMS = options.get("condition-time-tolerance"); - helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits::max(); - helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1; - LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? 
std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor)); - LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace); - auto remapString = options.get("condition-remap"); - CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str()); - if (!result.error.empty()) { - throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str()); - } - helper.remappings = result.remappings; - helper.apis[""].init(defHost); // default backend - LOGP(info, "Initialised default CCDB host {}", defHost); - // - for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping - if (helper.apis.find(entry.second) == helper.apis.end()) { - helper.apis[entry.second].init(entry.second); - LOGP(info, "Initialised custom CCDB host {}", entry.second); - } - LOGP(info, "{} is remapped to {}", entry.first, entry.second); - } - helper.createdNotBefore = std::to_string(options.get("condition-not-before")); - helper.createdNotAfter = std::to_string(options.get("condition-not-after")); - for (auto& route : outputRoutes) { if (route.matcher.lifetime != Lifetime::Condition) { continue; @@ -205,6 +56,7 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti } } } +} // namespace auto getOrbitResetTime(o2::pmr::vector const& v) -> Long64_t { @@ -225,136 +77,12 @@ auto getOrbitResetTime(o2::pmr::vector const& v) -> Long64_t return (*ctp)[0]; }; -bool isOnlineRun(DataTakingContext const& dtc) -{ - return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; -} - -auto populateCacheWith(std::shared_ptr const& helper, - int64_t timestamp, - TimingInfo& timingInfo, - DataTakingContext& dtc, - DataAllocator& allocator) -> void -{ - std::string ccdbMetadataPrefix = "ccdb-metadata-"; - int objCnt = -1; - // We use 
the timeslice, so that we hook into the same interval as the rest of the - // callback. - static bool isOnline = isOnlineRun(dtc); - - auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice}; - O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects"); - for (auto& route : helper->routes) { - int64_t timestampToUse = timestamp; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data()); - objCnt++; - auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher); - Output output{concrete.origin, concrete.description, concrete.subSpec}; - auto&& v = allocator.makeVector(output); - std::map metadata; - std::map headers; - std::string path = ""; - std::string etag = ""; - int chRate = helper->queryPeriodGlo; - bool checkValidity = false; - for (auto& meta : route.matcher.metadata) { - if (meta.name == "ccdb-path") { - path = meta.defaultValue.get(); - } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get() > 0) { - if (meta.defaultValue.get() == 1) { - metadata["runNumber"] = dtc.runNumber; - } else if (meta.defaultValue.get() == 2) { - timestampToUse = std::stoi(dtc.runNumber); - } else { - LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get(), concrete.origin.as(), concrete.description.as(), int(concrete.subSpec)); - } - } else if (isPrefix(ccdbMetadataPrefix, meta.name)) { - std::string key = meta.name.substr(ccdbMetadataPrefix.size()); - auto value = meta.defaultValue.get(); - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data()); - metadata[key] = value; - } else if (meta.name == "ccdb-query-rate") { - chRate = meta.defaultValue.get() * helper->queryPeriodFactor; - } - } - const auto url2uuid = helper->mapURL2UUID.find(path); - if (url2uuid != helper->mapURL2UUID.end()) { - etag = 
url2uuid->second.etag; - // We check validity every chRate timeslices or if the cache is expired - uint64_t validUntil = url2uuid->second.cacheValidUntil; - // When the cache was populated. If the cache was populated after the timestamp, we need to check validity. - uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt; - // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again - // when online. - bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt); - checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired); - } else { - checkValidity = true; // never skip check if the cache is empty - } - - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); - - const auto& api = helper->getAPI(path); - if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once - LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse); - api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore); - if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) { - LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse); - // FIXME: I should send a dummy message. 
- continue; - } - // printing in case we find a default entry - if (headers.find("default") != headers.end()) { - LOGP(detail, "******** Default entry used for {} ********", path); - } - helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter; - if (etag.empty()) { - helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid - helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; - helper->mapURL2UUID[path].cacheMiss++; - helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); - helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); - api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); - helper->mapURL2DPLCache[path] = cacheId; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - continue; - } - if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called - helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid - helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; - helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 
0 : std::stoul(headers["Cache-Valid-Until"]); - helper->mapURL2UUID[path].cacheMiss++; - helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); - helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); - api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); - helper->mapURL2DPLCache[path] = cacheId; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); - continue; - } else { - // Only once the etag is actually used, we get the information on how long the object is valid - helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); - } - } - // cached object is fine - auto cacheId = helper->mapURL2DPLCache[path]; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - helper->mapURL2UUID[path].cacheHit++; - allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB); - // the outputBuffer was not used, can we destroy it? 
- } - O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects"); -}; - AlgorithmSpec CCDBHelpers::fetchFromCCDB() { return adaptStateful([](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) { std::shared_ptr helper = std::make_shared(); - initialiseHelper(*helper, options, spec.outputs); + CCDBFetcherHelper::initialiseHelper(*helper, options); + fillValidRoutes(*helper, spec.outputs); /// Add a callback on stop which dumps the statistics for the caching per /// path callbacks.set([helper]() { @@ -453,8 +181,35 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() // Fetch the rest of the objects. O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32, dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit); + std::vector ops; + int runNumber = 0; + std::string ccdbMetadataPrefix = "ccdb-metadata-"; + for (auto &route : helper->routes) { + CCDBFetcherHelper::FetchOp op{.spec = route.matcher, .timestamp = timestamp, .runNumber = std::stoi(dtc.runNumber)}; + for (auto& meta : route.matcher.metadata) { + if (meta.name == "ccdb-path") { + op.url = meta.defaultValue.get(); + } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get() > 0) { + if (meta.defaultValue.get() == 1) { + op.runNumber = runNumber; + } else if (meta.defaultValue.get() == 2) { + op.timestamp = runNumber; + } else { + LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}", meta.defaultValue.get(), DataSpecUtils::describe(route.matcher)); + } + } else if (meta.name.starts_with(ccdbMetadataPrefix)) { + std::string key = meta.name.substr(ccdbMetadataPrefix.size()); + auto value = meta.defaultValue.get(); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data()); + op.metadata.push_back({key, 
value}); + } else if (meta.name == "ccdb-query-rate") { + op.queryRate = meta.defaultValue.get() * helper->queryPeriodFactor; + } + } + ops.push_back(op); + } - populateCacheWith(helper, timestamp, timingInfo, dtc, allocator); + CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator); O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects"); }); }); }