Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
113d551
remove obsolete code
aalkin Dec 9, 2025
f52b13d
std::any_of instead of a loop
aalkin Dec 9, 2025
cfe60b9
std::find_if instead of a loop
aalkin Dec 9, 2025
c2798e9
use range adaptors for spawnerInputs
aalkin Dec 9, 2025
3aec9e4
update addMissingOutputsToReader with ranges
aalkin Dec 9, 2025
e97ad87
update addMissingOutputsToSpawner with ranges
aalkin Dec 9, 2025
80deb08
update addMissingOutputsToBuilder with ranges
aalkin Dec 9, 2025
95ad8da
update addMissingOutputsToAnalysisCCDBFetcher with ranges
aalkin Dec 9, 2025
a098c0c
remove unused includes
aalkin Dec 9, 2025
6c1c30f
remove unused includes
aalkin Dec 9, 2025
c875b91
use ranges to populate requestedIDXs in adjustTopology
aalkin Dec 9, 2025
adf24a1
use ranges to populate requestedDYNs and spawnerInputs in adjustTopology
aalkin Dec 9, 2025
072ccf1
use ranges to populate requestedAODs in adjustTopology
aalkin Dec 9, 2025
b9f6aac
ensure constness
aalkin Dec 9, 2025
da95b4c
Please consider the following formatting changes
alibuild Dec 10, 2025
cbc4b65
Merge pull request #117 from alibuild/alibot-cleanup-14907
aalkin Dec 10, 2025
e881667
extract filters/transformations into helpers; merge identical functions
aalkin Dec 10, 2025
646c8e2
Please consider the following formatting changes
alibuild Dec 10, 2025
7d8e3a6
Merge pull request #118 from alibuild/alibot-cleanup-14907
aalkin Dec 10, 2025
a68da09
fix
aalkin Dec 10, 2025
2c49488
potential fix for ccdb-fetcher issue
aalkin Dec 12, 2025
9394914
fixup! potential fix for ccdb-fetcher issue
aalkin Dec 12, 2025
ac02539
fixup! potential fix for ccdb-fetcher issue
aalkin Dec 13, 2025
a484194
fix
aalkin Dec 13, 2025
8879b6f
Please consider the following formatting changes
alibuild Dec 13, 2025
20415ab
Merge pull request #120 from alibuild/alibot-cleanup-14907
aalkin Dec 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions Framework/Core/include/Framework/AnalysisSupportHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ struct AnalysisSupportHelpers {
std::vector<InputSpec> const& requestedSpecials,
std::vector<InputSpec>& requestedAODs,
DataProcessorSpec& publisher);
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector<OutputSpec> const& providedSpecials,
std::vector<InputSpec> const& requestedSpecials,
std::vector<InputSpec>& requestedAODs,
std::vector<InputSpec>& requestedDYNs,
DataProcessorSpec& publisher);
static void addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
std::vector<InputSpec>& requestedAODs,
std::vector<InputSpec>& requestedDYNs,
Expand Down
43 changes: 42 additions & 1 deletion Framework/Core/include/Framework/DataSpecViews.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ static auto filter_not_matching(auto const& provided)
return std::views::filter([&provided](auto const& input) { return std::none_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); });
}

static auto filter_matching(auto const& provided)
{
return std::views::filter([&provided](auto const& input) { return std::any_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); });
}

static auto filter_string_params_with(std::string match)
{
return std::views::filter([match](auto const& param) {
return (param.type == VariantType::String) && (param.name.find(match) != std::string::npos);
});
}

static auto input_to_output_specs()
{
return std::views::transform([](auto const& input) {
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
return OutputSpec{concrete.origin, concrete.description, concrete.subSpec, input.lifetime, input.metadata};
});
}

static auto params_to_input_specs()
{
return std::views::transform([](auto const& param) {
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
});
}
} // namespace o2::framework::views
//
namespace o2::framework::sinks
Expand All @@ -54,14 +80,29 @@ struct update_input_list {
template <std::ranges::input_range R>
friend Container& operator|(R&& r, update_input_list self)
{
for (auto& item : r) {
for (auto const& item : r) {
auto copy = item;
DataSpecUtils::updateInputList(self.c, std::move(copy));
}
return self.c;
}
};

template <class Container>
struct update_output_list {
Container& c;
// ends the pipeline, returns the container
template <std::ranges::input_range R>
friend Container& operator|(R&& r, update_output_list self)
{
for (auto const& item : r) {
auto copy = item;
DataSpecUtils::updateOutputList(self.c, std::move(copy));
}
return self.c;
}
};

} // namespace o2::framework::sinks

#endif // O2_FRAMEWORK_DATASPECVIEWS_H_
127 changes: 37 additions & 90 deletions Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@

#include "Framework/AnalysisSupportHelpers.h"
#include "Framework/DataOutputDirector.h"
#include "Framework/OutputObjHeader.h"
#include "Framework/ControlService.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DataSpecViews.h"
#include "Framework/PluginManager.h"
#include "Framework/ConfigContext.h"
#include "WorkflowHelpers.h"
Expand Down Expand Up @@ -129,109 +126,59 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector<OutputSpec> c
std::vector<InputSpec> const& requestedInputs,
DataProcessorSpec& publisher)
{
auto matchingOutputFor = [](InputSpec const& requested) {
return [&requested](OutputSpec const& provided) {
return DataSpecUtils::match(requested, provided);
};
};
for (InputSpec const& requested : requestedInputs) {
auto provided = std::find_if(providedOutputs.begin(),
providedOutputs.end(),
matchingOutputFor(requested));

if (provided != providedOutputs.end()) {
continue;
}

auto inList = std::find_if(publisher.outputs.begin(),
publisher.outputs.end(),
matchingOutputFor(requested));
if (inList != publisher.outputs.end()) {
continue;
}

auto concrete = DataSpecUtils::asConcreteDataMatcher(requested);
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, requested.lifetime, requested.metadata);
}
requestedInputs |
views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided
views::filter_not_matching(publisher.outputs) | // filter the inputs that are already covered
views::input_to_output_specs() |
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
}

