From 9ffb6812d240d351bc6f9c5b6e8e8b0f60e71110 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 16 Jul 2025 15:10:40 +0200 Subject: [PATCH] DPL: move topological sort in a separate file Simplifies testing. --- Framework/Core/CMakeLists.txt | 2 + .../include/Framework/TopologyPolicyHelpers.h | 23 +++++ Framework/Core/src/TopologyPolicyHelpers.cxx | 92 +++++++++++++++++++ Framework/Core/src/runDataProcessing.cxx | 70 +------------- Framework/Core/test/test_TopologyPolicies.cxx | 59 ++++++++++++ 5 files changed, 178 insertions(+), 68 deletions(-) create mode 100644 Framework/Core/include/Framework/TopologyPolicyHelpers.h create mode 100644 Framework/Core/src/TopologyPolicyHelpers.cxx create mode 100644 Framework/Core/test/test_TopologyPolicies.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 17320348d9272..d69194c83285e 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -133,6 +133,7 @@ o2_add_library(Framework src/TableConsumer.cxx src/TableTreeHelpers.cxx src/TopologyPolicy.cxx + src/TopologyPolicyHelpers.cxx src/TextDriverClient.cxx src/TimesliceIndex.cxx src/TimingHelpers.cxx @@ -248,6 +249,7 @@ add_executable(o2-test-framework-core test/test_TimeParallelPipelining.cxx test/test_TimesliceIndex.cxx test/test_TypeTraits.cxx + test/test_TopologyPolicies.cxx test/test_Variants.cxx test/test_WorkflowHelpers.cxx test/test_WorkflowSerialization.cxx diff --git a/Framework/Core/include/Framework/TopologyPolicyHelpers.h b/Framework/Core/include/Framework/TopologyPolicyHelpers.h new file mode 100644 index 0000000000000..71d40755f2d50 --- /dev/null +++ b/Framework/Core/include/Framework/TopologyPolicyHelpers.h @@ -0,0 +1,23 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_ +#define O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_ +#include "Framework/WorkflowSpec.h" +#include + +namespace o2::framework +{ +struct TopologyPolicyHelpers { + static auto buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector>; +}; +} // namespace o2::framework +#endif // O2_FRAMEWORK_TOPOLOGYPOLICYHELPERS_H_ diff --git a/Framework/Core/src/TopologyPolicyHelpers.cxx b/Framework/Core/src/TopologyPolicyHelpers.cxx new file mode 100644 index 0000000000000..31423bf27e0ff --- /dev/null +++ b/Framework/Core/src/TopologyPolicyHelpers.cxx @@ -0,0 +1,92 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/TopologyPolicyHelpers.h" +#include "Framework/TopologyPolicy.h" + +namespace o2::framework +{ +namespace +{ +void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec) +{ + stream << spec.name; + if (!spec.labels.empty()) { + stream << "("; + bool first = false; + for (auto& label : spec.labels) { + stream << (first ? "" : ",") << label.value; + first = true; + } + stream << ")"; + } +} +} // namespace + +auto TopologyPolicyHelpers::buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector> +{ + std::vector topologyPolicies = TopologyPolicy::createDefaultPolicies(); + std::vector dependencyCheckers; + dependencyCheckers.reserve(physicalWorkflow.size()); + + for (auto& spec : physicalWorkflow) { + for (auto& policy : topologyPolicies) { + if (policy.matcher(spec)) { + dependencyCheckers.push_back(policy.checkDependency); + break; + } + } + } + assert(dependencyCheckers.size() == physicalWorkflow.size()); + // check if DataProcessorSpec at i depends on j + auto checkDependencies = [&workflow = physicalWorkflow, + &dependencyCheckers](int i, int j) { + TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i]; + return checker(workflow[i], workflow[j]); + }; + std::vector> edges; + for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) { + for (size_t j = i; j < physicalWorkflow.size(); ++j) { + if (i == j && checkDependencies(i, j)) { + throw std::runtime_error(physicalWorkflow[i].name + " depends on itself"); + } + bool both = false; + if (checkDependencies(i, j)) { + edges.emplace_back(j, i); + both = true; + } + if (checkDependencies(j, i)) { + edges.emplace_back(i, j); + if (both) { + std::ostringstream str; + describeDataProcessorSpec(str, physicalWorkflow[i]); + str << " has circular dependency with "; + describeDataProcessorSpec(str, physicalWorkflow[j]); + str << ":\n"; + for (auto x : {i, j}) { + str << physicalWorkflow[x].name << ":\n"; + str << "inputs:\n"; + for (auto& input : physicalWorkflow[x].inputs) { + str << "- " << input << " " << (int)input.lifetime << "\n"; + } + str << "outputs:\n"; + for (auto& output : physicalWorkflow[x].outputs) { + str << "- " << output << " " << (int)output.lifetime << "\n"; + } + } + throw std::runtime_error(str.str()); + } + } + } + } + return edges; +}; +} // namespace o2::framework diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index d691041a366cf..59bacc67fef31 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -9,6 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include +#include "Framework/TopologyPolicyHelpers.h" #define BOOST_BIND_GLOBAL_PLACEHOLDERS #include #include "Framework/BoostOptionsRetriever.h" @@ -2835,20 +2836,6 @@ std::unique_ptr createRegistry() return std::make_unique(); } -void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec) -{ - stream << spec.name; - if (!spec.labels.empty()) { - stream << "("; - bool first = false; - for (auto& label : spec.labels) { - stream << (first ? "" : ",") << label.value; - first = true; - } - stream << ")"; - } -} - // This is a toy executor for the workflow spec // What it needs to do is: // @@ -3034,65 +3021,12 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, [](OutputSpec const& a, OutputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); }); } - std::vector topologyPolicies = TopologyPolicy::createDefaultPolicies(); - std::vector dependencyCheckers; - dependencyCheckers.reserve(physicalWorkflow.size()); - - for (auto& spec : physicalWorkflow) { - for (auto& policy : topologyPolicies) { - if (policy.matcher(spec)) { - dependencyCheckers.push_back(policy.checkDependency); - break; - } - } - } - assert(dependencyCheckers.size() == physicalWorkflow.size()); - // check if DataProcessorSpec at i depends on j - auto checkDependencies = [&workflow = physicalWorkflow, - &dependencyCheckers](int i, int j) { - TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i]; - return checker(workflow[i], workflow[j]); - }; - // Create a list of all the edges, so that we can do a topological sort // before we create the graph. std::vector> edges; if (physicalWorkflow.size() > 1) { - for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) { - for (size_t j = i; j < physicalWorkflow.size(); ++j) { - if (i == j && checkDependencies(i, j)) { - throw std::runtime_error(physicalWorkflow[i].name + " depends on itself"); - } - bool both = false; - if (checkDependencies(i, j)) { - edges.emplace_back(j, i); - both = true; - } - if (checkDependencies(j, i)) { - edges.emplace_back(i, j); - if (both) { - std::ostringstream str; - describeDataProcessorSpec(str, physicalWorkflow[i]); - str << " has circular dependency with "; - describeDataProcessorSpec(str, physicalWorkflow[j]); - str << ":\n"; - for (auto x : {i, j}) { - str << physicalWorkflow[x].name << ":\n"; - str << "inputs:\n"; - for (auto& input : physicalWorkflow[x].inputs) { - str << "- " << input << " " << (int)input.lifetime << "\n"; - } - str << "outputs:\n"; - for (auto& output : physicalWorkflow[x].outputs) { - str << "- " << output << " " << (int)output.lifetime << "\n"; - } - } - throw std::runtime_error(str.str()); - } - } - } - } + edges = TopologyPolicyHelpers::buildEdges(physicalWorkflow); auto topoInfos = WorkflowHelpers::topologicalSort(physicalWorkflow.size(), &edges[0].first, &edges[0].second, sizeof(std::pair), edges.size()); if (topoInfos.size() != physicalWorkflow.size()) { diff --git a/Framework/Core/test/test_TopologyPolicies.cxx b/Framework/Core/test/test_TopologyPolicies.cxx new file mode 100644 index 0000000000000..82344c292ab15 --- /dev/null +++ b/Framework/Core/test/test_TopologyPolicies.cxx @@ -0,0 +1,59 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Mocking.h" +#include +#include "Framework/ChannelSpecHelpers.h" +#include "../src/DeviceSpecHelpers.h" +#include "../src/GraphvizHelpers.h" +#include "../src/WorkflowHelpers.h" +#include "Framework/DeviceSpec.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/DataSpecUtils.h" +#include "../src/SimpleResourceManager.h" +#include "../src/ComputingResourceHelpers.h" +#include "test_HelperMacros.h" +#include "Framework/TopologyPolicyHelpers.h" + +using namespace o2::framework; + +// This is how you can define your processing in a declarative way +WorkflowSpec defineDataProcessingWithSporadic() +{ + return { + {.name = "input-proxy", .outputs = {OutputSpec{"QEMC", "CELL", 1}, OutputSpec{"CTF", "DONE", 0}}}, + {.name = "EMC-Cell-proxy", .inputs = Inputs{InputSpec{"a", "QEMC", "CELL", 1, Lifetime::Sporadic}}}, + {.name = "calib-output-proxy-barrel-tf", .inputs = {InputSpec{"a", "CTF", "DONE", 0}}}}; +} + +TEST_CASE("TestBrokenSporadic") +{ + auto workflow = defineDataProcessingWithSporadic(); + auto configContext = makeEmptyConfigContext(); + auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext); + auto completionPolicies = CompletionPolicy::createDefaultPolicies(); + auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies(); + REQUIRE(channelPolicies.empty() == false); + REQUIRE(completionPolicies.empty() == false); + std::vector devices; + + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; + REQUIRE(resources.size() == 1); + REQUIRE(resources[0].startPort == 22000); + SimpleResourceManager rm(resources); + auto offers = rm.getAvailableOffers(); + REQUIRE(offers.size() == 1); + REQUIRE(offers[0].startPort == 22000); + REQUIRE(offers[0].rangeSize == 5000); + + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext); + TopologyPolicyHelpers::buildEdges(workflow); +}