From 113d5514eecde4b861c3de7513a147ff03d1bbc8 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 10:15:35 +0100 Subject: [PATCH 01/23] remove obsolete code --- Framework/Core/src/WorkflowHelpers.cxx | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 17f6c9eb7ddb6..6abc37cf000b8 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -157,18 +157,6 @@ int defaultConditionQueryRateMultiplier() void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx) { - auto fakeCallback = AlgorithmSpec{[](InitContext& ic) { - LOG(info) << "This is not a real device, merely a placeholder for external inputs"; - LOG(info) << "To be hidden / removed at some point."; - // mark this dummy process as ready-to-quit - ic.services().get().readyToQuit(QuitRequest::Me); - - return [](ProcessingContext& pc) { - // this callback is never called since there is no expiring input - pc.services().get().waitFor(2000); - }; - }}; - DataProcessorSpec ccdbBackend{ .name = "internal-dpl-ccdb-backend", .outputs = {}, From f52b13d0bbbffda9be4a7c9ed8d0a6676259393f Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 10:17:56 +0100 Subject: [PATCH 02/23] std::any_of instead of a loop --- Framework/Core/src/WorkflowHelpers.cxx | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 6abc37cf000b8..d7486a0b28a30 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -269,20 +269,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}); processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}); } - bool hasTimeframeInputs = false; - for (auto& input : processor.inputs) { - if (input.lifetime == Lifetime::Timeframe) { - hasTimeframeInputs = true; - break; - } - } - bool hasTimeframeOutputs = false; - for (auto& output : processor.outputs) { - if (output.lifetime == Lifetime::Timeframe) { - hasTimeframeOutputs = true; - break; - } - } + bool hasTimeframeInputs = std::any_of(processor.inputs.begin(), processor.inputs.end(), [](auto const& input){ return input.lifetime == Lifetime::Timeframe; }); + bool hasTimeframeOutputs = std::any_of(processor.outputs.begin(), processor.outputs.end(), [](auto const& output){ return output.lifetime == Lifetime::Timeframe; }); + // A timeframeSink consumes timeframes without creating new // timeframe data. bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs; From cfe60b9333048e524dd086575bc6d9b311b2503c Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 10:23:08 +0100 Subject: [PATCH 03/23] std::find_if instead of a loop --- Framework/Core/src/WorkflowHelpers.cxx | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index d7486a0b28a30..a954c532d92bd 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -281,14 +281,13 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext uint32_t hash = runtime_hash(processor.name.c_str()); bool hasMatch = false; ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast(hash)}; - for (auto& output : processor.outputs) { - if (DataSpecUtils::match(output, summaryMatcher)) { - O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s", - DataSpecUtils::describe(output).c_str(), processor.name.c_str()); - hasMatch = true; - break; - } + auto summaryOutput = std::find_if(processor.outputs.begin(), processor.outputs.end(), [&summaryMatcher](auto const& output){ return DataSpecUtils::match(output, summaryMatcher); }); + if (summaryOutput != processor.outputs.end()) { + O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s", + DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str()); + hasMatch = true; } + if (!hasMatch) { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str()); processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast(hash)}}); From c2798e918b8dc3ff52b5db1cb3a6a9f14c9758e1 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 10:28:04 +0100 Subject: [PATCH 04/23] use range adaptors for spawnerInputs --- Framework/Core/src/WorkflowHelpers.cxx | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index a954c532d92bd..18ce4cce26d44 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -401,11 +401,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); } - for (auto& input : ac.requestedDYNs) { - if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { - ac.spawnerInputs.emplace_back(input); - } - } + ac.requestedDYNs | views::filter_not_matching(ac.providedDYNs) | sinks::append_to{ac.spawnerInputs}; DataProcessorSpec aodSpawner{ "internal-dpl-aod-spawner", From 3aec9e4b22c3355fc71498c9cddab0b13e499185 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 10:51:26 +0100 Subject: [PATCH 05/23] update addMissingOutputsToReader with ranges --- .../Core/include/Framework/DataSpecViews.h | 19 ++++++++++++ Framework/Core/src/AnalysisSupportHelpers.cxx | 31 +++++-------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index 0782cefd0f632..4d3d06a75ece0 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -31,6 +31,10 @@ static auto filter_not_matching(auto const& provided) return std::views::filter([&provided](auto const& input) { return std::none_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); }); } +static auto filter_matching(auto const& provided) +{ + return std::views::filter([&provided](auto const& input){ return std::any_of(provided.begin(), provided.end(), [&input](auto const& output){ return DataSpecUtils::match(input, output); }); }); +} } // namespace o2::framework::views // namespace o2::framework::sinks @@ -62,6 +66,21 @@ struct update_input_list { } }; +template +struct update_output_list { + Container& c; + // ends the pipeline, returns the container + template + friend Container& operator|(R&& r, update_output_list self) + { + for (auto& item : r) { + auto copy = item; + DataSpecUtils::updateOutputList(self.c, std::move(copy)); + } + return self.c; + } +}; + } // namespace o2::framework::sinks #endif // O2_FRAMEWORK_DATASPECVIEWS_H_ diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index b5c898faa515a..774c1b5304b17 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -11,6 +11,7 @@ #include "Framework/AnalysisSupportHelpers.h" #include "Framework/DataOutputDirector.h" +#include "Framework/DataSpecViews.h" #include "Framework/OutputObjHeader.h" #include "Framework/ControlService.h" #include "Framework/EndOfStreamContext.h" @@ -129,30 +130,12 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c std::vector const& requestedInputs, DataProcessorSpec& publisher) { - auto matchingOutputFor = [](InputSpec const& requested) { - return [&requested](OutputSpec const& provided) { - return DataSpecUtils::match(requested, provided); - }; - }; - for (InputSpec const& requested : requestedInputs) { - auto provided = std::find_if(providedOutputs.begin(), - providedOutputs.end(), - matchingOutputFor(requested)); - - if (provided != providedOutputs.end()) { - continue; - } - - auto inList = std::find_if(publisher.outputs.begin(), - publisher.outputs.end(), - matchingOutputFor(requested)); - if (inList != publisher.outputs.end()) { - continue; - } - - auto concrete = DataSpecUtils::asConcreteDataMatcher(requested); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, requested.lifetime, requested.metadata); - } + requestedInputs | + views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided + std::views::transform([](auto const& req){ // create outputspecs for unmatched inputs + return DataSpecUtils::asOutputSpec(req); + }) | + sinks::update_output_list{publisher.outputs}; // append them to the publisher outputs } void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector const& providedSpecials, From e97ad87a45b321590879a415970ebdeaab13dc7d Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 11:13:23 +0100 Subject: [PATCH 06/23] update addMissingOutputsToSpawner with ranges --- Framework/Core/src/AnalysisSupportHelpers.cxx | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 774c1b5304b17..a47e7ecb7833a 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -143,25 +143,26 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector std::vector& requestedAODs, DataProcessorSpec& publisher) { - for (auto& input : requestedSpecials) { - if (std::any_of(providedSpecials.begin(), providedSpecials.end(), [&input](auto const& x) { - return DataSpecUtils::match(input, x); - })) { - continue; - } - auto concrete = DataSpecUtils::asConcreteDataMatcher(input); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); - for (auto& i : input.metadata) { - if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); - auto j = std::find(publisher.inputs.begin(), publisher.inputs.end(), spec); - if (j == publisher.inputs.end()) { - publisher.inputs.push_back(spec); - } - DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); - } - } + requestedSpecials | + views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided + std::views::transform([](auto const& req){ // create outputspecs for unmatched inputs + return DataSpecUtils::asOutputSpec(req); + }) | + sinks::append_to(publisher.outputs); // append them to the publisher outputs + + std::vector additionalInputs; + for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { + input.metadata | + std::views::filter([](auto const& param){ // filter config params that are strings starting with "input:" + return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); + }) | + std::views::transform([](auto const& param){ // parse them into InputSpecs + return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); + }) | + sinks::update_input_list{additionalInputs}; // store into a temporary } + additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs + additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs } void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector const& requestedSpecials, From 80deb08b6cfc51466689a3ab4350d36fbec07d31 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 11:34:16 +0100 Subject: [PATCH 07/23] update addMissingOutputsToBuilder with ranges --- Framework/Core/src/AnalysisSupportHelpers.cxx | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index a47e7ecb7833a..c4cc0de677a82 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -170,24 +170,32 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c std::vector& requestedDYNs, DataProcessorSpec& publisher) { - for (auto& input : requestedSpecials) { - auto concrete = DataSpecUtils::asConcreteDataMatcher(input); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); - for (auto& i : input.metadata) { - if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); - auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; }); - if (j == publisher.inputs.end()) { - publisher.inputs.push_back(spec); - } - if (DataSpecUtils::partialMatch(spec, AODOrigins)) { - DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); - } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) { - DataSpecUtils::updateInputList(requestedDYNs, std::move(spec)); - } - } - } + requestedSpecials | + std::views::transform([](auto const& req){ // create outputspecs for inputs + return DataSpecUtils::asOutputSpec(req); + }) | + sinks::append_to{publisher.outputs}; // append them to the publisher outputs + + std::vector additionalInputs; + for (auto const& input : requestedSpecials) { + input.metadata | + std::views::filter([](auto const& param){ // filter config params that are strings starting with "input:" + return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); + }) | + std::views::transform([](auto const& param){ // parse them into InputSpecs + return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); + }) | + sinks::update_input_list{additionalInputs}; // store into a temporary } + + additionalInputs | sinks::update_input_list(publisher.inputs); // update publisher inputs + // FIXME: until we have a single list of pairs + additionalInputs | + views::partial_match_filter(AODOrigins) | + sinks::update_input_list{requestedAODs}; // update requestedAODs + additionalInputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::update_input_list{requestedDYNs}; // update requestedDYNs } void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( From 95ad8da9ee856656951c2f0ee61b08d71f4b005b Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 12:14:04 +0100 Subject: [PATCH 08/23] update addMissingOutputsToAnalysisCCDBFetcher with ranges --- Framework/Core/src/AnalysisSupportHelpers.cxx | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index c4cc0de677a82..85b565f79ec4e 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -205,25 +205,32 @@ void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( std::vector& requestedDYNs, DataProcessorSpec& publisher) { + requestedSpecials | + std::views::transform([](auto const& req){ // create outputspecs for inputs + return DataSpecUtils::asOutputSpec(req); + }) | + sinks::append_to{publisher.outputs}; // append them to the publisher outputs + + std::vector additionalInputs; for (auto& input : requestedSpecials) { - auto concrete = DataSpecUtils::asConcreteDataMatcher(input); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); - // FIXME: good enough for now... - for (auto& i : input.metadata) { - if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); - auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; }); - if (j == publisher.inputs.end()) { - publisher.inputs.push_back(spec); - } - if (DataSpecUtils::partialMatch(spec, AODOrigins)) { - DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); - } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) { - DataSpecUtils::updateInputList(requestedDYNs, std::move(spec)); - } - } - } + input.metadata | + std::views::filter([](auto const& param){ // filter config params that are strings starting with "input:" + return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); + }) | + std::views::transform([](auto const& param){ // parse them into InputSpecs + return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); + }) | + sinks::update_input_list{additionalInputs}; // store into a temporary } + + additionalInputs | sinks::update_input_list(publisher.inputs); // update publisher inputs + // FIXME: until we have a single list of pairs + additionalInputs | + views::partial_match_filter(AODOrigins) | + sinks::update_input_list{requestedAODs}; // update requestedAODs + additionalInputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::update_input_list{requestedDYNs}; // update requestedDYNs } // ============================================================================= From a098c0c23a562e0cec295b0b0c1e62ace0495512 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 12:14:42 +0100 Subject: [PATCH 09/23] remove unused includes --- Framework/Core/src/AnalysisSupportHelpers.cxx | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 85b565f79ec4e..db39a29186905 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -12,10 +12,6 @@ #include "Framework/AnalysisSupportHelpers.h" #include "Framework/DataOutputDirector.h" #include "Framework/DataSpecViews.h" -#include "Framework/OutputObjHeader.h" -#include "Framework/ControlService.h" -#include "Framework/EndOfStreamContext.h" -#include "Framework/DeviceSpec.h" #include "Framework/PluginManager.h" #include "Framework/ConfigContext.h" #include "WorkflowHelpers.h" From 6c1c30fd68f5b9a82e0543c5cd5ffbdb64f3bc44 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 12:17:42 +0100 Subject: [PATCH 10/23] remove unused includes --- Framework/Core/src/WorkflowHelpers.cxx | 1 - 1 file changed, 1 deletion(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 18ce4cce26d44..03dafdd94f7ac 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -19,7 +19,6 @@ #include "Framework/DataSpecUtils.h" #include "Framework/DataSpecViews.h" #include "Framework/DataAllocator.h" -#include "Framework/ControlService.h" #include "Framework/RawDeviceService.h" #include "Framework/StringHelpers.h" #include "Framework/ChannelSpecHelpers.h" From c875b916e91c68d368acfa2bd0d01078bad59add Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 12:21:18 +0100 Subject: [PATCH 11/23] use ranges to populate requestedIDXs in adjustTopology --- Framework/Core/src/ArrowSupport.cxx | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index cf2d364027932..299d48dc7fe34 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -595,23 +595,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() ac.providedTIMs.clear(); ac.requestedTIMs.clear(); - auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; if (builder != workflow.end()) { // collect currently requested IDXs ac.requestedIDXs.clear(); - for (auto& d : workflow) { - if (d.name == builder->name) { - continue; - } - for (auto& i : d.inputs) { - if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) { - auto copy = i; - DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy)); - } - } + for (auto& d : workflow | views::exclude_by_name(builder->name)) { + d.inputs | + views::partial_match_filter(header::DataOrigin{"IDX"}) | + sinks::update_input_list{ac.requestedIDXs}; } // recreate inputs and outputs builder->inputs.clear(); From adf24a1a414b15262edd03cc08836c1678c0a21c Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 12:25:29 +0100 Subject: [PATCH 12/23] use ranges to populate requestedDYNs and spawnerInputs in adjustTopology --- Framework/Core/src/ArrowSupport.cxx | 32 ++++++++++------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 299d48dc7fe34..54d440a9dfae0 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -617,37 +617,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (spawner != workflow.end()) { // collect currently requested DYNs - for (auto& d : workflow) { - if (d.name == spawner->name) { - continue; - } - for (auto const& i : d.inputs) { - if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) { - auto copy = i; - DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy)); - } - } - for (auto const& o : d.outputs) { - if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) { - ac.providedDYNs.emplace_back(o); - } - } + for (auto& d : workflow | views::exclude_by_name(spawner->name)) { + d.inputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::update_input_list{ac.requestedDYNs}; + d.outputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::append_to{ac.providedDYNs}; } std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); ac.spawnerInputs.clear(); - for (auto& input : ac.requestedDYNs) { - if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { - ac.spawnerInputs.emplace_back(input); - } - } + ac.requestedDYNs | + views::filter_not_matching(ac.providedDYNs) | + sinks::append_to{ac.spawnerInputs}; // recreate inputs and outputs spawner->outputs.clear(); spawner->inputs.clear(); + AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx); - AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); } if (analysisCCDB != workflow.end()) { From 072ccf1ce0bd249b7a6d84fcd7930ad17468774a Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 12:27:57 +0100 Subject: [PATCH 13/23] use ranges to populate requestedAODs in adjustTopology --- Framework/Core/src/ArrowSupport.cxx | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 54d440a9dfae0..c63a4c702157c 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -669,12 +669,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // If reader and/or builder were adjusted, remove unneeded outputs // update currently requested AODs for (auto& d : workflow) { - for (auto const& i : d.inputs) { - if (DataSpecUtils::partialMatch(i, AODOrigins)) { - auto copy = i; - DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy)); - } - } + d.inputs | + views::partial_match_filter(AODOrigins) | + sinks::update_input_list{ac.requestedAODs}; } // remove unmatched outputs @@ -688,8 +685,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } - - // replace writer as some outputs may have become dangling and some are now consumed auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); From b9f6aac4a49c184ded31413d9c1f03d86293cf6d Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Dec 2025 13:49:15 +0100 Subject: [PATCH 14/23] ensure constness --- Framework/Core/include/Framework/DataSpecViews.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index 4d3d06a75ece0..3c362f822355a 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -58,7 +58,7 @@ struct update_input_list { template friend Container& operator|(R&& r, update_input_list self) { - for (auto& item : r) { + for (auto const& item : r) { auto copy = item; DataSpecUtils::updateInputList(self.c, std::move(copy)); } @@ -73,7 +73,7 @@ struct update_output_list { template friend Container& operator|(R&& r, update_output_list self) { - for (auto& item : r) { + for (auto const& item : r) { auto copy = item; DataSpecUtils::updateOutputList(self.c, std::move(copy)); } From da95b4c46aed5bdfe3c632ace88b380a56b13c46 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Wed, 10 Dec 2025 09:47:18 +0000 Subject: [PATCH 15/23] Please consider the following formatting changes --- .../Core/include/Framework/DataSpecViews.h | 2 +- Framework/Core/src/AnalysisSupportHelpers.cxx | 42 +++++++++---------- Framework/Core/src/WorkflowHelpers.cxx | 6 +-- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index 3c362f822355a..b163a76a7e90c 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -33,7 +33,7 @@ static auto filter_not_matching(auto const& provided) static auto filter_matching(auto const& provided) { - return std::views::filter([&provided](auto const& input){ return std::any_of(provided.begin(), provided.end(), [&input](auto const& output){ return DataSpecUtils::match(input, output); }); }); + return std::views::filter([&provided](auto const& input) { return std::any_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); }); } } // namespace o2::framework::views // diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index db39a29186905..d8f76fa4f3a1b 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -127,8 +127,8 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c DataProcessorSpec& publisher) { requestedInputs | - views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided - std::views::transform([](auto const& req){ // create outputspecs for unmatched inputs + views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided + std::views::transform([](auto const& req) { // create outputspecs for unmatched inputs return DataSpecUtils::asOutputSpec(req); }) | sinks::update_output_list{publisher.outputs}; // append them to the publisher outputs @@ -141,21 +141,21 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector { requestedSpecials | views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided - std::views::transform([](auto const& req){ // create outputspecs for unmatched inputs + std::views::transform([](auto const& req) { // create outputspecs for unmatched inputs return DataSpecUtils::asOutputSpec(req); }) | - sinks::append_to(publisher.outputs); // append them to the publisher outputs + sinks::append_to(publisher.outputs); // append them to the publisher outputs std::vector additionalInputs; for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { input.metadata | - std::views::filter([](auto const& param){ // filter config params that are strings starting with "input:" + std::views::filter([](auto const& param) { // filter config params that are strings starting with "input:" return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); }) | - std::views::transform([](auto const& param){ // parse them into InputSpecs + std::views::transform([](auto const& param) { // parse them into InputSpecs return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); }) | - sinks::update_input_list{additionalInputs}; // store into a temporary + sinks::update_input_list{additionalInputs}; // store into a temporary } additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs @@ -167,31 +167,31 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c DataProcessorSpec& publisher) { requestedSpecials | - std::views::transform([](auto const& req){ // create outputspecs for inputs + std::views::transform([](auto const& req) { // create outputspecs for inputs return DataSpecUtils::asOutputSpec(req); }) | - sinks::append_to{publisher.outputs}; // append them to the publisher outputs + sinks::append_to{publisher.outputs}; // append them to the publisher outputs std::vector additionalInputs; for (auto const& input : requestedSpecials) { input.metadata | - std::views::filter([](auto const& param){ // filter config params that are strings starting with "input:" + std::views::filter([](auto const& param) { // filter config params that are strings starting with "input:" return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); }) | - std::views::transform([](auto const& param){ // parse them into InputSpecs + std::views::transform([](auto const& param) { // parse them into InputSpecs return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); }) | - sinks::update_input_list{additionalInputs}; // store into a temporary + sinks::update_input_list{additionalInputs}; // store into a temporary } additionalInputs | sinks::update_input_list(publisher.inputs); // update publisher inputs // FIXME: until we have a single list of pairs additionalInputs | views::partial_match_filter(AODOrigins) | - sinks::update_input_list{requestedAODs}; // update requestedAODs + sinks::update_input_list{requestedAODs}; // update requestedAODs additionalInputs | views::partial_match_filter(header::DataOrigin{"DYN"}) | - sinks::update_input_list{requestedDYNs}; // update requestedDYNs + sinks::update_input_list{requestedDYNs}; // update requestedDYNs } void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( @@ -202,31 +202,31 @@ void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( DataProcessorSpec& publisher) { requestedSpecials | - std::views::transform([](auto const& req){ // create outputspecs for inputs + std::views::transform([](auto const& req) { // create outputspecs for inputs return DataSpecUtils::asOutputSpec(req); }) | - sinks::append_to{publisher.outputs}; // append them to the publisher outputs + sinks::append_to{publisher.outputs}; // append them to the publisher outputs std::vector additionalInputs; for (auto& input : requestedSpecials) { input.metadata | - std::views::filter([](auto const& param){ // filter config params that are strings starting with "input:" + std::views::filter([](auto const& param) { // filter config params that are strings starting with "input:" return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); }) | - std::views::transform([](auto const& param){ // parse them into InputSpecs + std::views::transform([](auto const& param) { // parse them into InputSpecs return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); }) | - sinks::update_input_list{additionalInputs}; // store into a temporary + sinks::update_input_list{additionalInputs}; // store into a temporary } additionalInputs | sinks::update_input_list(publisher.inputs); // update publisher inputs // FIXME: until we have a single list of pairs additionalInputs | views::partial_match_filter(AODOrigins) | - sinks::update_input_list{requestedAODs}; // update requestedAODs + sinks::update_input_list{requestedAODs}; // update requestedAODs additionalInputs | views::partial_match_filter(header::DataOrigin{"DYN"}) | - sinks::update_input_list{requestedDYNs}; // update requestedDYNs + sinks::update_input_list{requestedDYNs}; // update requestedDYNs } // ============================================================================= diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 03dafdd94f7ac..8ada6192f9e0b 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -268,8 +268,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}); processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}); } - bool hasTimeframeInputs = std::any_of(processor.inputs.begin(), processor.inputs.end(), [](auto const& input){ return input.lifetime == Lifetime::Timeframe; }); - bool hasTimeframeOutputs = std::any_of(processor.outputs.begin(), processor.outputs.end(), [](auto const& output){ return output.lifetime == Lifetime::Timeframe; }); + bool hasTimeframeInputs = std::any_of(processor.inputs.begin(), processor.inputs.end(), [](auto const& input) { return input.lifetime == Lifetime::Timeframe; }); + bool hasTimeframeOutputs = std::any_of(processor.outputs.begin(), processor.outputs.end(), [](auto const& output) { return output.lifetime == Lifetime::Timeframe; }); // A timeframeSink consumes timeframes without creating new // timeframe data. @@ -280,7 +280,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext uint32_t hash = runtime_hash(processor.name.c_str()); bool hasMatch = false; ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast(hash)}; - auto summaryOutput = std::find_if(processor.outputs.begin(), processor.outputs.end(), [&summaryMatcher](auto const& output){ return DataSpecUtils::match(output, summaryMatcher); }); + auto summaryOutput = std::find_if(processor.outputs.begin(), processor.outputs.end(), [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); }); if (summaryOutput != processor.outputs.end()) { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s", DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str()); From e88166764c5765b15a7f96e80442596de2b383bc Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 10 Dec 2025 11:31:45 +0100 Subject: [PATCH 16/23] extract filters/transformations into helpers; merge identical functions --- .../Framework/AnalysisSupportHelpers.h | 5 -- .../Core/include/Framework/DataSpecViews.h | 19 ++++++ Framework/Core/src/AnalysisSupportHelpers.cxx | 63 +++---------------- Framework/Core/src/ArrowSupport.cxx | 2 +- Framework/Core/src/WorkflowHelpers.cxx | 2 +- 5 files changed, 28 insertions(+), 63 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h index cc4d45a46c8bc..c0eeb3bd9697d 100644 --- a/Framework/Core/include/Framework/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -39,11 +39,6 @@ struct AnalysisSupportHelpers { std::vector const& requestedSpecials, std::vector& requestedAODs, DataProcessorSpec& publisher); - static void addMissingOutputsToAnalysisCCDBFetcher(std::vector const& providedSpecials, - std::vector const& requestedSpecials, - std::vector& requestedAODs, - std::vector& requestedDYNs, - DataProcessorSpec& publisher); static void addMissingOutputsToBuilder(std::vector const& requestedSpecials, std::vector& requestedAODs, std::vector& requestedDYNs, diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index b163a76a7e90c..cc7d111fff8db 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -35,6 +35,25 @@ static auto filter_matching(auto const& provided) { return std::views::filter([&provided](auto const& input) { return std::any_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); }); } + +static auto filter_string_params_with(std::string match) +{ + return std::views::filter([match](auto const& param) { + return (param.type == VariantType::String) && (param.name.find(match) != std::string::npos); + }); +} + +static auto input_to_output_specs() +{ + return std::views::transform([](auto const& input){ return DataSpecUtils::asOutputSpec(input); }); +} + +static auto params_to_input_specs() +{ + return std::views::transform([](auto const& param) { + return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); + }); +} } // namespace o2::framework::views // namespace o2::framework::sinks diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index d8f76fa4f3a1b..437bdaf8043b2 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -128,9 +128,7 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c { requestedInputs | views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided - std::views::transform([](auto const& req) { // create outputspecs for unmatched inputs - return DataSpecUtils::asOutputSpec(req); - }) | + views::input_to_output_specs() | sinks::update_output_list{publisher.outputs}; // append them to the publisher outputs } @@ -141,20 +139,14 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector { requestedSpecials | views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided - std::views::transform([](auto const& req) { // create outputspecs for unmatched inputs - return DataSpecUtils::asOutputSpec(req); - }) | + views::input_to_output_specs() | sinks::append_to(publisher.outputs); // append them to the publisher outputs std::vector additionalInputs; for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { input.metadata | - std::views::filter([](auto const& param) { // filter config params that are strings starting with "input:" - return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); - }) | - std::views::transform([](auto const& param) { // parse them into InputSpecs - return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); - }) | + views::filter_string_params_with("input:") | + views::params_to_input_specs() | sinks::update_input_list{additionalInputs}; // store into a temporary } additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs @@ -167,55 +159,14 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c DataProcessorSpec& publisher) { requestedSpecials | - std::views::transform([](auto const& req) { // create outputspecs for inputs - return DataSpecUtils::asOutputSpec(req); - }) | + views::input_to_output_specs() | sinks::append_to{publisher.outputs}; // append them to the publisher outputs std::vector additionalInputs; for (auto const& input : requestedSpecials) { input.metadata | - std::views::filter([](auto const& param) { // filter config params that are strings starting with "input:" - return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); - }) | - std::views::transform([](auto const& param) { // parse them into InputSpecs - return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); - }) | - sinks::update_input_list{additionalInputs}; // store into a temporary - } - - additionalInputs | sinks::update_input_list(publisher.inputs); // update publisher inputs - // FIXME: until we have a single list of pairs - additionalInputs | - views::partial_match_filter(AODOrigins) | - sinks::update_input_list{requestedAODs}; // update requestedAODs - additionalInputs | - views::partial_match_filter(header::DataOrigin{"DYN"}) | - sinks::update_input_list{requestedDYNs}; // update requestedDYNs -} - -void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( - std::vector const& providedSpecials, - std::vector const& requestedSpecials, - std::vector& requestedAODs, - std::vector& requestedDYNs, - DataProcessorSpec& publisher) -{ - requestedSpecials | - std::views::transform([](auto const& req) { // create outputspecs for inputs - return DataSpecUtils::asOutputSpec(req); - }) | - sinks::append_to{publisher.outputs}; // append them to the publisher outputs - - std::vector additionalInputs; - for (auto& input : requestedSpecials) { - input.metadata | - std::views::filter([](auto const& param) { // filter config params that are strings starting with "input:" - return (param.type == VariantType::String) && (param.name.find("input:") != std::string::npos); - }) | - std::views::transform([](auto const& param) { // parse them into InputSpecs - return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); - }) | + views::filter_string_params_with("input:") | + views::params_to_input_specs() | sinks::update_input_list{additionalInputs}; // store into a temporary } diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index c63a4c702157c..26594252e888b 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -658,7 +658,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // FIXME: it should be made more generic, so it does not need replacement... // FIXME how can I make the lookup depend on DYN tables as well?? analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); - AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); + AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); } if (writer != workflow.end()) { diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 8ada6192f9e0b..3b0e3330578e3 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -397,7 +397,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs}; DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode(); if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) { - AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); + AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); } ac.requestedDYNs | views::filter_not_matching(ac.providedDYNs) | sinks::append_to{ac.spawnerInputs}; From 646c8e2e4eab715496ec39389b7270e1fa723210 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Wed, 10 Dec 2025 10:32:25 +0000 Subject: [PATCH 17/23] Please consider the following formatting changes --- Framework/Core/include/Framework/DataSpecViews.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index cc7d111fff8db..4318d5aa86791 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -45,7 +45,7 @@ static auto filter_string_params_with(std::string match) static auto input_to_output_specs() { - return std::views::transform([](auto const& input){ return DataSpecUtils::asOutputSpec(input); }); + return std::views::transform([](auto const& input) { return DataSpecUtils::asOutputSpec(input); }); } static auto params_to_input_specs() From a68da09049cc6d723e366a31b66f42c4667bf96e Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 10 Dec 2025 12:14:28 +0100 Subject: [PATCH 18/23] fix --- Framework/Core/src/AnalysisSupportHelpers.cxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 437bdaf8043b2..31acaa4726007 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -140,7 +140,7 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector requestedSpecials | views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided views::input_to_output_specs() | - sinks::append_to(publisher.outputs); // append them to the publisher outputs + sinks::append_to{publisher.outputs}; // append them to the publisher outputs std::vector additionalInputs; for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { @@ -170,7 +170,7 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c sinks::update_input_list{additionalInputs}; // store into a temporary } - additionalInputs | sinks::update_input_list(publisher.inputs); // update publisher inputs + additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs // FIXME: until we have a single list of pairs additionalInputs | views::partial_match_filter(AODOrigins) | From 2c494887ebc927399392986d5ed73c99e8ff5d04 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Dec 2025 23:40:07 +0100 Subject: [PATCH 19/23] potential fix for ccdb-fetcher issue --- Framework/Core/include/Framework/DataSpecViews.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index 4318d5aa86791..162a12419594e 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -45,7 +45,10 @@ static auto filter_string_params_with(std::string match) static auto input_to_output_specs() { - return std::views::transform([](auto const& input) { return DataSpecUtils::asOutputSpec(input); }); + return std::views::transform([](auto const& input) { + auto concrete = DataSpecUtils::asConcreteDataMatcher(input); + return OutputSpec{concrete.origin, concrete.description, concrete.subSpec, input.lifetime, input.metadata}; + }); } static auto params_to_input_specs() From 9394914e145585c982ddfce629d31daaa70c1311 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Sat, 13 Dec 2025 00:13:36 +0100 Subject: [PATCH 20/23] fixup! potential fix for ccdb-fetcher issue --- Framework/Core/src/AnalysisSupportHelpers.cxx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 31acaa4726007..5b3a93b1b1909 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -127,9 +127,10 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c DataProcessorSpec& publisher) { requestedInputs | - views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided + views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided + views::filter_not_matching(publisher.outpus) | // filter the inputs that are already covered views::input_to_output_specs() | - sinks::update_output_list{publisher.outputs}; // append them to the publisher outputs + sinks::append_to{publisher.outputs}; // append them to the publisher outputs } void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector const& providedSpecials, From ac02539f578f1bf11b1782fd6467fbec69f78d38 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Sat, 13 Dec 2025 08:55:16 +0100 Subject: [PATCH 21/23] fixup! potential fix for ccdb-fetcher issue --- Framework/Core/src/AnalysisSupportHelpers.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 5b3a93b1b1909..08d06602c4954 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -128,7 +128,7 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c { requestedInputs | views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided - views::filter_not_matching(publisher.outpus) | // filter the inputs that are already covered + views::filter_not_matching(publisher.outputs) | // filter the inputs that are already covered views::input_to_output_specs() | sinks::append_to{publisher.outputs}; // append them to the publisher outputs } From a4841945f9ffd932d3e5907b833c6a0214e3cb7d Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Sat, 13 Dec 2025 13:23:43 +0100 Subject: [PATCH 22/23] fix --- Framework/Core/src/WorkflowHelpers.cxx | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 3b0e3330578e3..37f3cf4736246 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -314,18 +314,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration}); } break; case Lifetime::Condition: { - for (auto& option : processor.options) { - if (option.name == "condition-backend") { - hasConditionOption = true; - break; - } - } - if (hasConditionOption == false) { + requestedCCDBs.emplace_back(input); + if ((hasConditionOption == false) && std::none_of(processor.options.begin(), processor.options.end(), [](auto const& option){ return (option.name.compare("condition-backend") == 0); })) { processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}}); processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}}); hasConditionOption = true; } - requestedCCDBs.emplace_back(input); } break; case Lifetime::OutOfBand: { auto concrete = DataSpecUtils::asConcreteDataMatcher(input); @@ -411,6 +405,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner); AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader); + + std::sort(requestedCCDBs.begin(), requestedCCDBs.end(), inputSpecLessThan); + std::sort(providedCCDBs.begin(), providedCCDBs.end(), outputSpecLessThan); AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend); std::vector extraSpecs; From 8879b6f66f6d79956ce6b023a61a0c798c8aed0e Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Sat, 13 Dec 2025 12:25:03 +0000 Subject: [PATCH 23/23] Please consider the following formatting changes --- Framework/Core/src/AnalysisSupportHelpers.cxx | 4 ++-- Framework/Core/src/WorkflowHelpers.cxx | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 08d06602c4954..e59f36c72bdab 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -127,10 +127,10 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c DataProcessorSpec& publisher) { requestedInputs | - views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided + views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided views::filter_not_matching(publisher.outputs) | // filter the inputs that are already covered views::input_to_output_specs() | - sinks::append_to{publisher.outputs}; // append them to the publisher outputs + sinks::append_to{publisher.outputs}; // append them to the publisher outputs } void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector const& providedSpecials, diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 37f3cf4736246..02141678fec7c 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -315,7 +315,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } break; case Lifetime::Condition: { requestedCCDBs.emplace_back(input); - if ((hasConditionOption == false) && std::none_of(processor.options.begin(), processor.options.end(), [](auto const& option){ return (option.name.compare("condition-backend") == 0); })) { + if ((hasConditionOption == false) && std::none_of(processor.options.begin(), processor.options.end(), [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) { processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}}); processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}}); hasConditionOption = true;