void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec> const& providedSpecials,
std::vector<InputSpec> const& requestedSpecials,
std::vector<InputSpec>& requestedAODs,
DataProcessorSpec& publisher)
{
for (auto& input : requestedSpecials) {
if (std::any_of(providedSpecials.begin(), providedSpecials.end(), [&input](auto const& x) {
return DataSpecUtils::match(input, x);
})) {
continue;
}
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
for (auto& i : input.metadata) {
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
auto j = std::find(publisher.inputs.begin(), publisher.inputs.end(), spec);
if (j == publisher.inputs.end()) {
publisher.inputs.push_back(spec);
}
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
}
}
requestedSpecials |
views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided
views::input_to_output_specs() |
sinks::append_to{publisher.outputs}; // append them to the publisher outputs

std::vector<InputSpec> additionalInputs;
for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
input.metadata |
views::filter_string_params_with("input:") |
views::params_to_input_specs() |
sinks::update_input_list{additionalInputs}; // store into a temporary
}
additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs
additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
}

void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
std::vector<InputSpec>& requestedAODs,
std::vector<InputSpec>& requestedDYNs,
DataProcessorSpec& publisher)
{
for (auto& input : requestedSpecials) {
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
for (auto& i : input.metadata) {
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
if (j == publisher.inputs.end()) {
publisher.inputs.push_back(spec);
}
if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
} else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
}
}
}
requestedSpecials |
views::input_to_output_specs() |
sinks::append_to{publisher.outputs}; // append them to the publisher outputs

std::vector<InputSpec> additionalInputs;
for (auto const& input : requestedSpecials) {
input.metadata |
views::filter_string_params_with("input:") |
views::params_to_input_specs() |
sinks::update_input_list{additionalInputs}; // store into a temporary
}
}

void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
std::vector<OutputSpec> const& providedSpecials,
std::vector<InputSpec> const& requestedSpecials,
std::vector<InputSpec>& requestedAODs,
std::vector<InputSpec>& requestedDYNs,
DataProcessorSpec& publisher)
{
for (auto& input : requestedSpecials) {
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
// FIXME: good enough for now...
for (auto& i : input.metadata) {
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
if (j == publisher.inputs.end()) {
publisher.inputs.push_back(spec);
}
if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
} else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
}
}
}
}
additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
// FIXME: until we have a single list of pairs
additionalInputs |
views::partial_match_filter(AODOrigins) |
sinks::update_input_list{requestedAODs}; // update requestedAODs
additionalInputs |
views::partial_match_filter(header::DataOrigin{"DYN"}) |
sinks::update_input_list{requestedDYNs}; // update requestedDYNs
}

// =============================================================================
Expand Down
60 changes: 19 additions & 41 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -595,23 +595,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
ac.providedTIMs.clear();
ac.requestedTIMs.clear();


auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };

if (builder != workflow.end()) {
// collect currently requested IDXs
ac.requestedIDXs.clear();
for (auto& d : workflow) {
if (d.name == builder->name) {
continue;
}
for (auto& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) {
auto copy = i;
DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
}
}
for (auto& d : workflow | views::exclude_by_name(builder->name)) {
d.inputs |
views::partial_match_filter(header::DataOrigin{"IDX"}) |
sinks::update_input_list{ac.requestedIDXs};
}
// recreate inputs and outputs
builder->inputs.clear();
Expand All @@ -624,37 +617,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()

if (spawner != workflow.end()) {
// collect currently requested DYNs
for (auto& d : workflow) {
if (d.name == spawner->name) {
continue;
}
for (auto const& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) {
auto copy = i;
DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
}
}
for (auto const& o : d.outputs) {
if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) {
ac.providedDYNs.emplace_back(o);
}
}
for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
d.inputs |
views::partial_match_filter(header::DataOrigin{"DYN"}) |
sinks::update_input_list{ac.requestedDYNs};
d.outputs |
views::partial_match_filter(header::DataOrigin{"DYN"}) |
sinks::append_to{ac.providedDYNs};
}
std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
ac.spawnerInputs.clear();
for (auto& input : ac.requestedDYNs) {
if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
ac.spawnerInputs.emplace_back(input);
}
}
ac.requestedDYNs |
views::filter_not_matching(ac.providedDYNs) |
sinks::append_to{ac.spawnerInputs};
// recreate inputs and outputs
spawner->outputs.clear();
spawner->inputs.clear();
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
// replace AlgorithmSpec
// FIXME: it should be made more generic, so it does not need replacement...
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
}

if (analysisCCDB != workflow.end()) {
Expand All @@ -675,7 +658,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// FIXME: it should be made more generic, so it does not need replacement...
// FIXME how can I make the lookup depend on DYN tables as well??
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
}

if (writer != workflow.end()) {
Expand All @@ -686,12 +669,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// If reader and/or builder were adjusted, remove unneeded outputs
// update currently requested AODs
for (auto& d : workflow) {
for (auto const& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, AODOrigins)) {
auto copy = i;
DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
}
}
d.inputs |
views::partial_match_filter(AODOrigins) |
sinks::update_input_list{ac.requestedAODs};
}

// remove unmatched outputs
Expand All @@ -705,8 +685,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
}
}



// replace writer as some outputs may have become dangling and some are now consumed
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);

Expand Down
Loading