From 1ab987aa94f076a1bc9df655f9d429205d802bab Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Mon, 3 Nov 2025 14:31:56 +0100 Subject: [PATCH 1/9] DPL Analysis: Expressions <-> JSON * add serialization/deserialization support for expressions * add test --- Framework/Core/CMakeLists.txt | 1 + .../include/Framework/ExpressionJSONHelpers.h | 23 + .../Core/include/Framework/Expressions.h | 14 +- Framework/Core/src/ExpressionJSONHelpers.cxx | 558 ++++++++++++++++++ Framework/Core/test/test_Expressions.cxx | 22 + 5 files changed, 617 insertions(+), 1 deletion(-) create mode 100644 Framework/Core/include/Framework/ExpressionJSONHelpers.h create mode 100644 Framework/Core/src/ExpressionJSONHelpers.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 11eb4bdc08a66..1aed1f776b775 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -142,6 +142,7 @@ o2_add_library(Framework src/Array2D.cxx src/Variant.cxx src/VariantJSONHelpers.cxx + src/ExpressionJSONHelpers.cxx src/VariantPropertyTreeHelpers.cxx src/WorkflowCustomizationHelpers.cxx src/WorkflowHelpers.cxx diff --git a/Framework/Core/include/Framework/ExpressionJSONHelpers.h b/Framework/Core/include/Framework/ExpressionJSONHelpers.h new file mode 100644 index 0000000000000..4cc8306462004 --- /dev/null +++ b/Framework/Core/include/Framework/ExpressionJSONHelpers.h @@ -0,0 +1,23 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef FRAMEWORK_EXPRESSIONJSONHELPERS_H +#define FRAMEWORK_EXPRESSIONJSONHELPERS_H + +#include "Framework/Expressions.h" + +namespace o2::framework { +struct ExpressionJSONHelpers { + static std::unique_ptr read(std::istream& s); + static void write(std::ostream& o, expressions::Node* n); +}; +} + +#endif // FRAMEWORK_EXPRESSIONJSONHELPERS_H diff --git a/Framework/Core/include/Framework/Expressions.h b/Framework/Core/include/Framework/Expressions.h index 5a889e9ae26ec..735bbb890afb4 100644 --- a/Framework/Core/include/Framework/Expressions.h +++ b/Framework/Core/include/Framework/Expressions.h @@ -110,6 +110,8 @@ std::string upcastTo(atype::type f); /// An expression tree node corresponding to a literal value struct LiteralNode { + using var_t = LiteralValue::stored_type; + LiteralNode() : value{-1}, type{atype::INT32} @@ -120,7 +122,12 @@ struct LiteralNode { { } - using var_t = LiteralValue::stored_type; + LiteralNode(var_t v, atype::type t) + : value{v}, + type{t} + { + } + var_t value; atype::type type = atype::NA; }; @@ -617,6 +624,11 @@ inline Node ncfg(T defaultValue, std::string path) struct Filter { Filter() = default; + Filter(std::unique_ptr&& ptr) + { + node = std::move(ptr); + } + Filter(Node&& node_) : node{std::make_unique(std::forward(node_))} { (void)designateSubtrees(node.get()); diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx new file mode 100644 index 0000000000000..3953d2f7614db --- /dev/null +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -0,0 +1,558 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/ExpressionJSONHelpers.h" + +#include +#include +#include +#include +#include + +#include +#include +#include "Framework/VariantHelpers.h" + +namespace o2::framework { + +using nodes = expressions::Node::self_t; +enum struct Nodes : int { + NLITERAL = 0, + NBINDING = 1, + NOP = 2, + NNPH = 3, + NCOND = 4, + NPAR = 5 +}; + +enum struct ToWrite { + FULL, + LEFT, + RIGHT, + COND, + POP +}; + +struct Entry { + expressions::Node* ptr = nullptr; + ToWrite toWrite = ToWrite::FULL; +}; + +std::array validKeys{ + "kind", + "binding", + "index", + "arrow_type", + "value", + "hash", + "operation", + "left", + "right", + "condition"}; + +namespace +{ +struct ExpressionReader : public rapidjson::BaseReaderHandler, ExpressionReader> { + using Ch = rapidjson::UTF8<>::Ch; + using SizeType = rapidjson::SizeType; + + enum struct State { + IN_START, + IN_STOP, + IN_NODE_LITERAL, + IN_NODE_BINDING, + IN_NODE_OP, + IN_NODE_CONDITIONAL, + IN_ROOT, + IN_LEFT, + IN_RIGHT, + IN_COND, + IN_ERROR + }; + + std::stack states; + std::stack path; + std::ostringstream debug; + + std::unique_ptr rootNode = nullptr; + std::unique_ptr node = nullptr; + expressions::LiteralValue::stored_type value; + atype::type type; + Nodes kind; + std::string binding; + BasicOp operation; + uint32_t hash; + size_t index; + + std::string previousKey; + std::string currentKey; + + ExpressionReader() + { + debug << ">>> Start" << std::endl; + states.push(State::IN_START); + } + + bool Key(const Ch* str, SizeType, bool) + { + debug << "Key(" << str << ")" << std::endl; + previousKey = currentKey; + currentKey = str; + if (std::find(validKeys.begin(), validKeys.end(), currentKey) == validKeys.end()) { + states.push(State::IN_ERROR); + return false; + } + + if (states.top() == State::IN_START) { + if (currentKey.compare("kind") == 0) { + states.push(State::IN_ROOT); + return true; + } else { + states.push(State::IN_ERROR); // should start from root node + return false; + } + } + + if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { + if (currentKey.compare("kind") == 0) { + return true; + } + } + + if (states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_OP || states.top() == State::IN_NODE_BINDING || states.top() == State::IN_NODE_CONDITIONAL) { + if (currentKey.compare("index") == 0) { + return true; + } + if (currentKey.compare("left") == 0) { + // this is the point where the node header is parsed and we can create it + // create a new node instance here and set a pointer to it in a parent (current stack top), based on its state + // push the new node into the stack with LEFT state + switch (states.top()) { + case State::IN_NODE_LITERAL: + node = std::make_unique(expressions::LiteralNode{value, type}); + break; + case State::IN_NODE_BINDING: + node = std::make_unique(expressions::BindingNode{hash, type}, binding); + break; + case State::IN_NODE_OP: + node = std::make_unique(expressions::OpNode{operation}, expressions::LiteralNode{-1}); + break; + case State::IN_NODE_CONDITIONAL: + node = std::make_unique(expressions::ConditionalNode{}, expressions::LiteralNode{-1}, expressions::LiteralNode{-1}, expressions::LiteralNode{true}); + break; + default: + states.push(State::IN_ERROR); + return false; + } + + if (path.empty()) { + rootNode = std::move(node); + path.emplace(rootNode.get(), ToWrite::LEFT); + } else { + auto* n = path.top().ptr; + switch (path.top().toWrite) { + case ToWrite::LEFT: + n->left = std::move(node); + path.top().toWrite = ToWrite::RIGHT; + path.emplace(n->left.get(), ToWrite::LEFT); + break; + case ToWrite::RIGHT: + n->right = std::move(node); + path.top().toWrite = ToWrite::COND; + path.emplace(n->right.get(), ToWrite::LEFT); + break; + case ToWrite::COND: + n->condition = std::move(node); + path.pop(); + path.emplace(n->condition.get(), ToWrite::LEFT); + break; + default: + states.push(State::IN_ERROR); + return false; + } + } + + states.push(State::IN_LEFT); + return true; + } + if (currentKey.compare("right") == 0) { + if (states.top() == State::IN_LEFT) { + states.pop(); + } + // move the stack state of the node to RIGHT state + path.top().toWrite = ToWrite::RIGHT; + states.push(State::IN_RIGHT); + return true; + } + if (currentKey.compare("condition") == 0) { + if (states.top() == State::IN_RIGHT) { + states.pop(); + } + // move the stack state of the node to COND state + path.top().toWrite = ToWrite::COND; + states.push(State::IN_COND); + return true; + } + } + + if (states.top() == State::IN_NODE_LITERAL) { + if (currentKey.compare("arrow_type") == 0 || currentKey.compare("value") == 0) { + return true; + } + } + + if (states.top() == State::IN_NODE_BINDING) { + if (currentKey.compare("binding") == 0 || currentKey.compare("hash") == 0 || currentKey.compare("arrow_type") == 0) { + return true; + } + } + + if (states.top() == State::IN_NODE_OP) { + if (currentKey.compare("operation") == 0) { + return true; + } + } + + debug << ">>> Unrecognized" << std::endl; + states.push(State::IN_ERROR); + return false; + } + + bool StartObject() + { + debug << "StartObject()" << std::endl; + if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { // ready to start a new node + return true; + } + if (states.top() == State::IN_START) { + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool EndObject(SizeType) + { + debug << "EndObject()" << std::endl; + if (states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_OP || states.top() == State::IN_NODE_BINDING || states.top() == State::IN_NODE_CONDITIONAL) { // finalize node + // finalize the current node and pop it from the stack (the pointers should be already set + states.pop(); + if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { + states.pop(); + } + return true; + } + if (states.top() == State::IN_ROOT) { + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool Null() + { + debug << "Null()" << std::endl; + if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { + // empty node, nothing to do + // move the path state to the next + if (path.top().toWrite == ToWrite::LEFT) { + path.top().toWrite = ToWrite::RIGHT; + } else if (path.top().toWrite == ToWrite::RIGHT) { + path.top().toWrite = ToWrite::COND; + } else if (path.top().toWrite == ToWrite::COND) { + path.pop(); + } + + states.pop(); + return true; + } + states.push(State::IN_ERROR); // no other contexts allow null + return false; + } + + bool Bool(bool b) + { + debug << "Bool(" << b << ")" << std::endl; + if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { + value = b; + return true; + } + states.push(State::IN_ERROR); // no other contexts allow booleans + return false; + } + + bool Int(int i) + { + debug << "Int(" << i << ")" << std::endl; + if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { // literal + switch (type) { + case atype::INT8: + value = (int8_t)i; + break; + case atype::INT16: + value = (int16_t)i; + break; + case atype::INT32: + value = i; + break; + case atype::UINT8: + value = (uint8_t)i; + break; + case atype::UINT16: + value = (uint16_t)i; + break; + case atype::UINT32: + value = i; + break; + default: + states.push(State::IN_ERROR); + return false; + } + return true; + } + if (states.top() == State::IN_ROOT || states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { + if (currentKey.compare("kind") == 0) { + kind = (Nodes)i; + switch (kind) { + case Nodes::NLITERAL: + case Nodes::NNPH: + case Nodes::NPAR: { + states.push(State::IN_NODE_LITERAL); + debug << ">>> Literal node" << std::endl; + return true; + } + case Nodes::NBINDING: { + states.push(State::IN_NODE_BINDING); + debug << ">>> Binding node" << std::endl; + return true; + } + case Nodes::NOP: { + states.push(State::IN_NODE_OP); + debug << ">>> Operation node" << std::endl; + return true; + } + case Nodes::NCOND: { + states.push(State::IN_NODE_CONDITIONAL); + debug << ">>> Conditional node" << std::endl; + return true; + } + } + } + } + if (states.top() == State::IN_NODE_BINDING || states.top() == State::IN_NODE_CONDITIONAL || states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_OP) { + if (currentKey.compare("index") == 0) { + index = (size_t)i; + return true; + } + } + if (states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_BINDING) { + if (currentKey.compare("arrow_type") == 0) { + type = (atype::type)i; + return true; + } + } + if (states.top() == State::IN_NODE_OP && currentKey.compare("operation") == 0) { + operation = (BasicOp)i; + return true; + } + states.push(State::IN_ERROR); // no other contexts allow ints + return false; + } + + bool Uint(unsigned i) + { + debug << "Uint(" << i << ")" << std::endl; + if (states.top() == State::IN_NODE_BINDING && currentKey.compare("hash") == 0) { + hash = i; + return true; + } + debug << ">> falling back to Int" << std::endl; + return Int(i); + } + + bool Int64(int64_t i) + { + debug << "Int64(" << i << ")" << std::endl; + if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { + value = i; + return true; + } + states.push(State::IN_ERROR); // no other contexts allow int64s + return false; + } + + bool Uint64(uint64_t i) + { + debug << "Uint64(" << i << ")" << std::endl; + if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { + value = i; + return true; + } + states.push(State::IN_ERROR); // no other contexts allow uints + return false; + } + + bool Double(double d) + { + debug << "Double(" << d << ")" << std::endl; + if (states.top() == State::IN_NODE_LITERAL) { + switch (type) { + case atype::FLOAT: + value = (float)d; + break; + case atype::DOUBLE: + value = d; + break; + default: + states.push(State::IN_ERROR); + return false; + } + return true; + } + states.push(State::IN_ERROR); // no other contexts allow doubles + return false; + } + + bool String(const Ch* str, SizeType, bool) + { + debug << "String(" << str << ")" << std::endl; + if (states.top() == State::IN_NODE_BINDING && currentKey.compare("binding") == 0) { + binding = str; + return true; + } + states.push(State::IN_ERROR); // no strings are expected + return false; + } +}; +} // namespace + +std::unique_ptr o2::framework::ExpressionJSONHelpers::read(std::istream& s) +{ + rapidjson::Reader reader; + rapidjson::IStreamWrapper isw(s); + ExpressionReader ereader; + bool ok = reader.Parse(isw, ereader); + + if (!ok) { + std::stringstream error; + error << "Cannot parse serialized Expression, error: " << rapidjson::GetParseError_En(reader.GetParseErrorCode()) << " at offset: " << reader.GetErrorOffset(); + throw std::runtime_error(error.str()); + } + return std::move(ereader.rootNode); +} + +void writeNodeHeader(rapidjson::Writer& w, expressions::Node const* node) +{ + w.Key("kind"); + w.Int((int)node->self.index()); + w.Key("index"); + w.Uint64(node->index); + std::visit(overloaded{ + [&w](expressions::LiteralNode const& node) { + w.Key("arrow_type"); + w.Int(node.type); + w.Key("value"); + std::visit(overloaded{ + [&w](bool v) { w.Bool(v); }, + [&w](float v) { w.Double(v); }, + [&w](double v) { w.Double(v); }, + [&w](uint8_t v) { w.Uint(v); }, + [&w](uint16_t v) { w.Uint(v); }, + [&w](uint32_t v) { w.Uint(v); }, + [&w](uint64_t v) { w.Uint64(v); }, + [&w](int8_t v) { w.Int(v); }, + [&w](int16_t v) { w.Int(v); }, + [&w](int v) { w.Int(v); }, + [&w](int64_t v) { w.Int64(v); }}, + node.value); + }, + [&w](expressions::BindingNode const& node) { + w.Key("binding"); + w.String(node.name); + w.Key("hash"); + w.Uint(node.hash); + w.Key("arrow_type"); + w.Int(node.type); + }, + [&w](expressions::OpNode const& node) { + w.Key("operation"); + w.Int(node.op); + }, + [](expressions::ConditionalNode const&) { + }}, + node->self); +} + +void writeExpression(std::ostream& o, expressions::Node* n) +{ + rapidjson::OStreamWrapper osw(o); + rapidjson::Writer w(osw); + + std::stack path; + path.emplace(n, ToWrite::FULL); + while (!path.empty()) { + auto& top = path.top(); + + if (top.toWrite == ToWrite::FULL) { + w.StartObject(); + writeNodeHeader(w, top.ptr); + top.toWrite = ToWrite::LEFT; + continue; + } + + if (top.toWrite == ToWrite::LEFT) { + w.Key("left"); + top.toWrite = ToWrite::RIGHT; + auto* left = top.ptr->left.get(); + if (left != nullptr) { + path.emplace(left, ToWrite::FULL); + } else { + w.Null(); + } + continue; + } + + if (top.toWrite == ToWrite::RIGHT) { + w.Key("right"); + top.toWrite = ToWrite::COND; + auto* right = top.ptr->right.get(); + if (right != nullptr) { + path.emplace(right, ToWrite::FULL); + } else { + w.Null(); + } + continue; + } + + if (top.toWrite == ToWrite::COND) { + w.Key("condition"); + top.toWrite = ToWrite::POP; + auto* cond = top.ptr->condition.get(); + if (cond != nullptr) { + path.emplace(cond, ToWrite::FULL); + } else { + w.Null(); + } + continue; + } + + if (top.toWrite == ToWrite::POP) { + w.EndObject(); + path.pop(); + continue; + } + } +} +} + +void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, expressions::Node* n) +{ + writeExpression(o, n); +} diff --git a/Framework/Core/test/test_Expressions.cxx b/Framework/Core/test/test_Expressions.cxx index 4c6fc51795ca8..5f801d84dd8c2 100644 --- a/Framework/Core/test/test_Expressions.cxx +++ b/Framework/Core/test/test_Expressions.cxx @@ -12,6 +12,7 @@ #include "Framework/Configurable.h" #include "Framework/ExpressionHelpers.h" #include "Framework/AnalysisDataModel.h" +#include "Framework/ExpressionJSONHelpers.h" #include #include #include @@ -391,3 +392,24 @@ TEST_CASE("TestStringExpressionsParsing") REQUIRE(tree1c->ToString() == tree2c->ToString()); } + +TEST_CASE("TestExpressionSerialization") +{ + Filter f = o2::aod::track::signed1Pt > 0.f && ifnode(nabs(o2::aod::track::eta) < 1.0f, nabs(o2::aod::track::x) > 2.0f, nabs(o2::aod::track::y) > 3.0f); + auto ops = createOperations(f); + auto schema = std::make_shared(std::vector{o2::aod::track::Eta::asArrowField(), o2::aod::track::Signed1Pt::asArrowField(), o2::aod::track::X::asArrowField(), o2::aod::track::Y::asArrowField()}); + auto tree = createExpressionTree(ops, schema); + + std::stringstream osm; + ExpressionJSONHelpers::write(osm, f.node.get()); + + std::stringstream ism; + ism.str(osm.str()); + Filter fr = ExpressionJSONHelpers::read(ism); + + auto s1 = createOperations(f); + auto s2 = createOperations(fr); + auto t1 = createExpressionTree(s1, schema); + auto t2 = createExpressionTree(s2, schema); + REQUIRE(t1->ToString() == t2->ToString()); +} From 434359672ca7957f4efa3a9346ae673daaadd832 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Wed, 5 Nov 2025 09:57:09 +0000 Subject: [PATCH 2/9] Please consider the following formatting changes --- Framework/Core/include/Framework/ExpressionJSONHelpers.h | 5 +++-- Framework/Core/src/ExpressionJSONHelpers.cxx | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Framework/Core/include/Framework/ExpressionJSONHelpers.h b/Framework/Core/include/Framework/ExpressionJSONHelpers.h index 4cc8306462004..6b865399ecffb 100644 --- a/Framework/Core/include/Framework/ExpressionJSONHelpers.h +++ b/Framework/Core/include/Framework/ExpressionJSONHelpers.h @@ -13,11 +13,12 @@ #include "Framework/Expressions.h" -namespace o2::framework { +namespace o2::framework +{ struct ExpressionJSONHelpers { static std::unique_ptr read(std::istream& s); static void write(std::ostream& o, expressions::Node* n); }; -} +} // namespace o2::framework #endif // FRAMEWORK_EXPRESSIONJSONHELPERS_H diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx index 3953d2f7614db..07713fcf395eb 100644 --- a/Framework/Core/src/ExpressionJSONHelpers.cxx +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -20,7 +20,8 @@ #include #include "Framework/VariantHelpers.h" -namespace o2::framework { +namespace o2::framework +{ using nodes = expressions::Node::self_t; enum struct Nodes : int { @@ -550,7 +551,7 @@ void writeExpression(std::ostream& o, expressions::Node* n) } } } -} +} // namespace o2::framework void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, expressions::Node* n) { From ff12e026098a7a83628da2ae36b319e19a0fecb6 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Thu, 6 Nov 2025 09:58:04 +0100 Subject: [PATCH 3/9] use bulk conversion --- .../include/Framework/ExpressionJSONHelpers.h | 5 +- Framework/Core/src/AODReaderHelpers.cxx | 22 --- Framework/Core/src/ExpressionJSONHelpers.cxx | 139 ++++++++++++++---- Framework/Core/test/test_Expressions.cxx | 28 ++-- 4 files changed, 132 insertions(+), 62 deletions(-) diff --git a/Framework/Core/include/Framework/ExpressionJSONHelpers.h b/Framework/Core/include/Framework/ExpressionJSONHelpers.h index 6b865399ecffb..6abb0047ec524 100644 --- a/Framework/Core/include/Framework/ExpressionJSONHelpers.h +++ b/Framework/Core/include/Framework/ExpressionJSONHelpers.h @@ -16,8 +16,9 @@ namespace o2::framework { struct ExpressionJSONHelpers { - static std::unique_ptr read(std::istream& s); - static void write(std::ostream& o, expressions::Node* n); + // static std::unique_ptr read(std::istream& s); + static std::vector read(std::istream& s); + static void write(std::ostream& o, std::vector& projectors); }; } // namespace o2::framework diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx index 2587b8e4ca03a..553c5b5eb8ba3 100644 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ b/Framework/Core/src/AODReaderHelpers.cxx @@ -44,28 +44,6 @@ auto setEOSCallback(InitContext& ic) }); } -template -static inline auto doExtractOriginal(framework::pack, ProcessingContext& pc) -{ - if constexpr (sizeof...(Ts) == 1) { - return pc.inputs().get(aod::MetadataTrait>>::metadata::tableLabel())->asArrowTable(); - } else { - return std::vector{pc.inputs().get(aod::MetadataTrait::metadata::tableLabel())->asArrowTable()...}; - } -} - -template -static inline auto extractOriginalsTuple(framework::pack, ProcessingContext& pc) -{ - return std::make_tuple(extractTypedOriginal(pc)...); -} - -template -static inline auto extractOriginalsVector(framework::pack, ProcessingContext& pc) -{ - return std::vector{extractOriginal(pc)...}; -} - template refs> static inline auto extractOriginals(ProcessingContext& pc) { diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx index 07713fcf395eb..66e62cebf1989 100644 --- a/Framework/Core/src/ExpressionJSONHelpers.cxx +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -22,7 +22,8 @@ namespace o2::framework { - +namespace +{ using nodes = expressions::Node::self_t; enum struct Nodes : int { NLITERAL = 0, @@ -46,7 +47,8 @@ struct Entry { ToWrite toWrite = ToWrite::FULL; }; -std::array validKeys{ +std::array validKeys{ + "projectors", "kind", "binding", "index", @@ -58,30 +60,30 @@ std::array validKeys{ "right", "condition"}; -namespace -{ struct ExpressionReader : public rapidjson::BaseReaderHandler, ExpressionReader> { using Ch = rapidjson::UTF8<>::Ch; using SizeType = rapidjson::SizeType; enum struct State { - IN_START, - IN_STOP, - IN_NODE_LITERAL, - IN_NODE_BINDING, - IN_NODE_OP, - IN_NODE_CONDITIONAL, - IN_ROOT, - IN_LEFT, - IN_RIGHT, - IN_COND, - IN_ERROR + IN_START, // global start + IN_LIST, // opening brace of the list + IN_ROOT, // after encountering the opening of the expression object + IN_LEFT, // in "left" key - subexpression + IN_RIGHT, // in "right" key - subexpression + IN_COND, // in "condition" key - subexpression + IN_NODE_LITERAL, // in literal node + IN_NODE_BINDING, // in binding node + IN_NODE_OP, // in operation node + IN_NODE_CONDITIONAL, // in conditional node + IN_ERROR // generic error state }; std::stack states; std::stack path; std::ostringstream debug; + std::vector result; + std::unique_ptr rootNode = nullptr; std::unique_ptr node = nullptr; expressions::LiteralValue::stored_type value; @@ -101,6 +103,28 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, states.push(State::IN_START); } + bool StartArray() + { + debug << "Starting array" << std::endl; + if (states.top() == State::IN_START) { + states.push(State::IN_LIST); + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool EndArray(SizeType) + { + debug << "Ending array" << std::endl; + if (states.top() == State::IN_LIST) { + states.pop(); + return true; + } + states.push(State::IN_ERROR); + return false; + } + bool Key(const Ch* str, SizeType, bool) { debug << "Key(" << str << ")" << std::endl; @@ -112,8 +136,13 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, } if (states.top() == State::IN_START) { + if (currentKey.compare("projectors") == 0) { + return true; + } + } + + if (states.top() == State::IN_ROOT) { if (currentKey.compare("kind") == 0) { - states.push(State::IN_ROOT); return true; } else { states.push(State::IN_ERROR); // should start from root node @@ -228,38 +257,62 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool StartObject() { + // opening brace encountered debug << "StartObject()" << std::endl; - if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { // ready to start a new node + // the first opening brace in the input + if (states.top() == State::IN_START) { return true; } - if (states.top() == State::IN_START) { + // the opening of an expression + if (states.top() == State::IN_LIST) { + states.push(State::IN_ROOT); return true; } + // if we are looking at subexpression + if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { // ready to start a new node + return true; + } + // no other object starts are expected states.push(State::IN_ERROR); return false; } bool EndObject(SizeType) { + // closing brace encountered debug << "EndObject()" << std::endl; + // we are closing up an expression if (states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_OP || states.top() == State::IN_NODE_BINDING || states.top() == State::IN_NODE_CONDITIONAL) { // finalize node // finalize the current node and pop it from the stack (the pointers should be already set states.pop(); + // subexpression if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { states.pop(); + return true; + } + + // expression + if (states.top() == State::IN_ROOT) { + result.emplace_back(std::move(rootNode)); + states.pop(); + return true; } - return true; } - if (states.top() == State::IN_ROOT) { + + // we are closing the list + if (states.top() == State::IN_START) { return true; } + // no other object ends are expectedd states.push(State::IN_ERROR); return false; } bool Null() { + // null value debug << "Null()" << std::endl; + // the subexpression can be empty if (states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { // empty node, nothing to do // move the path state to the next @@ -281,6 +334,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Bool(bool b) { debug << "Bool(" << b << ")" << std::endl; + // can be a value in a literal node if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { value = b; return true; @@ -292,6 +346,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Int(int i) { debug << "Int(" << i << ")" << std::endl; + // can be a value in a literal node if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { // literal switch (type) { case atype::INT8: @@ -312,12 +367,19 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, case atype::UINT32: value = i; break; + case atype::UINT64: + value = (uint64_t)i; + break; + case atype::INT64: + value = (int64_t)i; + break; default: states.push(State::IN_ERROR); return false; } return true; } + // can be a node kind designator if (states.top() == State::IN_ROOT || states.top() == State::IN_LEFT || states.top() == State::IN_RIGHT || states.top() == State::IN_COND) { if (currentKey.compare("kind") == 0) { kind = (Nodes)i; @@ -347,18 +409,21 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, } } } + // can be node index if (states.top() == State::IN_NODE_BINDING || states.top() == State::IN_NODE_CONDITIONAL || states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_OP) { if (currentKey.compare("index") == 0) { index = (size_t)i; return true; } } + // can be a node type designator if (states.top() == State::IN_NODE_LITERAL || states.top() == State::IN_NODE_BINDING) { if (currentKey.compare("arrow_type") == 0) { type = (atype::type)i; return true; } } + // can be a node operation designato if (states.top() == State::IN_NODE_OP && currentKey.compare("operation") == 0) { operation = (BasicOp)i; return true; @@ -370,10 +435,12 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Uint(unsigned i) { debug << "Uint(" << i << ")" << std::endl; + // can be node hash if (states.top() == State::IN_NODE_BINDING && currentKey.compare("hash") == 0) { hash = i; return true; } + // any positive value will be first read as unsigned, however the actual type is determined by node's arrow_type debug << ">> falling back to Int" << std::endl; return Int(i); } @@ -381,6 +448,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Int64(int64_t i) { debug << "Int64(" << i << ")" << std::endl; + // can only be a literal node value if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { value = i; return true; @@ -392,6 +460,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Uint64(uint64_t i) { debug << "Uint64(" << i << ")" << std::endl; + // can only be a literal node value if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { value = i; return true; @@ -403,6 +472,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Double(double d) { debug << "Double(" << d << ")" << std::endl; + // can only be a literal node value if (states.top() == State::IN_NODE_LITERAL) { switch (type) { case atype::FLOAT: @@ -424,6 +494,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool String(const Ch* str, SizeType, bool) { debug << "String(" << str << ")" << std::endl; + // can only be a binding node if (states.top() == State::IN_NODE_BINDING && currentKey.compare("binding") == 0) { binding = str; return true; @@ -434,7 +505,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, }; } // namespace -std::unique_ptr o2::framework::ExpressionJSONHelpers::read(std::istream& s) +std::vector o2::framework::ExpressionJSONHelpers::read(std::istream& s) { rapidjson::Reader reader; rapidjson::IStreamWrapper isw(s); @@ -446,9 +517,11 @@ std::unique_ptr o2::framework::ExpressionJSONHelpers::read(st error << "Cannot parse serialized Expression, error: " << rapidjson::GetParseError_En(reader.GetParseErrorCode()) << " at offset: " << reader.GetErrorOffset(); throw std::runtime_error(error.str()); } - return std::move(ereader.rootNode); + return std::move(ereader.result); } +namespace +{ void writeNodeHeader(rapidjson::Writer& w, expressions::Node const* node) { w.Key("kind"); @@ -491,11 +564,8 @@ void writeNodeHeader(rapidjson::Writer& w, expression node->self); } -void writeExpression(std::ostream& o, expressions::Node* n) +void writeExpression(rapidjson::Writer& w, expressions::Node* n) { - rapidjson::OStreamWrapper osw(o); - rapidjson::Writer w(osw); - std::stack path; path.emplace(n, ToWrite::FULL); while (!path.empty()) { @@ -551,9 +621,20 @@ void writeExpression(std::ostream& o, expressions::Node* n) } } } -} // namespace o2::framework +} // namespace -void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, expressions::Node* n) +void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector& projectors) { - writeExpression(o, n); + rapidjson::OStreamWrapper osw(o); + rapidjson::Writer w(osw); + w.StartObject(); + w.Key("projectors"); + w.StartArray(); + for (auto& p : projectors) { + writeExpression(w, p.node.get()); + } + w.EndArray(); + w.EndObject(); } + +} // namespace o2::framework diff --git a/Framework/Core/test/test_Expressions.cxx b/Framework/Core/test/test_Expressions.cxx index 5f801d84dd8c2..00a529c108324 100644 --- a/Framework/Core/test/test_Expressions.cxx +++ b/Framework/Core/test/test_Expressions.cxx @@ -396,20 +396,30 @@ TEST_CASE("TestStringExpressionsParsing") TEST_CASE("TestExpressionSerialization") { Filter f = o2::aod::track::signed1Pt > 0.f && ifnode(nabs(o2::aod::track::eta) < 1.0f, nabs(o2::aod::track::x) > 2.0f, nabs(o2::aod::track::y) > 3.0f); - auto ops = createOperations(f); - auto schema = std::make_shared(std::vector{o2::aod::track::Eta::asArrowField(), o2::aod::track::Signed1Pt::asArrowField(), o2::aod::track::X::asArrowField(), o2::aod::track::Y::asArrowField()}); - auto tree = createExpressionTree(ops, schema); + Projector p = -1.f * nlog(ntan(o2::constants::math::PIQuarter - 0.5f * natan(o2::aod::fwdtrack::tgl))); + + std::vector projectors; + projectors.emplace_back(std::move(f)); + projectors.emplace_back(std::move(p)); std::stringstream osm; - ExpressionJSONHelpers::write(osm, f.node.get()); + ExpressionJSONHelpers::write(osm, projectors); std::stringstream ism; ism.str(osm.str()); - Filter fr = ExpressionJSONHelpers::read(ism); + auto ps = ExpressionJSONHelpers::read(ism); - auto s1 = createOperations(f); - auto s2 = createOperations(fr); - auto t1 = createExpressionTree(s1, schema); - auto t2 = createExpressionTree(s2, schema); + auto s1 = createOperations(projectors[0]); + auto s2 = createOperations(ps[0]); + auto schemaf = std::make_shared(std::vector{o2::aod::track::Eta::asArrowField(), o2::aod::track::Signed1Pt::asArrowField(), o2::aod::track::X::asArrowField(), o2::aod::track::Y::asArrowField()}); + auto t1 = createExpressionTree(s1, schemaf); + auto t2 = createExpressionTree(s2, schemaf); REQUIRE(t1->ToString() == t2->ToString()); + + auto s12 = createOperations(projectors[1]); + auto s22 = createOperations(ps[1]); + auto schemap = std::make_shared(std::vector{o2::aod::fwdtrack::Tgl::asArrowField()}); + auto t12 = createExpressionTree(s12, schemap); + auto t22 = createExpressionTree(s22, schemap); + REQUIRE(t12->ToString() == t22->ToString()); } From 6b7c6cdf530e743ffe9012cd222ab99e581910cb Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Thu, 6 Nov 2025 10:42:54 +0100 Subject: [PATCH 4/9] add expression metadata to input specs --- Framework/Core/include/Framework/ASoA.h | 3 +++ .../Core/include/Framework/AnalysisHelpers.h | 26 +++++++++++++++++++ Framework/Core/src/AnalysisHelpers.cxx | 8 ++++++ 3 files changed, 37 insertions(+) diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index b9b97bfa5ca9c..d1252946637c5 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1295,6 +1295,9 @@ concept with_ccdb_urls = requires { template concept with_base_table = not_void>::metadata::base_table_t>; +template +concept with_expression_pack = not_void>::metadata::expression_pack_t>; + template os1, size_t N2, std::array os2> consteval bool is_compatible() { diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 842263cd75abc..8741d99f7b02d 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -38,6 +38,8 @@ constexpr auto tableRef2ConfigParamSpec() {"\"\""}}; } +std::string serializeProjectors(std::vector& projectors); + namespace { template @@ -97,6 +99,26 @@ constexpr auto getCCDBMetadata() -> std::vector { return {}; } + +template +constexpr auto getExpressionMetadata() -> std::optional +{ + using expression_pack_t = T::expression_pack_t; + + auto projectors = [](framework::pack) -> std::vector { + return {C::Projector()...}; + }(expression_pack_t{}); + + auto json = serializeProjectors(projectors); + return framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}; +} + +template +constexpr auto getExpressionMetadata() -> std::optional +{ + return {}; +} + } // namespace template @@ -107,6 +129,10 @@ constexpr auto tableRef2InputSpec() metadata.insert(metadata.end(), m.begin(), m.end()); auto ccdbMetadata = getCCDBMetadata>::metadata>(); metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end()); + auto p = getExpressionMetadata>::metadata>(); + if (p) { + metadata.insert(metadata.end(), p.value()); + } return framework::InputSpec{ o2::aod::label(), diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx index c0f804b47f5af..d304711102251 100644 --- a/Framework/Core/src/AnalysisHelpers.cxx +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -9,6 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/ExpressionHelpers.h" +#include "Framework/ExpressionJSONHelpers.h" namespace o2::framework { @@ -26,4 +27,11 @@ void initializePartitionCaches(std::set const& hashes, std::shared_ptr gfilter = framework::expressions::createFilter(schema, framework::expressions::makeCondition(tree)); } } + +std::string serializeProjectors(std::vector& projectors) +{ + std::stringstream osm; + ExpressionJSONHelpers::write(osm, projectors); + return osm.str(); +} } // namespace o2::framework From 760d797fbb588c4dbe068954d2d1da4cfd41e8a4 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Mon, 10 Nov 2025 09:27:29 +0100 Subject: [PATCH 5/9] fixes --- Framework/Core/include/Framework/ASoA.h | 8 +++++-- .../Core/include/Framework/AnalysisHelpers.h | 23 +++++++++++-------- Framework/Core/src/AODReaderHelpers.cxx | 23 ++++++++++++++++++- Framework/Core/test/test_Expressions.cxx | 3 +++ 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index d1252946637c5..5d5408a638a9a 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1293,10 +1293,14 @@ concept with_ccdb_urls = requires { }; template -concept with_base_table = not_void>::metadata::base_table_t>; +concept with_base_table = requires { + typename aod::MetadataTrait>::metadata::base_table_t; +}; template -concept with_expression_pack = not_void>::metadata::expression_pack_t>; +concept with_expression_pack = requires { + typename T::expression_pack_t{}; +}; template os1, size_t N2, std::array os2> consteval bool is_compatible() diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 8741d99f7b02d..78e5c16f9d1bb 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -26,6 +26,10 @@ #include "Framework/Traits.h" #include +namespace o2::framework { +std::string serializeProjectors(std::vector& projectors); +} + namespace o2::soa { template @@ -38,8 +42,6 @@ constexpr auto tableRef2ConfigParamSpec() {"\"\""}}; } -std::string serializeProjectors(std::vector& projectors); - namespace { template @@ -101,20 +103,23 @@ constexpr auto getCCDBMetadata() -> std::vector } template -constexpr auto getExpressionMetadata() -> std::optional +constexpr auto getExpressionMetadata() -> std::vector { using expression_pack_t = T::expression_pack_t; auto projectors = [](framework::pack) -> std::vector { - return {C::Projector()...}; + std::vector result; + (result.emplace_back(std::move(C::Projector())), ...); + return result; }(expression_pack_t{}); - auto json = serializeProjectors(projectors); - return framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}; + auto json = framework::serializeProjectors(projectors); + return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}}; } template -constexpr auto getExpressionMetadata() -> std::optional + requires(!soa::with_expression_pack) +constexpr auto getExpressionMetadata() -> std::vector { return {}; } @@ -130,9 +135,7 @@ constexpr auto tableRef2InputSpec() auto ccdbMetadata = getCCDBMetadata>::metadata>(); metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end()); auto p = getExpressionMetadata>::metadata>(); - if (p) { - metadata.insert(metadata.end(), p.value()); - } + metadata.insert(metadata.end(), p.begin(), p.end()); return framework::InputSpec{ o2::aod::label(), diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx index 553c5b5eb8ba3..ad5984d65080b 100644 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ b/Framework/Core/src/AODReaderHelpers.cxx @@ -19,6 +19,7 @@ #include "Framework/CallbackService.h" #include "Framework/EndOfStreamContext.h" #include "Framework/DataSpecUtils.h" +#include "Framework/ExpressionJSONHelpers.h" #include @@ -134,12 +135,32 @@ auto make_spawn(InputSpec const& input, ProcessingContext& pc) (typename metadata_t::expression_pack_t{}); return o2::framework::spawner(extractOriginals(pc), input.binding.c_str(), projectors.data(), projector, schema); } + +struct Spawnable { + std::vector projectors; + std::vector labels; + std::shared_ptr schema; + + Spawnable(InputSpec const& spec) + { + auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& spc){ return spc.name.compare("projectors") == 0; }); + std::stringstream iws(loc->defaultValue.get()); + projectors = ExpressionJSONHelpers::read(iws); + for (auto& i : spec.metadata) { + if (i.name.starts_with("input:")) { + labels.emplace_back(i.name.substr(6)); + } + } + } +}; } // namespace AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector& requested) { return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) { - return [requested](ProcessingContext& pc) { + std::vector spawnables; + + return [requested, spawnables](ProcessingContext& pc) { auto outputs = pc.outputs(); // spawn tables for (auto& input : requested) { diff --git a/Framework/Core/test/test_Expressions.cxx b/Framework/Core/test/test_Expressions.cxx index 00a529c108324..de5bd19fc0d4c 100644 --- a/Framework/Core/test/test_Expressions.cxx +++ b/Framework/Core/test/test_Expressions.cxx @@ -422,4 +422,7 @@ TEST_CASE("TestExpressionSerialization") auto t12 = createExpressionTree(s12, schemap); auto t22 = createExpressionTree(s22, schemap); REQUIRE(t12->ToString() == t22->ToString()); + + std::cout << schemaf->ToString() << std::endl; + std::cout << schemap->ToString() << std::endl; } From e458133abb3bb1aa75ebfd018cd3b56959248e36 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 11 Nov 2025 12:32:52 +0100 Subject: [PATCH 6/9] update serialization --- .../Core/include/Framework/AnalysisHelpers.h | 6 +- .../Core/include/Framework/Expressions.h | 2 +- Framework/Core/src/AnalysisHelpers.cxx | 9 +- Framework/Core/src/ExpressionJSONHelpers.cxx | 196 +++++++++++++++++- .../Framework => src}/ExpressionJSONHelpers.h | 6 +- Framework/Core/test/test_Expressions.cxx | 21 +- 6 files changed, 227 insertions(+), 13 deletions(-) rename Framework/Core/{include/Framework => src}/ExpressionJSONHelpers.h (84%) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 78e5c16f9d1bb..6b9aa957f6d4f 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -28,6 +28,7 @@ #include namespace o2::framework { std::string serializeProjectors(std::vector& projectors); +std::string serializeSchema(std::shared_ptr& schema); } namespace o2::soa @@ -113,8 +114,11 @@ constexpr auto getExpressionMetadata() -> std::vector(o2::soa::createFieldsFromColumns(expression_pack_t{})); + auto json = framework::serializeProjectors(projectors); - return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}}; + return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}, + framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(schema), {"\"\""}}}; } template diff --git a/Framework/Core/include/Framework/Expressions.h b/Framework/Core/include/Framework/Expressions.h index 735bbb890afb4..e08bf8db52bb4 100644 --- a/Framework/Core/include/Framework/Expressions.h +++ b/Framework/Core/include/Framework/Expressions.h @@ -627,6 +627,7 @@ struct Filter { Filter(std::unique_ptr&& ptr) { node = std::move(ptr); + (void)designateSubtrees(node.get()); } Filter(Node&& node_) : node{std::make_unique(std::forward(node_))} @@ -636,7 +637,6 @@ struct Filter { Filter(Filter&& other) : node{std::forward>(other.node)} { - (void)designateSubtrees(node.get()); } Filter(std::string const& input_) : input{input_} {} diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx index d304711102251..4f78cc42f3f98 100644 --- a/Framework/Core/src/AnalysisHelpers.cxx +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -9,7 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/ExpressionHelpers.h" -#include "Framework/ExpressionJSONHelpers.h" +#include "ExpressionJSONHelpers.h" namespace o2::framework { @@ -34,4 +34,11 @@ std::string serializeProjectors(std::vector& ExpressionJSONHelpers::write(osm, projectors); return osm.str(); } + +std::string serializeSchema(std::shared_ptr& schema) +{ + std::stringstream osm; + ArrowJSONHelpers::write(osm, schema); + return osm.str(); +} } // namespace o2::framework diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx index 66e62cebf1989..9f98eaddc56ce 100644 --- a/Framework/Core/src/ExpressionJSONHelpers.cxx +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -8,7 +8,7 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#include "Framework/ExpressionJSONHelpers.h" +#include "ExpressionJSONHelpers.h" #include #include @@ -105,7 +105,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool StartArray() { - debug << "Starting array" << std::endl; + debug << "StartArray()" << std::endl; if (states.top() == State::IN_START) { states.push(State::IN_LIST); return true; @@ -116,7 +116,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool EndArray(SizeType) { - debug << "Ending array" << std::endl; + debug << "EndArray()" << std::endl; if (states.top() == State::IN_LIST) { states.pop(); return true; @@ -513,9 +513,7 @@ std::vector o2::framework::ExpressionJSONHelpers::read(s bool ok = reader.Parse(isw, ereader); if (!ok) { - std::stringstream error; - error << "Cannot parse serialized Expression, error: " << rapidjson::GetParseError_En(reader.GetParseErrorCode()) << " at offset: " << reader.GetErrorOffset(); - throw std::runtime_error(error.str()); + throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset()); } return std::move(ereader.result); } @@ -637,4 +635,190 @@ void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector, SchemaReader> { + using Ch = rapidjson::UTF8<>::Ch; + using SizeType = rapidjson::SizeType; + + enum struct State { + IN_START, + IN_LIST, + IN_FIELD, + IN_ERROR + }; + + std::stack states; + std::ostringstream debug; + + std::shared_ptr schema = nullptr; + std::vector> fields; + + std::string currentKey; + + std::string name; + atype::type type; + + SchemaReader() + { + debug << ">>> Start" << std::endl; + states.push(State::IN_START); + } + + bool StartArray() + { + debug << "Starting array" << std::endl; + if (states.top() == State::IN_START && currentKey.compare("fields") == 0) { + states.push(State::IN_LIST); + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool EndArray(SizeType) + { + debug << "Ending array" << std::endl; + if (states.top() == State::IN_LIST) { + //finalize schema + schema = std::make_shared(fields); + states.pop(); + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool Key(const Ch* str, SizeType, bool) + { + debug << "Key(" << str << ")" << std::endl; + currentKey = str; + if (states.top() == State::IN_START) { + if (currentKey.compare("fields") == 0) { + return true; + } + } + + if (states.top() == State::IN_FIELD) { + if (currentKey.compare("name") == 0) { + return true; + } + if (currentKey.compare("type") == 0) { + return true; + } + } + + states.push(State::IN_ERROR); + return false; + } + + bool StartObject() + { + debug << "StartObject()" << std::endl; + if (states.top() == State::IN_START) { + return true; + } + + if (states.top() == State::IN_LIST) { + states.push(State::IN_FIELD); + return true; + } + + states.push(State::IN_ERROR); + return false; + } + + bool EndObject(SizeType) + { + debug << "EndObject()" << std::endl; + if (states.top() == State::IN_FIELD) { + states.pop(); + // add a field + fields.emplace_back(std::make_shared(name, expressions::concreteArrowType(type))); + return true; + } + + if (states.top() == State::IN_START) { + return true; + } + + states.push(State::IN_ERROR); + return false; + } + + bool Uint(unsigned i) + { + debug << "Uint(" << i << ")" << std::endl; + if (states.top() == State::IN_FIELD) { + if (currentKey.compare("type") == 0) { + type = (atype::type)i; + return true; + } + } + + states.push(State::IN_ERROR); + return false; + } + + bool String(const Ch* str, SizeType, bool) + { + debug << "String(" << str << ")" << std::endl; + if (states.top() == State::IN_FIELD) { + if (currentKey.compare("name") == 0) { + name = str; + return true; + } + } + + states.push(State::IN_ERROR); + return false; + } + + bool Int(int i) { + debug << "Int(" << i << ")" << std::endl; + return Uint(i); + } + +}; +} + +std::shared_ptr o2::framework::ArrowJSONHelpers::read(std::istream& s) +{ + rapidjson::Reader reader; + rapidjson::IStreamWrapper isw(s); + SchemaReader sreader; + + bool ok = reader.Parse(isw, sreader); + + if(!ok) { + throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset()); + } + return sreader.schema; +} + +namespace { +void writeSchema(rapidjson::Writer& w, arrow::Schema* schema) +{ + for (auto& f : schema->fields()) { + w.StartObject(); + w.Key("name"); + w.String(f->name().c_str()); + w.Key("type"); + w.Int(f->type()->id()); + w.EndObject(); + } +} +} + +void o2::framework::ArrowJSONHelpers::write(std::ostream& o, std::shared_ptr& schema) +{ + rapidjson::OStreamWrapper osw(o); + rapidjson::Writer w(osw); + w.StartObject(); + w.Key("fields"); + w.StartArray(); + writeSchema(w, schema.get()); + w.EndArray(); + w.EndObject(); +} + } // namespace o2::framework diff --git a/Framework/Core/include/Framework/ExpressionJSONHelpers.h b/Framework/Core/src/ExpressionJSONHelpers.h similarity index 84% rename from Framework/Core/include/Framework/ExpressionJSONHelpers.h rename to Framework/Core/src/ExpressionJSONHelpers.h index 6abb0047ec524..ed4c51c58d5c2 100644 --- a/Framework/Core/include/Framework/ExpressionJSONHelpers.h +++ b/Framework/Core/src/ExpressionJSONHelpers.h @@ -16,10 +16,14 @@ namespace o2::framework { struct ExpressionJSONHelpers { - // static std::unique_ptr read(std::istream& s); static std::vector read(std::istream& s); static void write(std::ostream& o, std::vector& projectors); }; + +struct ArrowJSONHelpers { + static std::shared_ptr read(std::istream& s); + static void write(std::ostream& o, std::shared_ptr& schema); +}; } // namespace o2::framework #endif // FRAMEWORK_EXPRESSIONJSONHELPERS_H diff --git a/Framework/Core/test/test_Expressions.cxx b/Framework/Core/test/test_Expressions.cxx index de5bd19fc0d4c..5df34e5aac14a 100644 --- a/Framework/Core/test/test_Expressions.cxx +++ b/Framework/Core/test/test_Expressions.cxx @@ -12,7 +12,7 @@ #include "Framework/Configurable.h" #include "Framework/ExpressionHelpers.h" #include "Framework/AnalysisDataModel.h" -#include "Framework/ExpressionJSONHelpers.h" +#include "../src/ExpressionJSONHelpers.h" #include #include #include @@ -423,6 +423,21 @@ TEST_CASE("TestExpressionSerialization") auto t22 = createExpressionTree(s22, schemap); REQUIRE(t12->ToString() == t22->ToString()); - std::cout << schemaf->ToString() << std::endl; - std::cout << schemap->ToString() << std::endl; + osm.clear(); + osm.str(""); + ArrowJSONHelpers::write(osm, schemaf); + + ism.clear(); + ism.str(osm.str()); + auto newSchemaf = ArrowJSONHelpers::read(ism); + REQUIRE(schemaf->ToString() == newSchemaf->ToString()); + + osm.clear(); + osm.str(""); + ArrowJSONHelpers::write(osm, schemap); + + ism.clear(); + ism.str(osm.str()); + auto newSchemap = ArrowJSONHelpers::read(ism); + REQUIRE(schemap->ToString() == newSchemap->ToString()); } From bac0d5908de641e0639ea80a4fa239e8fd6d6c4a Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 11 Nov 2025 12:36:44 +0100 Subject: [PATCH 7/9] generalize spawner --- .../Core/include/Framework/AODReaderHelpers.h | 5 +- Framework/Core/include/Framework/ASoA.h | 1 + .../Core/include/Framework/TableBuilder.h | 5 +- Framework/Core/src/AODReaderHelpers.cxx | 167 +++++++++++++----- Framework/Core/src/ASoA.cxx | 30 ++++ Framework/Core/src/ArrowSupport.cxx | 2 +- Framework/Core/src/TableBuilder.cxx | 42 +++++ Framework/Core/src/WorkflowHelpers.cxx | 3 +- 8 files changed, 200 insertions(+), 55 deletions(-) diff --git a/Framework/Core/include/Framework/AODReaderHelpers.h b/Framework/Core/include/Framework/AODReaderHelpers.h index 957a5b1cd25ba..800d26c2aeae0 100644 --- a/Framework/Core/include/Framework/AODReaderHelpers.h +++ b/Framework/Core/include/Framework/AODReaderHelpers.h @@ -12,10 +12,7 @@ #ifndef O2_FRAMEWORK_AODREADERHELPERS_H_ #define O2_FRAMEWORK_AODREADERHELPERS_H_ -#include "Framework/TableBuilder.h" #include "Framework/AlgorithmSpec.h" -#include "Framework/Logger.h" -#include "Framework/RootMessageContext.h" #include namespace o2::framework::readers @@ -24,7 +21,7 @@ namespace o2::framework::readers struct AODReaderHelpers { static AlgorithmSpec rootFileReaderCallback(); - static AlgorithmSpec aodSpawnerCallback(std::vector& requested); + static AlgorithmSpec aodSpawnerCallback(ConfigContext const& ctx); static AlgorithmSpec indexBuilderCallback(std::vector& requested); }; diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 5d5408a638a9a..10c1fc4ac3ceb 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1270,6 +1270,7 @@ struct TableIterator : IP, C... { struct ArrowHelpers { static std::shared_ptr joinTables(std::vector>&& tables, std::span labels); + static std::shared_ptr joinTables(std::vector>&& tables, std::span labels); static std::shared_ptr concatTables(std::vector>&& tables); }; diff --git a/Framework/Core/include/Framework/TableBuilder.h b/Framework/Core/include/Framework/TableBuilder.h index 1eb493bfd052d..7707afe45b380 100644 --- a/Framework/Core/include/Framework/TableBuilder.h +++ b/Framework/Core/include/Framework/TableBuilder.h @@ -18,7 +18,6 @@ #include "arrow/type_traits.h" // Apparently needs to be on top of the arrow includes. -#include #include #include @@ -796,6 +795,10 @@ auto makeEmptyTable(const char* name, framework::pack p) std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, size_t nColumns, expressions::Projector* projectors, const char* name, std::shared_ptr& projector); +std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, + const char* name, size_t nColumns, + const std::shared_ptr& projector); + /// Expression-based column generator to materialize columns template requires(soa::has_configurable_extension::metadata>) diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx index ad5984d65080b..f346757de6284 100644 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ b/Framework/Core/src/AODReaderHelpers.cxx @@ -12,14 +12,16 @@ #include "Framework/AODReaderHelpers.h" #include "Framework/AnalysisHelpers.h" #include "Framework/AnalysisDataModelHelpers.h" -#include "Framework/DataProcessingHelpers.h" #include "Framework/ExpressionHelpers.h" +#include "Framework/DataProcessingHelpers.h" #include "Framework/AlgorithmSpec.h" #include "Framework/ControlService.h" #include "Framework/CallbackService.h" #include "Framework/EndOfStreamContext.h" #include "Framework/DataSpecUtils.h" -#include "Framework/ExpressionJSONHelpers.h" +#include "ExpressionJSONHelpers.h" +#include "Framework/ConfigContext.h" +#include "Framework/AnalysisContext.h" #include @@ -136,72 +138,141 @@ auto make_spawn(InputSpec const& input, ProcessingContext& pc) return o2::framework::spawner(extractOriginals(pc), input.binding.c_str(), projectors.data(), projector, schema); } -struct Spawnable { - std::vector projectors; +struct Maker +{ + std::string binding; std::vector labels; + std::vector> expressions; + std::shared_ptr projector = nullptr; std::shared_ptr schema; + header::DataOrigin origin; + header::DataDescription description; + header::DataHeader::SubSpecificationType version; + + std::shared_ptr make(ProcessingContext& pc) + { + std::vector> originals; + for (auto const& label : labels) { + originals.push_back(pc.inputs().get(label)->asArrowTable()); + } + auto fullTable = soa::ArrowHelpers::joinTables(std::move(originals), std::span{labels.begin(), labels.size()}); + if (projector == nullptr) { + auto s = gandiva::Projector::Make( + fullTable->schema(), + expressions, + &projector); + if (!s.ok()) { + throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str()); + } + } + + return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector); + } + +}; + +struct Spawnable { + std::string binding; + std::vector labels; + std::vector projectors; + std::vector> expressions; + std::shared_ptr outputSchema; + std::shared_ptr inputSchema; + + header::DataOrigin origin; + header::DataDescription description; + header::DataHeader::SubSpecificationType version; + Spawnable(InputSpec const& spec) + : binding{spec.binding} { - auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& spc){ return spc.name.compare("projectors") == 0; }); + auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec); + origin = origin_; + description = description_; + version = version_; + auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps){ return cps.name.compare("projectors") == 0; }); std::stringstream iws(loc->defaultValue.get()); projectors = ExpressionJSONHelpers::read(iws); + + loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps){ return cps.name.compare("schema") == 0; }); + iws.clear(); + iws.str(loc->defaultValue.get()); + outputSchema = ArrowJSONHelpers::read(iws); + for (auto& i : spec.metadata) { if (i.name.starts_with("input:")) { labels.emplace_back(i.name.substr(6)); } } + + std::vector> fields; + for (auto& p : projectors) { + expressions::walk(p.node.get(), + [&fields](expressions::Node* n) mutable { + if (n->self.index() == 1) { + auto& b = std::get(n->self); + if ( std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr const& field){ return field->name() == b.name; }) == fields.end() ) { + fields.emplace_back(std::make_shared(b.name, expressions::concreteArrowType(b.type))); + } + } + }); + } + inputSchema = std::make_shared(fields); + + int i = 0; + for (auto& p : projectors) { + expressions.push_back( + expressions::makeExpression( + expressions::createExpressionTree( + expressions::createOperations(p), + inputSchema), + outputSchema->field(i)) + ); + ++i; + } + } + + std::shared_ptr makeProjector() + { + return expressions::createProjectorHelper(projectors.size(), projectors.data(), inputSchema, outputSchema->fields()); + } + + Maker createMaker() + { + return { + binding, + labels, + expressions, + nullptr, + outputSchema, + origin, + description, + version + }; } + }; + } // namespace -AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector& requested) +AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(/*std::vector& requested*/ ConfigContext const& ctx) { - return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) { + auto& ac = ctx.services().get(); + return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) { std::vector spawnables; + for (auto& i : requested) { + spawnables.emplace_back(i); + } + std::vector makers; + for (auto& s : spawnables) { + makers.push_back(s.createMaker()); + } - return [requested, spawnables](ProcessingContext& pc) { + return [makers](ProcessingContext& pc) mutable { auto outputs = pc.outputs(); - // spawn tables - for (auto& input : requested) { - auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input); - if (description == header::DataDescription{"EXTRACK"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXTRACK_IU"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXTRACKCOV"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXTRACKCOV_IU"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXTRACKEXTRA"}) { - if (version == 0U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (version == 1U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (version == 2U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } - } else if (description == header::DataDescription{"EXMFTTRACK"}) { - if (version == 0U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (version == 1U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } - } else if (description == header::DataDescription{"EXMFTTRACKCOV"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXFWDTRACK"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXFWDTRACKCOV"}) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (description == header::DataDescription{"EXMCPARTICLE"}) { - if (version == 0U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } else if (version == 1U) { - outputs.adopt(Output{origin, description, version}, make_spawn>(input, pc)); - } - } else { - throw runtime_error("Not an extended table"); - } + for (auto& maker : makers) { + outputs.adopt(Output{maker.origin, maker.description, maker.version}, maker.make(pc)); } }; }}; diff --git a/Framework/Core/src/ASoA.cxx b/Framework/Core/src/ASoA.cxx index 83ca358525f9f..6a846c3d45b6c 100644 --- a/Framework/Core/src/ASoA.cxx +++ b/Framework/Core/src/ASoA.cxx @@ -99,6 +99,36 @@ std::shared_ptr ArrowHelpers::joinTables(std::vector ArrowHelpers::joinTables(std::vector>&& tables, std::span labels) +{ + if (tables.size() == 1) { + return tables[0]; + } + for (auto i = 0U; i < tables.size() - 1; ++i) { + if (tables[i]->num_rows() != tables[i + 1]->num_rows()) { + throw o2::framework::runtime_error_f("Tables %s and %s have different sizes (%d vs %d) and cannot be joined!", + labels[i].c_str(), labels[i + 1].c_str(), tables[i]->num_rows(), tables[i + 1]->num_rows()); + } + } + std::vector> fields; + std::vector> columns; + + for (auto& t : tables) { + auto tf = t->fields(); + std::copy(tf.begin(), tf.end(), std::back_inserter(fields)); + } + + auto schema = std::make_shared(fields); + + if (tables[0]->num_rows() != 0) { + for (auto& t : tables) { + auto tc = t->columns(); + std::copy(tc.begin(), tc.end(), std::back_inserter(columns)); + } + } + return arrow::Table::Make(schema, columns); +} + std::shared_ptr ArrowHelpers::concatTables(std::vector>&& tables) { if (tables.size() == 1) { diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 932c1fdacacfb..fbdbfac919e27 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -608,7 +608,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() spawner->inputs.clear(); // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... - spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs); + spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ctx); AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); } diff --git a/Framework/Core/src/TableBuilder.cxx b/Framework/Core/src/TableBuilder.cxx index 2169722efa9da..7e256fb9f1712 100644 --- a/Framework/Core/src/TableBuilder.cxx +++ b/Framework/Core/src/TableBuilder.cxx @@ -130,6 +130,48 @@ std::shared_ptr spawnerHelper(std::shared_ptr const& return arrow::Table::Make(newSchema, arrays); } +std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, + const char* name, size_t nColumns, + std::shared_ptr const& projector) +{ + arrow::TableBatchReader reader(*fullTable); + std::shared_ptr batch; + arrow::ArrayVector v; + std::vector chunks; + chunks.resize(nColumns); + std::vector> arrays; + + while (true) { + auto s = reader.ReadNext(&batch); + if (!s.ok()) { + throw runtime_error_f("Cannot read batches from source table to spawn %s: %s", name, s.ToString().c_str()); + } + if (batch == nullptr) { + break; + } + try { + s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v); + if (!s.ok()) { + throw runtime_error_f("Cannot apply projector to source table of %s: %s", name, s.ToString().c_str()); + } + } catch (std::exception& e) { + throw runtime_error_f("Cannot apply projector to source table of %s: exception caught: %s", name, e.what()); + } + + for (auto i = 0U; i < nColumns; ++i) { + chunks[i].emplace_back(v.at(i)); + } + } + + arrays.reserve(nColumns); + for (auto i = 0U; i < nColumns; ++i) { + arrays.push_back(std::make_shared(chunks[i])); + } + + addLabelToSchema(newSchema, name); + return arrow::Table::Make(newSchema, arrays); +} + } // namespace o2::framework template class arrow::NumericBuilder; diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index d27753848d544..0b82e8265b604 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -38,6 +38,7 @@ #include #include #include +#include O2_DECLARE_DYNAMIC_LOG(workflow_helpers); @@ -435,7 +436,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext "internal-dpl-aod-spawner", {}, {}, - readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs), + readers::AODReaderHelpers::aodSpawnerCallback(ctx), {}}; AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner); From 6125de3147dfa960bd67be5d2b23eb23ea44e6f0 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Tue, 11 Nov 2025 11:44:46 +0000 Subject: [PATCH 8/9] Please consider the following formatting changes --- .../Core/include/Framework/AnalysisHelpers.h | 5 +-- Framework/Core/src/AODReaderHelpers.cxx | 33 ++++++++----------- Framework/Core/src/ExpressionJSONHelpers.cxx | 18 +++++----- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 6b9aa957f6d4f..fa82151c6e756 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -26,10 +26,11 @@ #include "Framework/Traits.h" #include -namespace o2::framework { +namespace o2::framework +{ std::string serializeProjectors(std::vector& projectors); std::string serializeSchema(std::shared_ptr& schema); -} +} // namespace o2::framework namespace o2::soa { diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx index f346757de6284..febc19e02834e 100644 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ b/Framework/Core/src/AODReaderHelpers.cxx @@ -138,8 +138,7 @@ auto make_spawn(InputSpec const& input, ProcessingContext& pc) return o2::framework::spawner(extractOriginals(pc), input.binding.c_str(), projectors.data(), projector, schema); } -struct Maker -{ +struct Maker { std::string binding; std::vector labels; std::vector> expressions; @@ -169,7 +168,6 @@ struct Maker return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector); } - }; struct Spawnable { @@ -185,17 +183,17 @@ struct Spawnable { header::DataHeader::SubSpecificationType version; Spawnable(InputSpec const& spec) - : binding{spec.binding} + : binding{spec.binding} { auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec); origin = origin_; description = description_; version = version_; - auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps){ return cps.name.compare("projectors") == 0; }); + auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; }); std::stringstream iws(loc->defaultValue.get()); projectors = ExpressionJSONHelpers::read(iws); - loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps){ return cps.name.compare("schema") == 0; }); + loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; }); iws.clear(); iws.str(loc->defaultValue.get()); outputSchema = ArrowJSONHelpers::read(iws); @@ -209,14 +207,14 @@ struct Spawnable { std::vector> fields; for (auto& p : projectors) { expressions::walk(p.node.get(), - [&fields](expressions::Node* n) mutable { - if (n->self.index() == 1) { - auto& b = std::get(n->self); - if ( std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr const& field){ return field->name() == b.name; }) == fields.end() ) { - fields.emplace_back(std::make_shared(b.name, expressions::concreteArrowType(b.type))); - } - } - }); + [&fields](expressions::Node* n) mutable { + if (n->self.index() == 1) { + auto& b = std::get(n->self); + if (std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr const& field) { return field->name() == b.name; }) == fields.end()) { + fields.emplace_back(std::make_shared(b.name, expressions::concreteArrowType(b.type))); + } + } + }); } inputSchema = std::make_shared(fields); @@ -227,8 +225,7 @@ struct Spawnable { expressions::createExpressionTree( expressions::createOperations(p), inputSchema), - outputSchema->field(i)) - ); + outputSchema->field(i))); ++i; } } @@ -248,10 +245,8 @@ struct Spawnable { outputSchema, origin, description, - version - }; + version}; } - }; } // namespace diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx index 9f98eaddc56ce..43741a68a2098 100644 --- a/Framework/Core/src/ExpressionJSONHelpers.cxx +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -635,7 +635,8 @@ void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector, SchemaReader> { using Ch = rapidjson::UTF8<>::Ch; using SizeType = rapidjson::SizeType; @@ -679,7 +680,7 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch { debug << "Ending array" << std::endl; if (states.top() == State::IN_LIST) { - //finalize schema + // finalize schema schema = std::make_shared(fields); states.pop(); return true; @@ -773,13 +774,13 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch return false; } - bool Int(int i) { + bool Int(int i) + { debug << "Int(" << i << ")" << std::endl; return Uint(i); } - }; -} +} // namespace std::shared_ptr o2::framework::ArrowJSONHelpers::read(std::istream& s) { @@ -789,13 +790,14 @@ std::shared_ptr o2::framework::ArrowJSONHelpers::read(std::istrea bool ok = reader.Parse(isw, sreader); - if(!ok) { + if (!ok) { throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset()); } return sreader.schema; } -namespace { +namespace +{ void writeSchema(rapidjson::Writer& w, arrow::Schema* schema) { for (auto& f : schema->fields()) { @@ -807,7 +809,7 @@ void writeSchema(rapidjson::Writer& w, arrow::Schema* w.EndObject(); } } -} +} // namespace void o2::framework::ArrowJSONHelpers::write(std::ostream& o, std::shared_ptr& schema) { From 811c56e75f9e890953e0843a7689ef8ca98911b0 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 12 Nov 2025 19:07:33 +0100 Subject: [PATCH 9/9] fix uint deserialization in expressions --- Framework/Core/src/ExpressionJSONHelpers.cxx | 30 ++++++++++---------- Framework/Core/test/test_Expressions.cxx | 14 +++++++++ 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx index 43741a68a2098..8d4907a721f7e 100644 --- a/Framework/Core/src/ExpressionJSONHelpers.cxx +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -365,13 +365,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, value = (uint16_t)i; break; case atype::UINT32: - value = i; - break; - case atype::UINT64: - value = (uint64_t)i; - break; - case atype::INT64: - value = (int64_t)i; + value = (uint32_t)i; break; default: states.push(State::IN_ERROR); @@ -450,7 +444,17 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, debug << "Int64(" << i << ")" << std::endl; // can only be a literal node value if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { - value = i; + switch (type) { + case atype::UINT64: + value = (uint64_t)i; + break; + case atype::INT64: + value = (int64_t)i; + break; + default: + states.push(State::IN_ERROR); + return false; + } return true; } states.push(State::IN_ERROR); // no other contexts allow int64s @@ -460,13 +464,9 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler, bool Uint64(uint64_t i) { debug << "Uint64(" << i << ")" << std::endl; - // can only be a literal node value - if (states.top() == State::IN_NODE_LITERAL && currentKey.compare("value") == 0) { - value = i; - return true; - } - states.push(State::IN_ERROR); // no other contexts allow uints - return false; + // any positive value will be first read as unsigned, however the actual type is determined by node's arrow_type + debug << ">> falling back to Int64" << std::endl; + return Int64(i); } bool Double(double d) diff --git a/Framework/Core/test/test_Expressions.cxx b/Framework/Core/test/test_Expressions.cxx index 5df34e5aac14a..41be7d53d2276 100644 --- a/Framework/Core/test/test_Expressions.cxx +++ b/Framework/Core/test/test_Expressions.cxx @@ -397,10 +397,15 @@ TEST_CASE("TestExpressionSerialization") { Filter f = o2::aod::track::signed1Pt > 0.f && ifnode(nabs(o2::aod::track::eta) < 1.0f, nabs(o2::aod::track::x) > 2.0f, nabs(o2::aod::track::y) > 3.0f); Projector p = -1.f * nlog(ntan(o2::constants::math::PIQuarter - 0.5f * natan(o2::aod::fwdtrack::tgl))); + Projector p1 = ifnode(o2::aod::track::itsClusterSizes > (uint32_t)0, static_cast(o2::aod::track::ITS), (uint8_t)0x0) | + ifnode(o2::aod::track::tpcNClsFindable > (uint8_t)0, static_cast(o2::aod::track::TPC), (uint8_t)0x0) | + ifnode(o2::aod::track::trdPattern > (uint8_t)0, static_cast(o2::aod::track::TRD), (uint8_t)0x0) | + ifnode((o2::aod::track::tofChi2 >= 0.f) && (o2::aod::track::tofExpMom > 0.f), static_cast(o2::aod::track::TOF), (uint8_t)0x0); std::vector projectors; projectors.emplace_back(std::move(f)); projectors.emplace_back(std::move(p)); + projectors.emplace_back(std::move(p1)); std::stringstream osm; ExpressionJSONHelpers::write(osm, projectors); @@ -423,6 +428,15 @@ TEST_CASE("TestExpressionSerialization") auto t22 = createExpressionTree(s22, schemap); REQUIRE(t12->ToString() == t22->ToString()); + auto s13 = createOperations(projectors[2]); + auto s23 = createOperations(ps[2]); + auto schemap1 = std::make_shared(std::vector{o2::aod::track::ITSClusterSizes::asArrowField(), o2::aod::track::TPCNClsFindable::asArrowField(), + o2::aod::track::TRDPattern::asArrowField(), o2::aod::track::TOFChi2::asArrowField(), + o2::aod::track::TOFExpMom::asArrowField()}); + auto t13 = createExpressionTree(s13, schemap1); + auto t23 = createExpressionTree(s23, schemap1); + REQUIRE(t13->ToString() == t23->ToString()); + osm.clear(); osm.str(""); ArrowJSONHelpers::write(osm, schemaf);