diff --git a/Framework/CCDBSupport/CMakeLists.txt b/Framework/CCDBSupport/CMakeLists.txt index e4310ac5e0ec5..ed898fb3114aa 100644 --- a/Framework/CCDBSupport/CMakeLists.txt +++ b/Framework/CCDBSupport/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright 2019-2020 CERN and copyright holders of ALICE O2. +# Copyright 2019-2025 CERN and copyright holders of ALICE O2. # See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. # All rights not expressly granted are reserved. # @@ -9,14 +9,21 @@ # granted to it by virtue of its status as an Intergovernmental Organization # or submit itself to any jurisdiction. o2_add_library(FrameworkCCDBSupport - SOURCES + SOURCES src/Plugin.cxx + src/CCDBFetcherHelper.cxx src/CCDBHelpers.cxx + src/AnalysisCCDBHelpers.cxx PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB) -o2_add_test(CCDBHelpers NAME test_Framework_test_CCDBHelpers - SOURCES test/test_CCDBHelpers.cxx - COMPONENT_NAME Framework - LABELS framework - PUBLIC_LINK_LIBRARIES O2::Framework O2::FrameworkCCDBSupport) +add_executable(o2-test-framework-ccdbsupport + test/test_CCDBHelpers.cxx) +target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Framework) +target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::FrameworkCCDBSupport) +target_link_libraries(o2-test-framework-ccdbsupport PRIVATE O2::Catch2) + +get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE) +set_property(TARGET o2-test-framework-ccdbsupport PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir}) + +add_test(NAME framework:ccdbsupport COMMAND o2-test-framework-ccdbsupport --skip-benchmarks) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx new file mode 100644 index 0000000000000..1f655384eabaf --- /dev/null +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -0,0 +1,202 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "AnalysisCCDBHelpers.h" +#include "CCDBFetcherHelper.h" +#include "Framework/DeviceSpec.h" +#include "Framework/TimingInfo.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/DataTakingContext.h" +#include "Framework/RawDeviceService.h" +#include "Framework/Output.h" +#include "Framework/Signpost.h" +#include "Framework/AnalysisContext.h" +#include "Framework/ConfigContext.h" +#include "Framework/ConfigContext.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +O2_DECLARE_DYNAMIC_LOG(ccdb); + +namespace o2::framework +{ +// Fill valid routes. Notice that for analysis the timestamps are associated to +// a TIM table and there might be multiple CCDB objects of the same kind for +// dataframe. +// For this reason rather than matching the Lifetime::Condition, we match the +// origin. +namespace +{ +void fillValidRoutes(CCDBFetcherHelper& helper, std::vector const& outputRoutes, std::unordered_map& bindings) +{ + for (auto& route : outputRoutes) { + auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher); + if (originMatcher.origin != header::DataOrigin{"TIM"}) { + continue; + } + auto specStr = DataSpecUtils::describe(route.matcher); + if (bindings.find(specStr) != bindings.end()) { + continue; + } + std::cout << specStr << " is at " << helper.routes.size() << std::endl; + bindings[specStr] = helper.routes.size(); + helper.routes.push_back(route); + LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher)); + for (auto& metadata : route.matcher.metadata) { + if (metadata.type == VariantType::String) { + LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString()); + } + } + } +} +} // namespace + +AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + std::vector> schemas; + auto schemaMetadata = std::make_shared(); + + for (auto& input : ac.analysisCCDBInputs) { + std::vector> fields; + std::cout << input << std::endl; + schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input)); + schemaMetadata->Append("outputBinding", input.binding); + + for (auto& m : input.metadata) { + // Save the list of input tables + if (m.name.starts_with("input:")) { + auto name = m.name.substr(6); + schemaMetadata->Append("sourceTable", name); + std::cout << "sourceTable:" << name << " " << m.defaultValue.asString() << std::endl; + continue; + } + // Ignore the non ccdb: entries + if (!m.name.starts_with("ccdb:")) { + continue; + } + // Create the schema of the output + std::cout << m.name << " " << m.defaultValue.asString() << std::endl; + + auto metadata = std::make_shared(); + metadata->Append("url", m.defaultValue.asString()); + std::cout << "url" << " " << m.defaultValue.asString() << std::endl; + auto columnName = m.name.substr(strlen("ccdb:")); + fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, metadata)); + } + schemas.emplace_back(std::make_shared(fields, schemaMetadata)); + } + return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) { + std::shared_ptr helper = std::make_shared(); + CCDBFetcherHelper::initialiseHelper(*helper, options); + std::unordered_map bindings; + fillValidRoutes(*helper, spec.outputs, bindings); + + return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) { + std::cout << " Also executed" << std::endl; + O2_SIGNPOST_ID_GENERATE(sid, ccdb); + O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice); + for (auto& schema : schemas) { + std::vector ops; + auto inputBinding = *schema->metadata()->Get("sourceTable"); + auto outRouteDesc = *schema->metadata()->Get("outputRoute"); + std::string outBinding = *schema->metadata()->Get("outputBinding"); + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s", + outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str()); + auto ref = inputs.get(inputBinding); + auto table = ref->asArrowTable(); + std::cout << table->ToString() << std::endl; + // FIXME: make the fTimestamp column configurable. + auto timestampColumn = table->GetColumnByName("fTimestamp"); + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "There are %zu bindings available", bindings.size()); + for (auto& binding : bindings) { + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "* %{public}s: %d", + binding.first.c_str(), binding.second); + } + std::cout << " output route is " << outRouteDesc << std::endl; + std::cout << " output binding is " << outBinding << std::endl; + int outputRouteIndex = bindings.at(outRouteDesc); + std::cout << " Index for output is " << outputRouteIndex << std::endl; + auto& spec = helper->routes[outputRouteIndex].matcher; + std::vector> builders; + for (auto &_ : schema->fields()) { + builders.emplace_back(std::make_shared()); + } + + for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) { + std::shared_ptr chunk = timestampColumn->chunk(ci); + auto const* timestamps = chunk->data()->GetValuesSafe(1); + + for (int64_t ri = 0; ri < chunk->data()->length; ri++) { + ops.clear(); + int64_t timestamp = timestamps[ri]; + for (auto& field : schema->fields()) { + auto url = *field->metadata()->Get("url"); + std::cout << "Fetching " << field->name() << " from " << url << "/" << timestamp << std::endl; + // Time to actually populate the blob + ops.push_back({ + .spec = spec, + .url = url, + .timestamp = timestamp, + .runNumber = 1, + .runDependent = 0, + .queryRate = 0, + }); + } + auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator); + O2_SIGNPOST_START(ccdb, sid, "handlingResponses", + "Got %zu responses from server.", + responses.size()); + if (builders.size() != responses.size()) { + LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size()); + } + arrow::Status result; + for (size_t bi = 0; bi < responses.size(); bi++) { + auto &builder = builders[bi]; + auto &response = responses[bi]; + char const* address = reinterpret_cast(response.id.value); + std::cout << "Inserting view (" << (void*)address << "," << response.size << ")" << std::endl; + result &= builder->Append(std::string_view(address, response.size)); + } + if (!result.ok()) { + LOGP(fatal, "Error adding results from CCDB"); + } + O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses"); + } + } + arrow::ArrayVector arrays; + for (auto &builder : builders) { + arrays.push_back(*builder->Finish()); + } + auto outTable = arrow::Table::Make(schema, arrays); + auto concrete = DataSpecUtils::asConcreteDataMatcher(spec); + allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable); + } + + O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects"); + }); + }); +} + +} // namespace o2::framework diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h new file mode 100644 index 0000000000000..1871bb1906262 --- /dev/null +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h @@ -0,0 +1,25 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_ +#define O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_ + +#include "Framework/AlgorithmSpec.h" + +namespace o2::framework +{ + +struct AnalysisCCDBHelpers { + static AlgorithmSpec fetchFromCCDB(ConfigContext const&ctx); +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_ANALYSISCCDBHELPERS_H_ diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx new file mode 100644 index 0000000000000..902a57fe1588b --- /dev/null +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx @@ -0,0 +1,278 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "CCDBFetcherHelper.h" +#include "Framework/DataTakingContext.h" +#include "Framework/Signpost.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/ConfigParamRegistry.h" +#include +#include + +O2_DECLARE_DYNAMIC_LOG(ccdb); + +namespace o2::framework +{ + +o2::ccdb::CcdbApi& CCDBFetcherHelper::getAPI(const std::string& path) +{ + // find the first = sign in the string. If present drop everything after it + // and between it and the previous /. + auto pos = path.find('='); + if (pos == std::string::npos) { + auto entry = remappings.find(path); + return apis[entry == remappings.end() ? "" : entry->second]; + } + auto pos2 = path.rfind('/', pos); + if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) { + throw runtime_error_f("Malformed path %s", path.c_str()); + } + auto entry = remappings.find(path.substr(0, pos2)); + return apis[entry == remappings.end() ? "" : entry->second]; +} + +namespace { +bool isOnlineRun(DataTakingContext const& dtc) +{ + return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; +} +} + +void CCDBFetcherHelper::initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options) +{ + auto defHost = options.get("condition-backend"); + auto checkRate = options.get("condition-tf-per-query"); + auto checkMult = options.get("condition-tf-per-query-multiplier"); + helper.timeToleranceMS = options.get("condition-time-tolerance"); + helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits::max(); + helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1; + LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor)); + LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace); + auto remapString = options.get("condition-remap"); + ParserResult result = parseRemappings(remapString.c_str()); + if (!result.error.empty()) { + throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str()); + } + helper.remappings = result.remappings; + helper.apis[""].init(defHost); // default backend + LOGP(info, "Initialised default CCDB host {}", defHost); + // + for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping + if (helper.apis.find(entry.second) == helper.apis.end()) { + helper.apis[entry.second].init(entry.second); + LOGP(info, "Initialised custom CCDB host {}", entry.second); + } + LOGP(info, "{} is remapped to {}", entry.first, entry.second); + } + helper.createdNotBefore = std::to_string(options.get("condition-not-before")); + helper.createdNotAfter = std::to_string(options.get("condition-not-after")); +} + +CCDBFetcherHelper::ParserResult CCDBFetcherHelper::parseRemappings(char const* str) +{ + std::unordered_map remappings; + std::string currentUrl = ""; + + enum ParsingStates { + IN_BEGIN, + IN_BEGIN_URL, + IN_BEGIN_TARGET, + IN_END_TARGET, + IN_END_URL + }; + ParsingStates state = IN_BEGIN; + + while (true) { + switch (state) { + case IN_BEGIN: { + if (*str == 0) { + return {remappings, ""}; + } + state = IN_BEGIN_URL; + } + case IN_BEGIN_URL: { + if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) { + return {remappings, "URL should start with either http:// or https:// or file://"}; + } + state = IN_END_URL; + } break; + case IN_END_URL: { + char const* c = strchr(str, '='); + if (c == nullptr) { + return {remappings, "Expecting at least one target path, missing `='?"}; + } + if ((c - str) == 0) { + return {remappings, "Empty url"}; + } + currentUrl = std::string_view(str, c - str); + state = IN_BEGIN_TARGET; + str = c + 1; + } break; + case IN_BEGIN_TARGET: { + if (*str == 0) { + return {remappings, "Empty target"}; + } + state = IN_END_TARGET; + } break; + case IN_END_TARGET: { + char const* c = strpbrk(str, ",;"); + if (c == nullptr) { + if (remappings.count(str)) { + return {remappings, fmt::format("Path {} requested more than once.", str)}; + } + remappings[std::string(str)] = currentUrl; + return {remappings, ""}; + } + if ((c - str) == 0) { + return {remappings, "Empty target"}; + } + auto key = std::string(str, c - str); + if (remappings.count(str)) { + return {remappings, fmt::format("Path {} requested more than once.", key)}; + } + remappings[key] = currentUrl; + if (*c == ';') { + state = IN_BEGIN_URL; + } else { + state = IN_BEGIN_TARGET; + } + str = c + 1; + } break; + } + } +} + +auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr const& helper, + std::vector const& ops, + TimingInfo& timingInfo, + DataTakingContext& dtc, + DataAllocator& allocator) -> std::vector +{ + int objCnt = -1; + // We use the timeslice, so that we hook into the same interval as the rest of the + // callback. + static bool isOnline = isOnlineRun(dtc); + + auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice}; + O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects"); + std::vector responses; + for (auto& op : ops) { + int64_t timestampToUse = op.timestamp; + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(op.spec).data()); + objCnt++; + auto concrete = DataSpecUtils::asConcreteDataMatcher(op.spec); + Output output{concrete.origin, concrete.description, concrete.subSpec}; + auto&& v = allocator.makeVector(output); + std::map metadata; + std::map headers; + std::string path = op.url; + std::string etag = ""; + int chRate = helper->queryPeriodGlo; + bool checkValidity = false; + if (op.runDependent > 0) { + if (op.runDependent == 1) { + metadata["runNumber"] = std::format("{}", op.runNumber); + } else if (op.runDependent == 2) { + timestampToUse = op.runNumber; + } else { + LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", op.runDependent, + concrete.origin.as(), concrete.description.as(), int(concrete.subSpec)); + } + } + for (auto m : op.metadata) { + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", m.key.data(), m.value.data()); + metadata[m.key] = m.value; + } + if (op.queryRate != 0) { + chRate = op.queryRate * helper->queryPeriodFactor; + } + + const auto url2uuid = helper->mapURL2UUID.find(path); + if (url2uuid != helper->mapURL2UUID.end()) { + etag = url2uuid->second.etag; + // We check validity every chRate timeslices or if the cache is expired + uint64_t validUntil = url2uuid->second.cacheValidUntil; + // When the cache was populated. If the cache was populated after the timestamp, we need to check validity. + uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt; + // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again + // when online. + bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt); + checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired); + } else { + checkValidity = true; // never skip check if the cache is empty + } + + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); + + const auto& api = helper->getAPI(path); + if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once + LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse); + api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore); + if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) { + LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse); + // FIXME: I should send a dummy message. + continue; + } + // printing in case we find a default entry + if (headers.find("default") != headers.end()) { + LOGP(detail, "******** Default entry used for {} ********", path); + } + helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter; + if (etag.empty()) { + helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid + helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; + helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); + helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); + helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + auto size = v.size(); + api.appendFlatHeader(v, headers); + auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + helper->mapURL2DPLCache[path] = cacheId; + responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size); + continue; + } + if (v.size()) { // but should be overridden by fresh object + // somewhere here pruneFromCache should be called + helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid + helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; + helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); + helper->mapURL2UUID[path].cacheMiss++; + helper->mapURL2UUID[path].size = v.size(); + helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); + helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + auto size = v.size(); + api.appendFlatHeader(v, headers); + auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + helper->mapURL2DPLCache[path] = cacheId; + responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); + // one could modify the adoptContainer to take optional old cacheID to clean: + // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); + continue; + } else { + // Only once the etag is actually used, we get the information on how long the object is valid + helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); + } + } + // cached object is fine + auto cacheId = helper->mapURL2DPLCache[path]; + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); + helper->mapURL2UUID[path].cacheHit++; + responses.emplace_back(Response{.id = cacheId, .size = helper->mapURL2UUID[path].size, .request = nullptr}); + allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB); + // the outputBuffer was not used, can we destroy it? + } + O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects"); + return responses; +}; + +} // namespace o2::framework diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.h b/Framework/CCDBSupport/src/CCDBFetcherHelper.h new file mode 100644 index 0000000000000..721fadf73c582 --- /dev/null +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.h @@ -0,0 +1,109 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_CCDBFETCHERHELPER_H_ +#define O2_FRAMEWORK_CCDBFETCHERHELPER_H_ + +#include "Framework/OutputRoute.h" +#include "Framework/DataAllocator.h" +#include "CCDB/CcdbApi.h" +#include +#include + +namespace o2::framework +{ + +struct DataTakingContext; + +struct CCDBFetcherHelper { + struct CCDBCacheInfo { + std::string etag; + size_t cacheValidUntil = 0; + size_t cachePopulatedAt = 0; + size_t cacheMiss = 0; + size_t cacheHit = 0; + size_t size = 0L; + size_t minSize = -1ULL; + size_t maxSize = 0; + int lastCheckedTF = 0; + }; + + struct RemapMatcher { + std::string path; + }; + + struct RemapTarget { + std::string url; + }; + + struct ParserResult { + std::unordered_map remappings; + std::string error; + }; + + struct MetadataEntry { + std::string key; + std::string value; + }; + + // A fetch operation. + struct FetchOp { + // Where to put the blob + OutputSpec &spec; + // The url to fetch + std::string url = ""; + // The timestamp to use + int64_t timestamp = 0; + // The run to use + int runNumber = 0; + // Wether or not the thing is run dependent + int runDependent = 0; + // Actual metadata + std::vector metadata = {}; + // Query rate + int queryRate = 0; + }; + + // Where the data has been fetched + struct Response { + // CacheId / Pointer to the actual data + DataAllocator::CacheId id; + // The size of the buffer + size_t size = 0; + // Where to actually + FetchOp *request = nullptr; + }; + + static ParserResult parseRemappings(char const*); + + std::unordered_map mapURL2UUID; + std::unordered_map mapURL2DPLCache; + std::string createdNotBefore = "0"; + std::string createdNotAfter = "3385078236000"; + std::unordered_map apis; + std::vector routes; + std::unordered_map remappings; + uint32_t lastCheckedTFCounterOrbReset = 0; // last checkecked TFcounter for bulk check + int queryPeriodGlo = 1; + int queryPeriodFactor = 1; + int64_t timeToleranceMS = 5000; + + o2::ccdb::CcdbApi& getAPI(const std::string& path); + static void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options); + static auto populateCacheWith(std::shared_ptr const& helper, + std::vector const &ops, + TimingInfo& timingInfo, + DataTakingContext& dtc, + DataAllocator& allocator) -> std::vector; +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_CCDBFETCHERHELPER_H_ diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index 1428e22e86651..363cdb53ba577 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -1,4 +1,4 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. // See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. // All rights not expressly granted are reserved. // @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include "CCDBHelpers.h" +#include "CCDBFetcherHelper.h" #include "Framework/DeviceSpec.h" #include "Framework/Logger.h" #include "Framework/TimingInfo.h" @@ -22,171 +23,21 @@ #include "Framework/Signpost.h" #include #include +#include O2_DECLARE_DYNAMIC_LOG(ccdb); namespace o2::framework { -struct CCDBFetcherHelper { - struct CCDBCacheInfo { - std::string etag; - size_t cacheValidUntil = 0; - size_t cachePopulatedAt = 0; - size_t cacheMiss = 0; - size_t cacheHit = 0; - size_t minSize = -1ULL; - size_t maxSize = 0; - int lastCheckedTF = 0; - }; - - struct RemapMatcher { - std::string path; - }; - - struct RemapTarget { - std::string url; - }; - - std::unordered_map mapURL2UUID; - std::unordered_map mapURL2DPLCache; - std::string createdNotBefore = "0"; - std::string createdNotAfter = "3385078236000"; - std::unordered_map apis; - std::vector routes; - std::unordered_map remappings; - uint32_t lastCheckedTFCounterOrbReset = 0; // last checkecked TFcounter for bulk check - int queryPeriodGlo = 1; - int queryPeriodFactor = 1; - int64_t timeToleranceMS = 5000; - - o2::ccdb::CcdbApi& getAPI(const std::string& path) - { - // find the first = sign in the string. If present drop everything after it - // and between it and the previous /. - auto pos = path.find('='); - if (pos == std::string::npos) { - auto entry = remappings.find(path); - return apis[entry == remappings.end() ? "" : entry->second]; - } - auto pos2 = path.rfind('/', pos); - if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) { - throw runtime_error_f("Malformed path %s", path.c_str()); - } - auto entry = remappings.find(path.substr(0, pos2)); - return apis[entry == remappings.end() ? "" : entry->second]; - } -}; - -bool isPrefix(std::string_view prefix, std::string_view full) +// Fill valid routes. Here we consider only the routes which have +// Lifetime::Condition. Notice the way we do it for analysis will be +// different. +namespace { - return prefix == full.substr(0, prefix.size()); -} - -CCDBHelpers::ParserResult CCDBHelpers::parseRemappings(char const* str) -{ - std::unordered_map remappings; - std::string currentUrl = ""; - - enum ParsingStates { - IN_BEGIN, - IN_BEGIN_URL, - IN_BEGIN_TARGET, - IN_END_TARGET, - IN_END_URL - }; - ParsingStates state = IN_BEGIN; - - while (true) { - switch (state) { - case IN_BEGIN: { - if (*str == 0) { - return {remappings, ""}; - } - state = IN_BEGIN_URL; - } - case IN_BEGIN_URL: { - if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) { - return {remappings, "URL should start with either http:// or https:// or file://"}; - } - state = IN_END_URL; - } break; - case IN_END_URL: { - char const* c = strchr(str, '='); - if (c == nullptr) { - return {remappings, "Expecting at least one target path, missing `='?"}; - } - if ((c - str) == 0) { - return {remappings, "Empty url"}; - } - currentUrl = std::string_view(str, c - str); - state = IN_BEGIN_TARGET; - str = c + 1; - } break; - case IN_BEGIN_TARGET: { - if (*str == 0) { - return {remappings, "Empty target"}; - } - state = IN_END_TARGET; - } break; - case IN_END_TARGET: { - char const* c = strpbrk(str, ",;"); - if (c == nullptr) { - if (remappings.count(str)) { - return {remappings, fmt::format("Path {} requested more than once.", str)}; - } - remappings[std::string(str)] = currentUrl; - return {remappings, ""}; - } - if ((c - str) == 0) { - return {remappings, "Empty target"}; - } - auto key = std::string(str, c - str); - if (remappings.count(str)) { - return {remappings, fmt::format("Path {} requested more than once.", key)}; - } - remappings[key] = currentUrl; - if (*c == ';') { - state = IN_BEGIN_URL; - } else { - state = IN_BEGIN_TARGET; - } - str = c + 1; - } break; - } - } -} - -void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options, std::vector const& outputRoutes) +void fillValidRoutes(CCDBFetcherHelper& helper, std::vector const& outputRoutes) { std::unordered_map accountedSpecs; - auto defHost = options.get("condition-backend"); - auto checkRate = options.get("condition-tf-per-query"); - auto checkMult = options.get("condition-tf-per-query-multiplier"); - helper.timeToleranceMS = options.get("condition-time-tolerance"); - helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits::max(); - helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1; - LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor)); - LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace); - auto remapString = options.get("condition-remap"); - CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str()); - if (!result.error.empty()) { - throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str()); - } - helper.remappings = result.remappings; - helper.apis[""].init(defHost); // default backend - LOGP(info, "Initialised default CCDB host {}", defHost); - // - for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping - if (helper.apis.find(entry.second) == helper.apis.end()) { - helper.apis[entry.second].init(entry.second); - LOGP(info, "Initialised custom CCDB host {}", entry.second); - } - LOGP(info, "{} is remapped to {}", entry.first, entry.second); - } - helper.createdNotBefore = std::to_string(options.get("condition-not-before")); - helper.createdNotAfter = std::to_string(options.get("condition-not-after")); - for (auto& route : outputRoutes) { if (route.matcher.lifetime != Lifetime::Condition) { continue; @@ -205,6 +56,7 @@ void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& opti } } } +} // namespace auto getOrbitResetTime(o2::pmr::vector const& v) -> Long64_t { @@ -225,136 +77,12 @@ auto getOrbitResetTime(o2::pmr::vector const& v) -> Long64_t return (*ctp)[0]; }; -bool isOnlineRun(DataTakingContext const& dtc) -{ - return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; -} - -auto populateCacheWith(std::shared_ptr const& helper, - int64_t timestamp, - TimingInfo& timingInfo, - DataTakingContext& dtc, - DataAllocator& allocator) -> void -{ - std::string ccdbMetadataPrefix = "ccdb-metadata-"; - int objCnt = -1; - // We use the timeslice, so that we hook into the same interval as the rest of the - // callback. - static bool isOnline = isOnlineRun(dtc); - - auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice}; - O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects"); - for (auto& route : helper->routes) { - int64_t timestampToUse = timestamp; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data()); - objCnt++; - auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher); - Output output{concrete.origin, concrete.description, concrete.subSpec}; - auto&& v = allocator.makeVector(output); - std::map metadata; - std::map headers; - std::string path = ""; - std::string etag = ""; - int chRate = helper->queryPeriodGlo; - bool checkValidity = false; - for (auto& meta : route.matcher.metadata) { - if (meta.name == "ccdb-path") { - path = meta.defaultValue.get(); - } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get() > 0) { - if (meta.defaultValue.get() == 1) { - metadata["runNumber"] = dtc.runNumber; - } else if (meta.defaultValue.get() == 2) { - timestampToUse = std::stoi(dtc.runNumber); - } else { - LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get(), concrete.origin.as(), concrete.description.as(), int(concrete.subSpec)); - } - } else if (isPrefix(ccdbMetadataPrefix, meta.name)) { - std::string key = meta.name.substr(ccdbMetadataPrefix.size()); - auto value = meta.defaultValue.get(); - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data()); - metadata[key] = value; - } else if (meta.name == "ccdb-query-rate") { - chRate = meta.defaultValue.get() * helper->queryPeriodFactor; - } - } - const auto url2uuid = helper->mapURL2UUID.find(path); - if (url2uuid != helper->mapURL2UUID.end()) { - etag = url2uuid->second.etag; - // We check validity every chRate timeslices or if the cache is expired - uint64_t validUntil = url2uuid->second.cacheValidUntil; - // When the cache was populated. If the cache was populated after the timestamp, we need to check validity. - uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt; - // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again - // when online. - bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt); - checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired); - } else { - checkValidity = true; // never skip check if the cache is empty - } - - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data()); - - const auto& api = helper->getAPI(path); - if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once - LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse); - api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore); - if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) { - LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse); - // FIXME: I should send a dummy message. - continue; - } - // printing in case we find a default entry - if (headers.find("default") != headers.end()) { - LOGP(detail, "******** Default entry used for {} ********", path); - } - helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter; - if (etag.empty()) { - helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid - helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; - helper->mapURL2UUID[path].cacheMiss++; - helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); - helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); - api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); - helper->mapURL2DPLCache[path] = cacheId; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - continue; - } - if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called - helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid - helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; - helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); - helper->mapURL2UUID[path].cacheMiss++; - helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); - helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); - api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); - helper->mapURL2DPLCache[path] = cacheId; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); - continue; - } else { - // Only once the etag is actually used, we get the information on how long the object is valid - helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); - } - } - // cached object is fine - auto cacheId = helper->mapURL2DPLCache[path]; - O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - helper->mapURL2UUID[path].cacheHit++; - allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB); - // the outputBuffer was not used, can we destroy it? - } - O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects"); -}; - AlgorithmSpec CCDBHelpers::fetchFromCCDB() { return adaptStateful([](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) { std::shared_ptr helper = std::make_shared(); - initialiseHelper(*helper, options, spec.outputs); + CCDBFetcherHelper::initialiseHelper(*helper, options); + fillValidRoutes(*helper, spec.outputs); /// Add a callback on stop which dumps the statistics for the caching per /// path callbacks.set([helper]() { @@ -453,8 +181,35 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() // Fetch the rest of the objects. O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32, dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit); + std::vector ops; + int runNumber = 0; + std::string ccdbMetadataPrefix = "ccdb-metadata-"; + for (auto &route : helper->routes) { + CCDBFetcherHelper::FetchOp op{.spec = route.matcher, .timestamp = timestamp, .runNumber = std::stoi(dtc.runNumber)}; + for (auto& meta : route.matcher.metadata) { + if (meta.name == "ccdb-path") { + op.url = meta.defaultValue.get(); + } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get() > 0) { + if (meta.defaultValue.get() == 1) { + op.runNumber = runNumber; + } else if (meta.defaultValue.get() == 2) { + op.timestamp = runNumber; + } else { + LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}", meta.defaultValue.get(), DataSpecUtils::describe(route.matcher)); + } + } else if (meta.name.starts_with(ccdbMetadataPrefix)) { + std::string key = meta.name.substr(ccdbMetadataPrefix.size()); + auto value = meta.defaultValue.get(); + O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data()); + op.metadata.push_back({key, value}); + } else if (meta.name == "ccdb-query-rate") { + op.queryRate = meta.defaultValue.get() * helper->queryPeriodFactor; + } + } + ops.push_back(op); + } - populateCacheWith(helper, timestamp, timingInfo, dtc, allocator); + CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator); O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects"); }); }); } diff --git a/Framework/CCDBSupport/src/Plugin.cxx b/Framework/CCDBSupport/src/Plugin.cxx index 18aabc07ae4a4..d9083f97a023e 100644 --- a/Framework/CCDBSupport/src/Plugin.cxx +++ b/Framework/CCDBSupport/src/Plugin.cxx @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include "Framework/Plugins.h" #include "Framework/AlgorithmSpec.h" +#include "AnalysisCCDBHelpers.h" #include "CCDBHelpers.h" struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin { @@ -19,6 +20,14 @@ struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin { } }; +struct AnalysisCCDBFetcherPlugin : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& ctx) final + { + return o2::framework::AnalysisCCDBHelpers::fetchFromCCDB(ctx); + } +}; + DEFINE_DPL_PLUGINS_BEGIN DEFINE_DPL_PLUGIN_INSTANCE(CCDBFetcherPlugin, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(AnalysisCCDBFetcherPlugin, CustomAlgorithm); DEFINE_DPL_PLUGINS_END diff --git a/Framework/CCDBSupport/test/test_CCDBHelpers.cxx b/Framework/CCDBSupport/test/test_CCDBHelpers.cxx index df21738ddb647..53e6b66a2b30c 100644 --- a/Framework/CCDBSupport/test/test_CCDBHelpers.cxx +++ b/Framework/CCDBSupport/test/test_CCDBHelpers.cxx @@ -9,43 +9,39 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#define BOOST_TEST_MODULE Test Framework CCDBHelpers -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include -#include "../src/CCDBHelpers.h" +#include +#include "../src/CCDBFetcherHelper.h" using namespace o2::framework; -BOOST_AUTO_TEST_CASE(TestSorting) +TEST_CASE("TestSorting") { - auto result = CCDBHelpers::parseRemappings(""); - BOOST_CHECK_EQUAL(result.error, ""); // not an error + auto result = CCDBFetcherHelper::parseRemappings(""); + CHECK(result.error == ""); // not an error - result = CCDBHelpers::parseRemappings("https"); - BOOST_CHECK_EQUAL(result.error, "URL should start with either http:// or https:// or file://"); + result = CCDBFetcherHelper::parseRemappings("https"); + CHECK(result.error == "URL should start with either http:// or https:// or file://"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000"); - BOOST_CHECK_EQUAL(result.error, "Expecting at least one target path, missing `='?"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000"); + CHECK(result.error == "Expecting at least one target path, missing `='?"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000="); - BOOST_CHECK_EQUAL(result.error, "Empty target"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000="); + CHECK(result.error == "Empty target"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar,"); - BOOST_CHECK_EQUAL(result.error, "Empty target"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar,"); + CHECK(result.error == "Empty target"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/bar;"); - BOOST_CHECK_EQUAL(result.error, "URL should start with either http:// or https:// or file://"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/bar;"); + CHECK(result.error == "URL should start with either http:// or https:// or file://"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/barbar;file://user/test=/foo/barr"); - BOOST_CHECK_EQUAL(result.error, ""); - BOOST_CHECK_EQUAL(result.remappings.size(), 3); - BOOST_CHECK_EQUAL(result.remappings["/foo/bar"], "https://alice.cern.ch:8000"); - BOOST_CHECK_EQUAL(result.remappings["/foo/barbar"], "https://alice.cern.ch:8000"); - BOOST_CHECK_EQUAL(result.remappings["/foo/barr"], "file://user/test"); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar,/foo/barbar;file://user/test=/foo/barr"); + CHECK(result.error == ""); + CHECK(result.remappings.size() == 3); + CHECK(result.remappings["/foo/bar"] == "https://alice.cern.ch:8000"); + CHECK(result.remappings["/foo/barbar"] == "https://alice.cern.ch:8000"); + CHECK(result.remappings["/foo/barr"] == "file://user/test"); - result = CCDBHelpers::parseRemappings("https://alice.cern.ch:8000=/foo/bar;file://user/test=/foo/bar"); - BOOST_CHECK_EQUAL(result.remappings.size(), 1); - BOOST_CHECK_EQUAL(result.error, "Path /foo/bar requested more than once."); + result = CCDBFetcherHelper::parseRemappings("https://alice.cern.ch:8000=/foo/bar;file://user/test=/foo/bar"); + CHECK(result.remappings.size() == 1); + CHECK(result.error == "Path /foo/bar requested more than once."); } diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 6a49ed25e40d2..fd248850136a3 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -44,14 +44,18 @@ std::string cutString(std::string&& str); std::string strToUpper(std::string&& str); } // namespace o2::framework +struct TClass; + namespace o2::soa { void accessingInvalidIndexFor(const char* getter); void dereferenceWithWrongType(const char* getter, const char* target); void missingFilterDeclaration(int hash, int ai); void notBoundTable(const char* tableName); +void* extractCCDBPayload(char *payload, size_t size, TClass const* cl, const char* what); } // namespace o2::soa + namespace o2::soa { /// Generic identifier for a table type @@ -1274,6 +1278,11 @@ concept with_sources = requires { T::sources.size(); }; +template +concept with_ccdb_urls = requires { + T::ccdb_urls.size(); +}; + template concept with_base_table = not_void>::metadata::base_table_t>; @@ -2248,11 +2257,14 @@ ColumnGetterFunction getColumnGetterByLabel(const std:: namespace o2::aod { +// If you get an error about not satisfying is_origin_hash, you need to add +// an entry here. O2ORIGIN("AOD"); O2ORIGIN("AOD1"); O2ORIGIN("AOD2"); O2ORIGIN("DYN"); O2ORIGIN("IDX"); +O2ORIGIN("TIM"); O2ORIGIN("JOIN"); O2HASH("JOIN/0"); O2ORIGIN("CONC"); @@ -2313,6 +2325,48 @@ consteval static std::string_view namespace_prefix() }; \ [[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() } +#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ + struct _Name_ : o2::soa::Column, _Name_> { \ + static constexpr const char* mLabel = _Label_; \ + static constexpr const char* query = _CCDBQuery_; \ + static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \ + using base = o2::soa::Column, _Name_>; \ + using type = std::span; \ + using column_t = _Name_; \ + _Name_(arrow::ChunkedArray const* column) \ + : o2::soa::Column, _Name_>(o2::soa::ColumnIterator>(column)) \ + { \ + } \ + \ + _Name_() = default; \ + _Name_(_Name_ const& other) = default; \ + _Name_& operator=(_Name_ const& other) = default; \ + \ + decltype(auto) _Getter_() const \ + { \ + static std::byte* payload = nullptr; \ + static _ConcreteType_* deserialised = nullptr; \ + static TClass* c = TClass::GetClass(#_ConcreteType_); \ + auto span = *mColumnIterator; \ + if (payload != (std::byte*)span.data()) { \ + payload = (std::byte*)span.data(); \ + delete deserialised; \ + TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \ + deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \ + } \ + return *deserialised; \ + } \ + \ + decltype(auto) \ + get() const \ + { \ + return _Getter_(); \ + } \ + }; + +#define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \ + DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) + #define DECLARE_SOA_COLUMN(_Name_, _Getter_, _Type_) \ DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_) @@ -3188,6 +3242,43 @@ consteval auto getIndexTargets() using metadata = _Name_##Metadata; \ }; +// Declare were each row is associated to a timestamp column of an _TimestampSource_ +// table. +// +// The columns of this table have to be CCDB_COLUMNS so that for each timestamp, we get a row +// which points to the specified CCDB objectes described by those columns. +#define DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, _Label_, _TimestampSource_, _TimestampColumn_, _Origin_, _Version_, _Desc_, ...) \ + O2HASH(_Desc_ "/" #_Version_); \ + template \ + using _Name_##TimestampFrom = soa::Table, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \ + using _Name_##Timestamp = _Name_##TimestampFrom>; \ + template > \ + struct _Name_##TimestampMetadataFrom : TableMetadata, __VA_ARGS__> { \ + using base_table_t = _TimestampSource_; \ + using extension_table_t = _Name_##TimestampFrom; \ + static constexpr const auto ccdb_urls = [](framework::pack) { \ + return std::array{Cs::query...}; \ + }(framework::pack<__VA_ARGS__>{}); \ + static constexpr const auto ccdb_bindings = [](framework::pack) { \ + return std::array{Cs::mLabel...}; \ + }(framework::pack<__VA_ARGS__>{}); \ + static constexpr auto sources = _TimestampSource_::originals; \ + static constexpr auto timestamp_column_label = _TimestampColumn_::mLabel; \ + /*static constexpr auto timestampColumn = _TimestampColumn_;*/ \ + }; \ + using _Name_##TimestampMetadata = _Name_##TimestampMetadataFrom>; \ + template <> \ + struct MetadataTrait> { \ + using metadata = _Name_##TimestampMetadata; \ + }; \ + template \ + using _Name_##From = o2::soa::JoinFull, _TimestampSource_, _Name_##TimestampFrom>; \ + using _Name_ = _Name_##From>; + +#define DECLARE_SOA_TIMESTAMPED_TABLE(_Name_, _TimestampSource_, _TimestampColumn_, _Version_, _Desc_, ...) \ + O2HASH(#_Name_ "Timestamped"); \ + DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, #_Name_ "Timestamped", _TimestampSource_, _TimestampColumn_, "TIM", _Version_, _Desc_, __VA_ARGS__) + #define DECLARE_SOA_INDEX_TABLE(_Name_, _Key_, _Description_, ...) \ DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, "IDX", 0, _Description_, false, __VA_ARGS__) diff --git a/Framework/Core/include/Framework/AnalysisContext.h b/Framework/Core/include/Framework/AnalysisContext.h index 0f62f952d0aaa..7d1544ed312a4 100644 --- a/Framework/Core/include/Framework/AnalysisContext.h +++ b/Framework/Core/include/Framework/AnalysisContext.h @@ -29,16 +29,24 @@ struct OutputObjectInfo { std::vector bindings; }; -// +// This will keep track of the inputs which have +// been requested and for which we will need to inject +// some source device. struct AnalysisContext { std::vector requestedAODs; std::vector providedAODs; std::vector requestedDYNs; std::vector providedDYNs; std::vector requestedIDXs; + std::vector providedTIMs; + std::vector requestedTIMs; std::vector providedOutputObjHist; std::vector spawnerInputs; + // These are the timestamped tables which are required to + // inject the the CCDB objecs. + std::vector analysisCCDBInputs; + // Needed to created the hist writer std::vector outTskMap; std::vector outObjHistMap; diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 6e9b1e211bb76..06caa7f3df0a4 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -11,6 +11,7 @@ #ifndef o2_framework_AnalysisHelpers_H_DEFINED #define o2_framework_AnalysisHelpers_H_DEFINED +#include "ConfigParamSpec.h" #include "Framework/ASoA.h" #include "Framework/DataAllocator.h" #include "Framework/IndexBuilderHelpers.h" @@ -49,6 +50,19 @@ inline constexpr auto getSources() }.template operator()(); } +template +inline constexpr auto getCCDBUrls() +{ + std::vector result; + for (size_t i = 0; i < T::ccdb_urls.size(); ++i) { + result.push_back({std::string{"ccdb:"} + std::string{T::ccdb_bindings[i]}, + framework::VariantType::String, + T::ccdb_urls[i], + {"\"\""}}); + } + return result; +} + template constexpr auto getInputMetadata() -> std::vector { @@ -67,18 +81,40 @@ constexpr auto getInputMetadata() -> std::vector { return {}; } + +template +constexpr auto getCCDBMetadata() -> std::vector +{ + std::vector results = getCCDBUrls(); + std::sort(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; }); + auto last = std::unique(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; }); + results.erase(last, results.end()); + return results; +} + +template +constexpr auto getCCDBMetadata() -> std::vector +{ + return {}; +} } // namespace template constexpr auto tableRef2InputSpec() { + std::vector metadata; + auto m = getInputMetadata>::metadata>(); + metadata.insert(metadata.end(), m.begin(), m.end()); + auto ccdbMetadata = getCCDBMetadata>::metadata>(); + metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end()); + return framework::InputSpec{ o2::aod::label(), o2::aod::origin(), o2::aod::description(o2::aod::signature()), R.version, framework::Lifetime::Timeframe, - getInputMetadata>::metadata>()}; + metadata}; } template diff --git a/Framework/Core/include/Framework/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h index 4ae601dc9e4a2..a4e80decf2bbe 100644 --- a/Framework/Core/include/Framework/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -39,6 +39,11 @@ struct AnalysisSupportHelpers { std::vector const& requestedSpecials, std::vector& requestedAODs, DataProcessorSpec& publisher); + static void addMissingOutputsToAnalysisCCDBFetcher(std::vector const& providedSpecials, + std::vector const& requestedSpecials, + std::vector& requestedAODs, + std::vector& requestedDYNs, + DataProcessorSpec& publisher); static void addMissingOutputsToBuilder(std::vector const& requestedSpecials, std::vector& requestedAODs, std::vector& requestedDYNs, diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index b3378543e6ebb..53f6bc0f862d6 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -65,7 +65,8 @@ concept is_enumeration = is_enumeration_v>; // Helper struct which builds a DataProcessorSpec from // the contents of an AnalysisTask... -namespace { +namespace +{ struct AnalysisDataProcessorBuilder { template static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled) @@ -417,7 +418,7 @@ struct AnalysisDataProcessorBuilder { std::invoke(processingFunction, task, g, std::get(at)...); } }; -} +} // namespace struct SetDefaultProcesses { std::vector> map; @@ -429,7 +430,8 @@ struct TaskName { std::string value; }; -namespace { +namespace +{ template auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args) { @@ -493,7 +495,7 @@ auto getTaskNameSetProcesses(std::string& outputName, A... args) return task; } -} +} // namespace /// Adaptor to make an AlgorithmSpec from a o2::framework::Task /// diff --git a/Framework/Core/src/ASoA.cxx b/Framework/Core/src/ASoA.cxx index 3a681ee931a2b..ba0d105b9d8cc 100644 --- a/Framework/Core/src/ASoA.cxx +++ b/Framework/Core/src/ASoA.cxx @@ -14,6 +14,12 @@ #include "Framework/RuntimeError.h" #include #include +#include +#include +#include +#include +#include +#include namespace o2::soa { @@ -149,6 +155,7 @@ arrow::ChunkedArray* getIndexFromLabel(arrow::Table* table, std::string_view lab return caseInsensitiveCompare(label, f->name()); }); if (field == table->schema()->fields().end()) { + std::cout << table->ToString() << std::endl; o2::framework::throw_error(o2::framework::runtime_error_f("Unable to find column with label %s", label)); } auto index = std::distance(table->schema()->fields().begin(), field); @@ -170,6 +177,62 @@ void missingOptionalPreslice(const char* label, const char* key) throw o2::framework::runtime_error_f(R"(Optional Preslice with missing binding used: table "%s" (or join based on it) does not have column "%s")", label, key); } +void* extractCCDBPayload(char *payload, size_t size, TClass const* cl, const char* what) +{ + Int_t previousErrorLevel = gErrorIgnoreLevel; + gErrorIgnoreLevel = kFatal; + // does it have a flattened headers map attached in the end? + TMemFile file("name", (char*)payload, size, "READ"); + gErrorIgnoreLevel = previousErrorLevel; + if (file.IsZombie()) { + return nullptr; + } + + if (!cl) { + return nullptr; + } + auto object = file.GetObjectChecked(what, cl); + if (!object) { + // it could be that object was stored with previous convention + // where the classname was taken as key + std::string objectName(cl->GetName()); + objectName.erase(std::find_if(objectName.rbegin(), objectName.rend(), [](unsigned char ch) { + return !std::isspace(ch); + }).base(), + objectName.end()); + objectName.erase(objectName.begin(), std::find_if(objectName.begin(), objectName.end(), [](unsigned char ch) { + return !std::isspace(ch); + })); + + object = file.GetObjectChecked(objectName.c_str(), cl); + LOG(warn) << "Did not find object under expected name " << what; + if (!object) { + return nullptr; + } + LOG(warn) << "Found object under deprecated name " << cl->GetName(); + } + auto result = object; + // We need to handle some specific cases as ROOT ties them deeply + // to the file they are contained in + if (cl->InheritsFrom("TObject")) { + // make a clone + // detach from the file + auto tree = dynamic_cast((TObject*)object); + if (tree) { + tree->LoadBaskets(0x1L << 32); // make tree memory based + tree->SetDirectory(nullptr); + result = tree; + } else { + auto h = dynamic_cast((TObject*)object); + if (h) { + h->SetDirectory(nullptr); + result = h; + } + } + } + return result; +} + } // namespace o2::soa namespace o2::framework diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index eb17566fd6d31..e8c2d7acab5d2 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -207,6 +207,35 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c } } +void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( + std::vector const& providedSpecials, + std::vector const& requestedSpecials, + std::vector& requestedAODs, + std::vector& requestedDYNs, + DataProcessorSpec& publisher) +{ + for (auto& input : requestedSpecials) { + auto concrete = DataSpecUtils::asConcreteDataMatcher(input); + publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); + // FIXME: good enough for now... + for (auto& i : input.metadata) { + if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { + auto value = i.defaultValue.get(); + auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); + auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; }); + if (j == publisher.inputs.end()) { + publisher.inputs.push_back(spec); + } + if (DataSpecUtils::partialMatch(spec, AODOrigins)) { + DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); + } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) { + DataSpecUtils::updateInputList(requestedDYNs, std::move(spec)); + } + } + } + } +} + // ============================================================================= DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx) { diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 3a7699fb6876d..cb700dca34779 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -19,6 +19,8 @@ #include "Framework/ServiceRegistry.h" #include "Framework/ConfigContext.h" #include "Framework/CommonDataProcessors.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/DataSpecViews.h" #include "Framework/DeviceSpec.h" #include "Framework/EndOfStreamContext.h" #include "Framework/Tracing.h" @@ -27,6 +29,7 @@ #include "Framework/DeviceInfo.h" #include "Framework/DevicesManager.h" #include "Framework/DeviceConfig.h" +#include "Framework/PluginManager.h" #include "Framework/ServiceMetricsInfo.h" #include "WorkflowHelpers.h" #include "Framework/WorkflowSpecNode.h" @@ -441,6 +444,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) { auto& workflow = node.specs; auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; }); + auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; }); auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; }); auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; }); auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; }); @@ -448,6 +452,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() ac.requestedAODs.clear(); ac.requestedDYNs.clear(); ac.providedDYNs.clear(); + ac.providedTIMs.clear(); + ac.requestedTIMs.clear(); auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; @@ -511,6 +517,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); } + if (analysisCCDB != workflow.end()) { + for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) { + d.inputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::update_input_list{ac.requestedTIMs}; + d.outputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::append_to{ac.providedTIMs}; + } + std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan); + std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan); + // Use ranges::to> in C++23... + ac.analysisCCDBInputs.clear(); + ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs}; + + // recreate inputs and outputs + analysisCCDB->outputs.clear(); + analysisCCDB->inputs.clear(); + // replace AlgorithmSpec + // FIXME: it should be made more generic, so it does not need replacement... + // FIXME how can I make the lookup depend on DYN tables as well?? + analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); + AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); + } + if (writer != workflow.end()) { workflow.erase(writer); } @@ -538,6 +565,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } + + // replace writer as some outputs may have become dangling and some are now consumed auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index b86a4f15e7306..dcbd1f6f51f7e 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -18,6 +18,7 @@ #include "Framework/ConfigContext.h" #include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" +#include "Framework/DataSpecViews.h" #include "Framework/DataAllocator.h" #include "Framework/ControlService.h" #include "Framework/RawDeviceService.h" @@ -184,6 +185,21 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}, {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}, }; + DataProcessorSpec analysisCCDBBackend{ + .name = "internal-dpl-aod-ccdb", + .inputs = {}, + .outputs = {}, + .algorithm = AlgorithmSpec::dummyAlgorithm(), + .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}}, + {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}}, + {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}}, + {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}}, + {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}}, + {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}}, + {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}}, + {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}}, + {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}, + {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}}; DataProcessorSpec transientStore{"internal-dpl-transient-store", {}, {}, @@ -357,6 +373,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext if (DataSpecUtils::partialMatch(input, header::DataOrigin{"IDX"})) { DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input}); } + if (DataSpecUtils::partialMatch(input, header::DataOrigin{"TIM"})) { + DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input}); + } } std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); }); @@ -366,6 +385,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext ac.providedAODs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"DYN"})) { ac.providedDYNs.emplace_back(output); + } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"TIM"})) { + ac.providedTIMs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) { ac.providedOutputObjHist.emplace_back(output); auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; }); @@ -384,7 +405,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); + std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan); std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); + std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan); DataProcessorSpec indexBuilder{ "internal-dpl-aod-index-builder", @@ -394,6 +417,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext {}}; AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder); + ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to(ac.analysisCCDBInputs); + AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); + for (auto& input : ac.requestedDYNs) { if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { ac.spawnerInputs.emplace_back(input); @@ -568,6 +594,15 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } } + // add the Analysys CCDB backend which reads CCDB objects using a provided + // table + if (analysisCCDBBackend.outputs.empty() == false) { + // add normal reader + auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); + analysisCCDBBackend.algorithm = algo; + extraSpecs.push_back(analysisCCDBBackend); + } + // add the timer if (timer.outputs.empty() == false) { extraSpecs.push_back(timer); diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index b147a4871bf26..f5d18183c3705 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -41,6 +41,11 @@ o2_add_dpl_workflow(analysis-histograms SOURCES src/o2TestHistograms.cxx COMPONENT_NAME TestWorkflows) +o2_add_dpl_workflow(analysis-ccdb + SOURCES src/o2TestAnalysisCCDB.cxx + PUBLIC_LINK_LIBRARIES O2::DataFormatsTOF + COMPONENT_NAME TestWorkflows) + o2_add_dpl_workflow(two-timers SOURCES src/o2TwoTimers.cxx COMPONENT_NAME TestWorkflows) diff --git a/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx b/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx new file mode 100644 index 0000000000000..f9684762539f7 --- /dev/null +++ b/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx @@ -0,0 +1,69 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +/// +/// \brief FullTracks is a join of Tracks, TracksCov, and TracksExtra. +/// \author +/// \since + +#include "Framework/runDataProcessing.h" +#include "Framework/AnalysisTask.h" +#include "Framework/AnalysisDataModel.h" +#include "DataFormatsTOF/CalibLHCphaseTOF.h" +#include + +#include + +using namespace o2; +using namespace o2::framework; +using namespace o2::framework::expressions; + +namespace o2::aod +{ +namespace tofcalib +{ +DECLARE_SOA_CCDB_COLUMN(LHCphase, lhcPhase, o2::dataformats::CalibLHCphaseTOF, "TOF/Calib/LHCphase"); //! +} // namespace tofcalib + +DECLARE_SOA_TIMESTAMPED_TABLE(TOFCalibrationObjects, aod::Timestamps, o2::aod::timestamp::Timestamp, 1, "TOFCALIB", //! + tofcalib::LHCphase); +} // namespace o2::aod + +struct DummyTimestampsTable { + Produces timestamps; /// Table with SOR timestamps produced by the task + Service control; + + void process(Enumeration<0, 1>& e) + { + timestamps(1747442464000); // c2b3d801393540b7bddb949d600b199f, ecacb915-3d70-11f0-ac6f-808de0f5250c + timestamps(1747442764000); // 0262dbd9d50aa79c3d4dcd5ec3ca67c3, ed5471c5-3d70-11f0-b0a3-808de0f524ee + control->readyToQuit(QuitRequest::Me); + control->endOfStream(); + std::cout << "Executed " << std::endl; + } +}; + +struct SimpleCCDBConsumer { + void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps) + { + LOGP(info, "Looking at all the LHCphases associated to the timestamps"); + for (auto& object : ccdbObjectsForAllTimestamps) { + std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl; + } + } +}; + +WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) +{ + return WorkflowSpec{ + adaptAnalysisTask(cfgc), + adaptAnalysisTask(cfgc, TaskName{"simple-ccdb-cunsumer"}), + }; +}