From eef5e2c4bf281db6298a0665776005cf710e7035 Mon Sep 17 00:00:00 2001 From: Ernst Hellbar Date: Thu, 13 Feb 2025 16:59:08 +0100 Subject: [PATCH 1/3] DPL: trigger runtime error in case of duplicate processor names --- Framework/Core/src/runDataProcessing.cxx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 66fc2c7b2c3df..269f5099bcf43 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -2959,7 +2959,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, for (auto& dp : importedWorkflow) { auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [&name = dp.name](DataProcessorSpec const& spec) { return spec.name == name; }); - if (found == physicalWorkflow.end()) { + // also checking the workflow for processors with the same name but from a different executable, + // adding them to the workflow to trigger the check for duplicate names in the MATERIALISE_WORKFLOW state + auto duplicate_name = std::find_if(dataProcessorInfos.begin(), dataProcessorInfos.end(), + [&name = dp.name, &exec = currentWorkflow.executable](DataProcessorInfo const& info) { return (info.name == name && info.executable != exec); }); + if (found == physicalWorkflow.end() || duplicate_name != dataProcessorInfos.end()) { physicalWorkflow.push_back(dp); rankIndex.insert(std::make_pair(dp.name, workflowHashB)); } From 841b74a31674ca3374537c36a7f45d7216e58a95 Mon Sep 17 00:00:00 2001 From: Ernst Hellbar Date: Tue, 18 Feb 2025 17:50:35 +0100 Subject: [PATCH 2/3] simplifying logic and accounting for multiple pipelines --- Framework/Core/src/runDataProcessing.cxx | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 269f5099bcf43..df55e6f59e338 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -2956,14 +2956,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // We remove the duplicates because for the moment child get themself twice: // once from the actual definition in the child, a second time from the // configuration they get passed by their parents. + std::regex pipe_pattern{"_t[0-9][0-9]*$"}; for (auto& dp : importedWorkflow) { - auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), - [&name = dp.name](DataProcessorSpec const& spec) { return spec.name == name; }); - // also checking the workflow for processors with the same name but from a different executable, - // adding them to the workflow to trigger the check for duplicate names in the MATERIALISE_WORKFLOW state - auto duplicate_name = std::find_if(dataProcessorInfos.begin(), dataProcessorInfos.end(), - [&name = dp.name, &exec = currentWorkflow.executable](DataProcessorInfo const& info) { return (info.name == name && info.executable != exec); }); - if (found == physicalWorkflow.end() || duplicate_name != dataProcessorInfos.end()) { + auto found = std::find_if(dataProcessorInfos.begin(), dataProcessorInfos.end(), + [&name = dp.name, &pipe_pattern, &exec = currentWorkflow.executable](DataProcessorInfo const& info) { return (std::regex_replace(info.name, pipe_pattern, "") == name && info.executable == exec); }); + if (found == dataProcessorInfos.end()) { physicalWorkflow.push_back(dp); rankIndex.insert(std::make_pair(dp.name, workflowHashB)); } From 914bfa1149dd8edcad2b683d2671796ce96d8e68 Mon Sep 17 00:00:00 2001 From: Ernst Hellbar Date: Wed, 19 Feb 2025 17:17:08 +0100 Subject: [PATCH 3/3] going back to original implementation with DataProcessorSpec and using its metadata to store executable --- Framework/Core/src/runDataProcessing.cxx | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index df55e6f59e338..4a6e2e6accfb2 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -2933,8 +2933,9 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, workflowHashA += hash_fn(dp.name); } - for (auto& dp : workflow) { + for (auto& dp : physicalWorkflow) { rankIndex.insert(std::make_pair(dp.name, workflowHashA)); + dp.metadata.emplace(dp.metadata.begin(), std::string{"executable"}, currentWorkflow.executable); } std::vector dataProcessorInfos; @@ -2956,11 +2957,10 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // We remove the duplicates because for the moment child get themself twice: // once from the actual definition in the child, a second time from the // configuration they get passed by their parents. - std::regex pipe_pattern{"_t[0-9][0-9]*$"}; for (auto& dp : importedWorkflow) { - auto found = std::find_if(dataProcessorInfos.begin(), dataProcessorInfos.end(), - [&name = dp.name, &pipe_pattern, &exec = currentWorkflow.executable](DataProcessorInfo const& info) { return (std::regex_replace(info.name, pipe_pattern, "") == name && info.executable == exec); }); - if (found == dataProcessorInfos.end()) { + auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), + [&name = dp.name](DataProcessorSpec const& spec) { return spec.name == name; }); + if (found == physicalWorkflow.end() || (*dp.metadata.begin()).value != currentWorkflow.executable) { physicalWorkflow.push_back(dp); rankIndex.insert(std::make_pair(dp.name, workflowHashB)); }