diff --git a/Framework/AnalysisSupport/CMakeLists.txt b/Framework/AnalysisSupport/CMakeLists.txt index 92fd55b86a33d..6024134a5495d 100644 --- a/Framework/AnalysisSupport/CMakeLists.txt +++ b/Framework/AnalysisSupport/CMakeLists.txt @@ -16,6 +16,12 @@ if(TARGET JAliEn::JAliEn) set(EXTRA_TARGETS XRootD::Client JAliEn::JAliEn) endif() +o2_add_library(FrameworkOnDemandTablesSupport + SOURCES src/OnDemandPlugin.cxx + src/AODReaderHelpers.cxx + PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src + PUBLIC_LINK_LIBRARIES O2::Framework ${EXTRA_TARGETS}) + o2_add_library(FrameworkAnalysisSupport SOURCES src/Plugin.cxx src/DataInputDirector.cxx diff --git a/Framework/AnalysisSupport/src/AODReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODReaderHelpers.cxx new file mode 100644 index 0000000000000..40aa5a9537c7f --- /dev/null +++ b/Framework/AnalysisSupport/src/AODReaderHelpers.cxx @@ -0,0 +1,206 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "AODReaderHelpers.h" +#include "../src/ExpressionJSONHelpers.h" +#include "../src/IndexJSONHelpers.h" + +#include "Framework/AnalysisDataModel.h" +#include "Framework/AnalysisHelpers.h" +#include "Framework/DataProcessingHelpers.h" +#include "Framework/AlgorithmSpec.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/ConfigContext.h" +#include "Framework/AnalysisContext.h" + +namespace o2::framework::readers +{ +namespace +{ +struct Buildable { + bool exclusive = false; + std::string binding; + std::vector labels; + header::DataOrigin origin; + header::DataDescription description; + header::DataHeader::SubSpecificationType version; + std::vector records; + std::shared_ptr outputSchema; + + Buildable(InputSpec const& spec) + : binding{spec.binding} + { + auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec); + origin = origin_; + description = description_; + version = version_; + + auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-records") == 0; }); + std::stringstream iws(loc->defaultValue.get()); + records = IndexJSONHelpers::read(iws); + + loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-exclusive") == 0; }); + exclusive = loc->defaultValue.get(); + + for (auto const& r : records) { + labels.emplace_back(r.label); + } + outputSchema = std::make_shared([](std::vector const& recs) { + std::vector> fields; + for (auto& r : recs) { + fields.push_back(r.field()); + } + return fields; + }(records)) + ->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{binding}})); + } + + framework::Builder createBuilder() const + { + return { + exclusive, + labels, + records, + outputSchema, + origin, + description, + version, + nullptr}; + } +}; + +} // namespace + +AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + return AlgorithmSpec::InitCallback{[requested = ac.requestedIDXs](InitContext& /*ic*/) { + std::vector buildables; + for (auto& i : requested) { + buildables.emplace_back(i); + } + std::vector builders; + for (auto& b : buildables) { + builders.push_back(b.createBuilder()); + } + return [builders](ProcessingContext& pc) mutable { + auto outputs = pc.outputs(); + for (auto& builder : builders) { + outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc)); + } + }; + }}; +} + +namespace +{ +struct Spawnable { + std::string binding; + std::vector labels; + std::vector projectors; + std::vector> expressions; + std::shared_ptr outputSchema; + std::shared_ptr inputSchema; + + header::DataOrigin origin; + header::DataDescription description; + header::DataHeader::SubSpecificationType version; + + Spawnable(InputSpec const& spec) + : binding{spec.binding} + { + auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec); + origin = origin_; + description = description_; + version = version_; + auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; }); + std::stringstream iws(loc->defaultValue.get()); + projectors = ExpressionJSONHelpers::read(iws); + + loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; }); + iws.clear(); + iws.str(loc->defaultValue.get()); + outputSchema = ArrowJSONHelpers::read(iws); + o2::framework::addLabelToSchema(outputSchema, binding.c_str()); + + std::vector> schemas; + for (auto& i : spec.metadata) { + if (i.name.starts_with("input-schema:")) { + labels.emplace_back(i.name.substr(13)); + iws.clear(); + auto json = i.defaultValue.get(); + iws.str(json); + schemas.emplace_back(ArrowJSONHelpers::read(iws)); + } + } + + std::vector> fields; + for (auto& s : schemas) { + std::copy(s->fields().begin(), s->fields().end(), std::back_inserter(fields)); + } + + inputSchema = std::make_shared(fields); + expressions = expressions::materializeProjectors(projectors, inputSchema, outputSchema->fields()); + } + + std::shared_ptr makeProjector() const + { + std::shared_ptr p = nullptr; + auto s = gandiva::Projector::Make( + inputSchema, + expressions, + &p); + if (!s.ok()) { + throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str()); + } + return p; + } + + framework::Spawner createMaker() const + { + return { + binding, + labels, + expressions, + makeProjector(), + outputSchema, + inputSchema, + origin, + description, + version}; + } +}; + +} // namespace + +AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) { + std::vector spawnables; + for (auto& i : requested) { + spawnables.emplace_back(i); + } + std::vector spawners; + for (auto& s : spawnables) { + spawners.push_back(s.createMaker()); + } + + return [spawners](ProcessingContext& pc) mutable { + auto outputs = pc.outputs(); + for (auto& spawner : spawners) { + outputs.adopt(Output{spawner.origin, spawner.description, spawner.version}, spawner.materialize(pc)); + } + }; + }}; +} + +} // namespace o2::framework::readers diff --git a/Framework/Core/include/Framework/AODReaderHelpers.h b/Framework/AnalysisSupport/src/AODReaderHelpers.h similarity index 92% rename from Framework/Core/include/Framework/AODReaderHelpers.h rename to Framework/AnalysisSupport/src/AODReaderHelpers.h index 800d26c2aeae0..197907ca3ccb1 100644 --- a/Framework/Core/include/Framework/AODReaderHelpers.h +++ b/Framework/AnalysisSupport/src/AODReaderHelpers.h @@ -18,11 +18,10 @@ namespace o2::framework::readers { - struct AODReaderHelpers { static AlgorithmSpec rootFileReaderCallback(); static AlgorithmSpec aodSpawnerCallback(ConfigContext const& ctx); - static AlgorithmSpec indexBuilderCallback(std::vector& requested); + static AlgorithmSpec indexBuilderCallback(ConfigContext const& ctx); }; } // namespace o2::framework::readers diff --git a/Framework/AnalysisSupport/src/OnDemandPlugin.cxx b/Framework/AnalysisSupport/src/OnDemandPlugin.cxx new file mode 100644 index 0000000000000..9438f9bf69c96 --- /dev/null +++ b/Framework/AnalysisSupport/src/OnDemandPlugin.cxx @@ -0,0 +1,32 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/Plugins.h" +#include "Framework/AlgorithmSpec.h" +#include "AODReaderHelpers.h" + +struct ExtendedTableSpawner : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::readers::AODReaderHelpers::aodSpawnerCallback(config); + } +}; + +struct IndexTableBuilder : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::readers::AODReaderHelpers::indexBuilderCallback(config); + } +}; + +DEFINE_DPL_PLUGINS_BEGIN +DEFINE_DPL_PLUGIN_INSTANCE(ExtendedTableSpawner, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(IndexTableBuilder, CustomAlgorithm); +DEFINE_DPL_PLUGINS_END diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index cefb903c29895..ce8fbb0dc55f7 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -10,8 +10,7 @@ # or submit itself to any jurisdiction. o2_add_library(Framework - SOURCES src/AODReaderHelpers.cxx - src/AnalysisHelpers.cxx + SOURCES src/AnalysisHelpers.cxx src/AlgorithmSpec.cxx src/ArrowSupport.cxx src/ArrowTableSlicingCache.cxx @@ -143,6 +142,7 @@ o2_add_library(Framework src/Variant.cxx src/VariantJSONHelpers.cxx src/ExpressionJSONHelpers.cxx + src/IndexJSONHelpers.cxx src/VariantPropertyTreeHelpers.cxx src/WorkflowCustomizationHelpers.cxx src/WorkflowHelpers.cxx diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 10c1fc4ac3ceb..a30363605af36 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -34,7 +34,6 @@ #include #include #include // IWYU pragma: export -#include namespace o2::framework { @@ -53,6 +52,12 @@ void dereferenceWithWrongType(const char* getter, const char* target); void missingFilterDeclaration(int hash, int ai); void notBoundTable(const char* tableName); void* extractCCDBPayload(char* payload, size_t size, TClass const* cl, const char* what); + +template +auto createFieldsFromColumns(framework::pack) +{ + return std::vector>{C::asArrowField()...}; +} } // namespace o2::soa namespace o2::soa @@ -212,6 +217,20 @@ using is_self_index_t = typename std::conditional_t, std namespace o2::aod { +namespace +{ +template map> +static consteval int getIndexPosToKey_impl() +{ + constexpr const auto pos = std::find(map.begin(), map.end(), true); + if constexpr (pos != map.end()) { + return std::distance(map.begin(), pos); + } else { + return -1; + } +} +} // namespace + /// Base type for table metadata template struct TableMetadata { @@ -238,15 +257,9 @@ struct TableMetadata { return getIndexPosToKey_impl(persistent_columns_t{})>(); } - template map> - static consteval int getIndexPosToKey_impl() + static std::shared_ptr getSchema() { - constexpr const auto pos = std::find(map.begin(), map.end(), true); - if constexpr (pos != map.end()) { - return std::distance(map.begin(), pos); - } else { - return -1; - } + return std::make_shared([](framework::pack&& p) { return o2::soa::createFieldsFromColumns(p); }(persistent_columns_t{})); } }; @@ -406,12 +419,6 @@ struct Binding { } }; -template -auto createFieldsFromColumns(framework::pack) -{ - return std::vector>{C::asArrowField()...}; -} - using SelectionVector = std::vector; template @@ -686,7 +693,7 @@ struct Column { static auto asArrowField() { - return std::make_shared(inherited_t::mLabel, framework::expressions::concreteArrowType(framework::expressions::selectArrowType())); + return std::make_shared(inherited_t::mLabel, soa::asArrowDataType()); } /// FIXME: rather than keeping this public we should have a protected @@ -1303,6 +1310,11 @@ concept with_expression_pack = requires { typename T::expression_pack_t{}; }; +template +concept with_index_pack = requires { + typename T::index_pack_t{}; +}; + template os1, size_t N2, std::array os2> consteval bool is_compatible() { @@ -3251,28 +3263,29 @@ consteval auto getIndexTargets() O2HASH(#_Name_ "CfgExtension"); \ DECLARE_SOA_CONFIGURABLE_EXTENDED_TABLE_FULL(_Name_, #_Name_ "CfgExtension", _Table_, "AOD", "EX" _Description_, 0, __VA_ARGS__) -#define DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, _Origin_, _Version_, _Desc_, _Exclusive_, ...) \ - O2HASH(#_Name_); \ - O2HASH(_Desc_ "/" #_Version_); \ - template > \ - struct _Name_##MetadataFrom : o2::aod::TableMetadata, soa::Index<>, __VA_ARGS__> { \ - static constexpr bool exclusive = _Exclusive_; \ - using Key = _Key_; \ - using index_pack_t = framework::pack<__VA_ARGS__>; \ - static constexpr const auto sources = [](framework::pack) { \ - constexpr auto a = o2::soa::mergeOriginals(); \ - return o2::aod::filterForKey(); \ - }(framework::pack<__VA_ARGS__>{}); \ - }; \ - using _Name_##Metadata = _Name_##MetadataFrom>; \ - \ - template > \ - using _Name_##From = o2::soa::IndexTable, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O, _Key_, __VA_ARGS__>; \ - using _Name_ = _Name_##From>; \ - \ - template <> \ - struct MetadataTrait> { \ - using metadata = _Name_##Metadata; \ +#define DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, _Origin_, _Version_, _Desc_, _Exclusive_, ...) \ + O2HASH(#_Name_); \ + O2HASH(_Desc_ "/" #_Version_); \ + template > \ + struct _Name_##MetadataFrom : o2::aod::TableMetadata, soa::Index<>, __VA_ARGS__> { \ + static constexpr bool exclusive = _Exclusive_; \ + using Key = _Key_; \ + using index_pack_t = framework::pack<__VA_ARGS__>; \ + static constexpr const auto sources = [](framework::pack) { \ + constexpr auto a = o2::soa::mergeOriginals(); \ + return o2::aod::filterForKey(); \ + }(framework::pack<__VA_ARGS__>{}); \ + static_assert(sources.size() - Key::originals.size() + 1 == framework::pack_size(index_pack_t{}), "One of the referred tables does not have index to Key"); \ + }; \ + using _Name_##Metadata = _Name_##MetadataFrom>; \ + \ + template > \ + using _Name_##From = o2::soa::IndexTable, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O, _Key_, __VA_ARGS__>; \ + using _Name_ = _Name_##From>; \ + \ + template <> \ + struct MetadataTrait> { \ + using metadata = _Name_##Metadata; \ }; // Declare were each row is associated to a timestamp column of an _TimestampSource_ diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index fa82151c6e756..3666fe1299489 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -26,10 +26,147 @@ #include "Framework/Traits.h" #include +namespace o2::soa +{ +struct IndexRecord { + std::string label; + std::string columnLabel; + IndexKind kind; + int pos; + std::shared_ptr type = [](IndexKind kind) -> std::shared_ptr { + switch (kind) { + case IndexKind::IdxSingle: + case IndexKind::IdxSelf: + return arrow::int32(); + case IndexKind::IdxSlice: + return arrow::fixed_size_list(arrow::int32(), 2); + case IndexKind::IdxArray: + return arrow::list(arrow::int32()); + default: + return {nullptr}; + } + }(kind); + + auto operator==(IndexRecord const& other) const + { + return (this->label == other.label) && (this->columnLabel == other.columnLabel) && (this->kind == other.kind) && (this->pos == other.pos); + } + + std::shared_ptr field() const + { + return std::make_shared(columnLabel, type); + } +}; + +struct IndexBuilder { + static std::vector makeBuilders(std::vector>&& tables, std::vector const& records); + static void resetBuilders(std::vector& builders, std::vector>&& tables); + + static std::shared_ptr materialize(std::vector& builders, std::vector>&& tables, std::vector const& records, std::shared_ptr const& schema, bool exclusive); +}; +} // namespace o2::soa + namespace o2::framework { +std::shared_ptr makeEmptyTableImpl(const char* name, std::shared_ptr& schema); + +template +auto makeEmptyTable(const char* name) +{ + auto schema = std::make_shared(soa::createFieldsFromColumns(typename T::table_t::persistent_columns_t{})); + return makeEmptyTableImpl(name, schema); +} + +template +auto makeEmptyTable() +{ + auto schema = std::make_shared(soa::createFieldsFromColumns(typename aod::MetadataTrait>::metadata::persistent_columns_t{})); + return makeEmptyTableImpl(o2::aod::label(), schema); +} + +template +auto makeEmptyTable(const char* name, framework::pack p) +{ + auto schema = std::make_shared(soa::createFieldsFromColumns(p)); + return makeEmptyTableImpl(name, schema); +} + +template +auto makeEmptyTable(const char* name) +{ + auto schema = std::make_shared(soa::createFieldsFromColumns(typename aod::MetadataTrait::metadata::persistent_columns_t{})); + return makeEmptyTableImpl(name, schema); +} + +std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, size_t nColumns, + expressions::Projector* projectors, const char* name, std::shared_ptr& projector); + +std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, + const char* name, size_t nColumns, + const std::shared_ptr& projector); + +/// Expression-based column generator to materialize columns +template + requires(soa::has_extension::metadata>) +auto spawner(std::shared_ptr const& fullTable, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) +{ + if (fullTable->num_rows() == 0) { + return makeEmptyTable(name); + } + constexpr auto Ncol = []() { + if constexpr (soa::has_configurable_extension) { + return framework::pack_size(typename M::placeholders_pack_t{}); + } else { + return framework::pack_size(typename M::expression_pack_t{}); + } + }.template operator()::metadata>(); + return spawnerHelper(fullTable, schema, Ncol, projectors, name, projector); +} + +template +auto spawner(framework::pack, std::vector>&& tables, const char* name, expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) +{ + std::array labels{"original"}; + auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{labels}); + if (fullTable->num_rows() == 0) { + return makeEmptyTable(name, framework::pack{}); + } + return spawnerHelper(fullTable, schema, sizeof...(C), projectors, name, projector); +} + std::string serializeProjectors(std::vector& projectors); -std::string serializeSchema(std::shared_ptr& schema); +std::string serializeSchema(std::shared_ptr schema); +std::string serializeIndexRecords(std::vector& irs); +std::vector> extractSources(ProcessingContext& pc, std::vector const& labels); + +struct Spawner { + std::string binding; + std::vector labels; + std::vector> expressions; + std::shared_ptr projector = nullptr; + std::shared_ptr schema = nullptr; + std::shared_ptr inputSchema = nullptr; + + header::DataOrigin origin; + header::DataDescription description; + header::DataHeader::SubSpecificationType version; + + std::shared_ptr materialize(ProcessingContext& pc) const; +}; + +struct Builder { + bool exclusive; + std::vector labels; + std::vector records; + std::shared_ptr outputSchema; + header::DataOrigin origin; + header::DataDescription description; + header::DataHeader::SubSpecificationType version; + + std::shared_ptr> builders = nullptr; + + std::shared_ptr materialize(ProcessingContext& pc); +}; } // namespace o2::framework namespace o2::soa @@ -44,6 +181,16 @@ constexpr auto tableRef2ConfigParamSpec() {"\"\""}}; } +template +constexpr auto tableRef2Schema() +{ + return o2::framework::ConfigParamSpec{ + std::string{"input-schema:"} + o2::aod::label(), + framework::VariantType::String, + framework::serializeSchema(o2::aod::MetadataTrait>::metadata::getSchema()), + {"\"\""}}; +} + namespace { template @@ -56,6 +203,16 @@ inline constexpr auto getSources() }.template operator()(); } +template +inline constexpr auto getSourceSchemas() +{ + return [] refs>() { + return [](std::index_sequence) { + return std::vector{soa::tableRef2Schema()...}; + }(std::make_index_sequence()); + }.template operator()(); +} + template inline constexpr auto getCCDBUrls() { @@ -69,15 +226,66 @@ inline constexpr auto getCCDBUrls() return result; } +template + requires(std::same_as) +consteval IndexKind getIndexKind() +{ + return IndexKind::IdxSingle; +} + +template + requires(std::is_bounded_array_v) +consteval IndexKind getIndexKind() +{ + return IndexKind::IdxSlice; +} + +template + requires(framework::is_specialization_v) +consteval IndexKind getIndexKind() +{ + return IndexKind::IdxArray; +} + +template +inline constexpr auto getIndexMapping() +{ + std::vector idx; + using indices = T::index_pack_t; + using Key = T::Key; + [&idx](std::index_sequence) mutable { + constexpr auto refs = T::sources; + ([&idx]() mutable { + constexpr auto pos = o2::aod::MetadataTrait>::metadata::template getIndexPosToKey(); + if constexpr (pos == -1) { + idx.emplace_back(o2::aod::label(), C::columnLabel(), IndexKind::IdxSelf, pos); + } else { + idx.emplace_back(o2::aod::label(), C::columnLabel(), getIndexKind(), pos); + } + }.template operator()>(), + ...); + }(std::make_index_sequence()); + ; + return idx; +} + template constexpr auto getInputMetadata() -> std::vector { std::vector inputMetadata; + auto inputSources = getSources(); std::sort(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; }); auto last = std::unique(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; }); inputSources.erase(last, inputSources.end()); inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end()); + + auto inputSchemas = getSourceSchemas(); + std::sort(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; }); + last = std::unique(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; }); + inputSchemas.erase(last, inputSchemas.end()); + inputMetadata.insert(inputMetadata.end(), inputSchemas.begin(), inputSchemas.end()); + return inputMetadata; } @@ -115,11 +323,8 @@ constexpr auto getExpressionMetadata() -> std::vector(o2::soa::createFieldsFromColumns(expression_pack_t{})); - auto json = framework::serializeProjectors(projectors); - return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}, - framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(schema), {"\"\""}}}; + return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}}; } template @@ -129,6 +334,21 @@ constexpr auto getExpressionMetadata() -> std::vector +constexpr auto getIndexMetadata() -> std::vector +{ + auto map = getIndexMapping(); + return {framework::ConfigParamSpec{"index-records", framework::VariantType::String, framework::serializeIndexRecords(map), {"\"\""}}, + {framework::ConfigParamSpec{"index-exclusive", framework::VariantType::Bool, T::exclusive, {"\"\""}}}}; +} + +template + requires(!soa::with_index_pack) +constexpr auto getIndexMetadata() -> std::vector +{ + return {}; +} + } // namespace template @@ -141,6 +361,11 @@ constexpr auto tableRef2InputSpec() metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end()); auto p = getExpressionMetadata>::metadata>(); metadata.insert(metadata.end(), p.begin(), p.end()); + auto idx = getIndexMetadata>::metadata>(); + metadata.insert(metadata.end(), idx.begin(), idx.end()); + if constexpr (!soa::with_ccdb_urls>::metadata>) { + metadata.emplace_back(framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(o2::aod::MetadataTrait>::metadata::getSchema()), {"\"\""}}); + } return framework::InputSpec{ o2::aod::label(), @@ -319,29 +544,29 @@ struct TableTransform { constexpr static auto sources = M::sources; template - static constexpr auto base_spec() + static auto base_spec() { return soa::tableRef2InputSpec(); } static auto base_specs() { - return [](std::index_sequence) -> std::vector { - return {base_spec()...}; + return [](std::index_sequence) { + return std::array{base_spec()...}; }(std::make_index_sequence{}); } - constexpr auto spec() const + static constexpr auto spec() { return soa::tableRef2OutputSpec(); } - constexpr auto output() const + static constexpr auto output() { return soa::tableRef2Output(); } - constexpr auto ref() const + static constexpr auto ref() { return soa::tableRef2OutputRef(); } @@ -367,15 +592,9 @@ struct Spawns : decltype(transformBase()) { using spawnable_t = T; using metadata = decltype(transformBase())::metadata; using extension_t = typename metadata::extension_table_t; - using base_table_t = typename metadata::base_table_t; using expression_pack_t = typename metadata::expression_pack_t; static constexpr size_t N = framework::pack_size(expression_pack_t{}); - constexpr auto pack() - { - return expression_pack_t{}; - } - typename T::table_t* operator->() { return table.get(); @@ -389,6 +608,7 @@ struct Spawns : decltype(transformBase()) { { return extension->asArrowTable(); } + std::shared_ptr table = nullptr; std::shared_ptr extension = nullptr; std::array projectors = [](framework::pack) -> std::array @@ -397,13 +617,17 @@ struct Spawns : decltype(transformBase()) { } (expression_pack_t{}); std::shared_ptr projector = nullptr; - std::shared_ptr schema = std::make_shared(o2::soa::createFieldsFromColumns(expression_pack_t{})); + std::shared_ptr schema = []() { + auto s = std::make_shared(o2::soa::createFieldsFromColumns(expression_pack_t{})); + s->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label()}})); + return s; + }(); }; template concept is_spawns = requires(T t) { typename T::metadata; - requires std::same_as; + typename T::expression_pack_t; requires std::same_as>; }; @@ -418,15 +642,9 @@ struct Defines : decltype(transformBase()) { using spawnable_t = T; using metadata = decltype(transformBase())::metadata; using extension_t = typename metadata::extension_table_t; - using base_table_t = typename metadata::base_table_t; using placeholders_pack_t = typename metadata::placeholders_pack_t; static constexpr size_t N = framework::pack_size(placeholders_pack_t{}); - constexpr auto pack() - { - return placeholders_pack_t{}; - } - typename T::table_t* operator->() { return table.get(); @@ -445,7 +663,11 @@ struct Defines : decltype(transformBase()) { std::array projectors; std::shared_ptr projector = nullptr; - std::shared_ptr schema = std::make_shared(o2::soa::createFieldsFromColumns(placeholders_pack_t{})); + std::shared_ptr schema = []() { + auto s = std::make_shared(o2::soa::createFieldsFromColumns(placeholders_pack_t{})); + s->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label()}})); + return s; + }(); std::shared_ptr inputSchema = nullptr; bool needRecompilation = false; @@ -462,7 +684,7 @@ using DefinesDelayed = Defines; template concept is_defines = requires(T t) { typename T::metadata; - requires std::same_as; + typename T::placeholders_pack_t; requires std::same_as>; requires std::same_as; &T::recompile; @@ -477,129 +699,6 @@ struct Exclusive { struct Sparse { }; -namespace -{ -template -inline std::shared_ptr getIndexToKey(arrow::Table* table) -{ - using IC = framework::pack_element_t(typename T::external_index_columns_t{}), typename T::external_index_columns_t>; - return table->column(framework::has_type_at_v(typename T::persistent_columns_t{})); -} - -template -struct ColumnTrait { - using column_t = C; - - static consteval auto listSize() - { - if constexpr (std::same_as>) { - return -1; - } else if constexpr (std::same_as) { - return 2; - } else { - return 1; - } - } - - template - static std::shared_ptr makeColumnBuilder(arrow::Table* table, arrow::MemoryPool* pool) - { - if constexpr (!std::same_as) { - return std::make_shared(getIndexToKey(table), C::columnLabel(), listSize(), pool); - } else { - return std::make_shared(C::columnLabel(), pool); - } - } -}; - -template -struct Reduction { - using type = typename std::conditional(), SelfIndexColumnBuilder, IndexColumnBuilder>::type; -}; - -template -using reduced_t = Reduction::type; -} // namespace - -template -struct IndexBuilder { - template refs, typename C1, typename... Cs> - static auto indexBuilder(const char* label, std::vector>&& tables, framework::pack) - { - auto pool = arrow::default_memory_pool(); - SelfIndexColumnBuilder self{C1::columnLabel(), pool}; - std::unique_ptr keyIndex = nullptr; - if constexpr (!Key::template hasOriginal()) { - keyIndex = std::make_unique(tables[0]->column(o2::aod::MetadataTrait>::metadata::template getIndexPosToKey())); - } - - auto sq = std::make_index_sequence(); - - auto columnBuilders = [&tables, &pool ](std::index_sequence) -> std::array, sizeof...(Cs)> - { - return {[](arrow::Table* table, arrow::MemoryPool* pool) { - using T = framework::pack_element_t>; - if constexpr (!Key::template hasOriginal()) { - constexpr auto pos = o2::aod::MetadataTrait>::metadata::template getIndexPosToKey(); - return std::make_shared(table->column(pos), T::columnLabel(), ColumnTrait::listSize(), pool); - } else { - return std::make_shared(T::columnLabel(), pool); - } - }(tables[Is + 1].get(), pool)...}; - } - (sq); - - std::array finds; - - for (int64_t counter = 0; counter < tables[0]->num_rows(); ++counter) { - int64_t idx = -1; - if constexpr (Key::template hasOriginal()) { - idx = counter; - } else { - idx = keyIndex->valueAt(counter); - } - finds = [&idx, &columnBuilders](std::index_sequence) { - return std::array{ - [&idx, &columnBuilders]() { - using T = typename framework::pack_element_t>; - return std::static_pointer_cast>(columnBuilders[Is])->template find(idx); - }()...}; - }(sq); - if constexpr (std::same_as) { - [&idx, &columnBuilders](std::index_sequence) { - ([&idx, &columnBuilders]() { - using T = typename framework::pack_element_t>; - return std::static_pointer_cast>(columnBuilders[Is])->template fill(idx); }(), ...); - }(sq); - self.fill(counter); - } else if constexpr (std::same_as) { - if (std::none_of(finds.begin(), finds.end(), [](bool const x) { return x == false; })) { - [&idx, &columnBuilders](std::index_sequence) { - ([&idx, &columnBuilders]() { - using T = typename framework::pack_element_t>; - return std::static_pointer_cast>(columnBuilders[Is])->template fill(idx); - }(), - ...); - }(sq); - self.fill(counter); - } - } - } - - return [&label, &columnBuilders, &self](std::index_sequence) { - return makeArrowTable(label, - {self.template result(), [&columnBuilders]() { - using T = typename framework::pack_element_t>; - return std::static_pointer_cast>(columnBuilders[Is])->template result(); - }()...}, - {self.field(), [&columnBuilders]() { - using T = typename framework::pack_element_t>; - return std::static_pointer_cast>(columnBuilders[Is])->field(); - }()...}); - }(sq); - } -}; - /// This helper struct allows you to declare index tables to be created in a task template @@ -613,12 +712,17 @@ template struct Builds : decltype(transformBase()) { using buildable_t = T; using metadata = decltype(transformBase())::metadata; - using IP = std::conditional_t, IndexBuilder>; using Key = metadata::Key; using H = typename T::first_t; using Ts = typename T::rest_t; using index_pack_t = metadata::index_pack_t; + std::shared_ptr outputSchema = []() { return std::make_shared(soa::createFieldsFromColumns(index_pack_t{}))->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label()}})); }(); + + std::vector map = soa::getIndexMapping(); + + std::vector builders; + T* operator->() { return table.get(); @@ -639,10 +743,9 @@ struct Builds : decltype(transformBase()) { return index_pack_t{}; } - template - auto build(framework::pack, std::vector>&& tables) + auto build(std::vector>&& tables) { - this->table = std::make_shared(IP::template indexBuilder(o2::aod::label(), std::forward>>(tables), framework::pack{})); + this->table = std::make_shared(soa::IndexBuilder::materialize(builders, std::forward>>(tables), map, outputSchema, metadata::exclusive)); return (this->table != nullptr); } }; @@ -651,7 +754,7 @@ template concept is_builds = requires(T t) { typename T::metadata; typename T::Key; - requires std::same_as; + requires std::same_as>; }; /// This helper class allows you to declare things which will be created by a diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 596f3da6a557a..fbb499940b9b9 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -34,18 +34,6 @@ namespace o2::framework namespace { -template -static inline auto extractOriginal(ProcessingContext& pc) -{ - return pc.inputs().get(aod::MetadataTrait::metadata::tableLabel())->asArrowTable(); -} - -template -static inline std::vector> extractOriginals(framework::pack, ProcessingContext& pc) -{ - return {extractOriginal(pc)...}; -} - template refs> static inline auto extractOriginals(ProcessingContext& pc) { @@ -160,12 +148,12 @@ const char* controlOption() } template -concept with_base_table = requires(T const& t) { t.base_specs(); }; +concept with_base_table = requires { T::base_specs(); }; template bool requestInputs(std::vector& inputs, T const& entity) { - auto base_specs = entity.base_specs(); + auto base_specs = T::base_specs(); for (auto base_spec : base_specs) { base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption()}, VariantType::Bool, true, {"\"\""}}); DataSpecUtils::updateInputList(inputs, std::forward(base_spec)); @@ -289,9 +277,8 @@ bool prepareOutput(ProcessingContext& context, T& spawns) { using metadata = o2::aod::MetadataTrait>::metadata; auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals(context), std::span{metadata::base_table_t::originalLabels}); - if (originalTable->schema()->fields().empty() == true) { - using base_table_t = typename T::base_table_t::table_t; - originalTable = makeEmptyTable(o2::aod::label()); + if (originalTable->num_rows() == 0) { + originalTable = makeEmptyTable(); } using D = o2::aod::Hash; @@ -308,7 +295,7 @@ template bool prepareOutput(ProcessingContext& context, T& builds) { using metadata = o2::aod::MetadataTrait>::metadata; - return builds.template build(builds.pack(), extractOriginals(context)); + return builds.build(extractOriginals(context)); } template @@ -317,9 +304,8 @@ bool prepareOutput(ProcessingContext& context, T& defines) { using metadata = o2::aod::MetadataTrait>::metadata; auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals(context), std::span{metadata::base_table_t::originalLabels}); - if (originalTable->schema()->fields().empty() == true) { - using base_table_t = typename T::base_table_t::table_t; - originalTable = makeEmptyTable(o2::aod::label()); + if (originalTable->num_rows() == 0) { + originalTable = makeEmptyTable(); } if (defines.inputSchema == nullptr) { defines.inputSchema = originalTable->schema(); @@ -350,9 +336,8 @@ bool prepareDelayedOutput(ProcessingContext& context, T& defines) } using metadata = o2::aod::MetadataTrait>::metadata; auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals(context), std::span{metadata::base_table_t::originalLabels}); - if (originalTable->schema()->fields().empty() == true) { - using base_table_t = typename T::base_table_t::table_t; - originalTable = makeEmptyTable(o2::aod::label()); + if (originalTable->num_rows() == 0) { + originalTable = makeEmptyTable(); } if (defines.inputSchema == nullptr) { defines.inputSchema = originalTable->schema(); diff --git a/Framework/Core/include/Framework/ArrowTypes.h b/Framework/Core/include/Framework/ArrowTypes.h index 6fd70113fede7..2673472a81152 100644 --- a/Framework/Core/include/Framework/ArrowTypes.h +++ b/Framework/Core/include/Framework/ArrowTypes.h @@ -11,6 +11,7 @@ #ifndef O2_FRAMEWORK_ARROWTYPES_H #define O2_FRAMEWORK_ARROWTYPES_H +#include "Framework/Traits.h" #include "arrow/type_fwd.h" #include @@ -117,5 +118,54 @@ template using arrow_array_for_t = typename arrow_array_for::type; template using value_for_t = typename arrow_array_for::value_type; + +template +using array_element_t = std::decay_t()[0])>; + +template +std::shared_ptr asArrowDataType(int list_size = 1) +{ + auto typeGenerator = [](std::shared_ptr const& type, int list_size) -> std::shared_ptr { + switch (list_size) { + case -1: + return arrow::list(type); + case 1: + return std::move(type); + default: + return arrow::fixed_size_list(type, list_size); + } + }; + + if constexpr (std::is_arithmetic_v) { + if constexpr (std::same_as) { + return typeGenerator(arrow::boolean(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::uint8(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::uint16(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::uint32(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::uint64(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::int8(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::int16(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::int32(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::int64(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::float32(), list_size); + } else if constexpr (std::same_as) { + return typeGenerator(arrow::float64(), list_size); + } + } else if constexpr (std::is_bounded_array_v) { + return asArrowDataType>(std::extent_v); + } else if constexpr (o2::framework::is_specialization_v) { + return asArrowDataType(-1); + } + return nullptr; +} } // namespace o2::soa #endif // O2_FRAMEWORK_ARROWTYPES_H diff --git a/Framework/Core/include/Framework/Expressions.h b/Framework/Core/include/Framework/Expressions.h index e08bf8db52bb4..0be19954f1faa 100644 --- a/Framework/Core/include/Framework/Expressions.h +++ b/Framework/Core/include/Framework/Expressions.h @@ -712,6 +712,8 @@ std::shared_ptr createProjectorHelper(size_t nColumns, expre std::shared_ptr schema, std::vector> const& fields); +std::vector> materializeProjectors(std::vector const& projectors, std::shared_ptr const& inputSchema, std::vector> outputFields); + template std::shared_ptr createProjectors(framework::pack, std::vector> const& fields, gandiva::SchemaPtr schema) { diff --git a/Framework/Core/include/Framework/IndexBuilderHelpers.h b/Framework/Core/include/Framework/IndexBuilderHelpers.h index d02d5cfc59b3f..30754e62a8dc3 100644 --- a/Framework/Core/include/Framework/IndexBuilderHelpers.h +++ b/Framework/Core/include/Framework/IndexBuilderHelpers.h @@ -11,23 +11,32 @@ #ifndef O2_FRAMEWORK_INDEXBUILDERHELPERS_H_ #define O2_FRAMEWORK_INDEXBUILDERHELPERS_H_ -#include "arrow/array.h" #include #include #include -#include #include -#include + +namespace o2::soa +{ +enum struct IndexKind : int { + IdxInvalid = -1, + IdxSelf = 0, + IdxSingle = 1, + IdxSlice = 2, + IdxArray = 3 +}; +} // namespace o2::soa namespace o2::framework { void cannotBuildAnArray(); +void cannotCreateIndexBuilder(); struct ChunkedArrayIterator { ChunkedArrayIterator(std::shared_ptr source); - virtual ~ChunkedArrayIterator() = default; + void reset(std::shared_ptr& source); - std::shared_ptr mSource; + std::shared_ptr mSource = nullptr; size_t mPosition = 0; int mChunk = 0; size_t mOffset = 0; @@ -35,6 +44,7 @@ struct ChunkedArrayIterator { int const* mCurrent = nullptr; int const* mLast = nullptr; size_t mFirstIndex = 0; + size_t mSourceSize = 0; std::shared_ptr getCurrentArray(); void nextChunk(); @@ -42,114 +52,72 @@ struct ChunkedArrayIterator { int valueAt(size_t pos); }; -struct SelfIndexColumnBuilder { - SelfIndexColumnBuilder(const char* name, arrow::MemoryPool* pool); - virtual ~SelfIndexColumnBuilder() = default; - - template - inline std::shared_ptr result() const - { - std::shared_ptr array; - auto status = static_cast(mBuilder.get())->Finish(&array); - if (!status.ok()) { - cannotBuildAnArray(); - } +struct SelfBuilder { + std::unique_ptr mBuilder = nullptr; + std::unique_ptr keyIndex = nullptr; + SelfBuilder(arrow::MemoryPool* pool); + void reset(std::shared_ptr); - return std::make_shared(array); - } - std::shared_ptr field() const; - template - inline bool find(int) + inline bool find(int) const { return true; } - - template - inline void fill(int idx) - { - (void)static_cast(mBuilder.get())->Append(idx); - } - - std::string mColumnName; - std::shared_ptr mArrowType; - std::unique_ptr mBuilder = nullptr; + void fill(int idx); + std::shared_ptr result() const; }; -class IndexColumnBuilder : public SelfIndexColumnBuilder, public ChunkedArrayIterator -{ - public: - IndexColumnBuilder(std::shared_ptr source, const char* name, int listSize, arrow::MemoryPool* pool); - ~IndexColumnBuilder() override = default; +struct SingleBuilder : public ChunkedArrayIterator { + std::unique_ptr mBuilder = nullptr; + SingleBuilder(std::shared_ptr source, arrow::MemoryPool* pool); + void reset(std::shared_ptr source); - template - inline std::shared_ptr result() const - { - if constexpr (std::same_as>) { - return resultMulti(); - } else if constexpr (std::same_as) { - return resultSlice(); - } else { - return resultSingle(); - } - } + bool find(int idx); + void fill(int idx); + std::shared_ptr result() const; +}; - template - inline bool find(int idx) - { - if constexpr (std::same_as>) { - return findMulti(idx); - } else if constexpr (std::same_as) { - return findSlice(idx); - } else { - return findSingle(idx); - } - } +struct SliceBuilder : public ChunkedArrayIterator { + arrow::ArrayBuilder* mValueBuilder = nullptr; + std::unique_ptr mListBuilder = nullptr; + std::shared_ptr> mValues = nullptr; + std::shared_ptr> mCounts = nullptr; + int mValuePos = 0; + SliceBuilder(std::shared_ptr source, arrow::MemoryPool* pool); + void reset(std::shared_ptr source); - template - inline void fill(int idx) - { - ++mResultSize; - if constexpr (std::same_as>) { - fillMulti(idx); - } else if constexpr (std::same_as) { - fillSlice(idx); - } else { - fillSingle(idx); - } - } + bool find(int idx); + void fill(int idx); + std::shared_ptr result() const; - private: arrow::Status preSlice(); - arrow::Status preFind(); - - bool findSingle(int idx); - bool findSlice(int idx); - bool findMulti(int idx); - - void fillSingle(int idx); - void fillSlice(int idx); - void fillMulti(int idx); - - std::shared_ptr resultSingle() const; - std::shared_ptr resultSlice() const; - std::shared_ptr resultMulti() const; +}; - int mListSize = 1; +struct ArrayBuilder : public ChunkedArrayIterator { arrow::ArrayBuilder* mValueBuilder = nullptr; + std::vector mValues; + std::vector> mIndices; std::unique_ptr mListBuilder = nullptr; + ArrayBuilder(std::shared_ptr source, arrow::MemoryPool* pool); + void reset(std::shared_ptr source); - size_t mSourceSize = 0; - size_t mResultSize = 0; + bool find(int idx); + void fill(int idx); + std::shared_ptr result() const; - std::shared_ptr> mValuesArrow = nullptr; - std::shared_ptr> mCounts = nullptr; - std::vector mValues; - std::vector> mIndices; - int mFillOffset = 0; - int mValuePos = 0; + arrow::Status preFind(); }; -std::shared_ptr makeArrowTable(const char* label, std::vector>&& columns, std::vector>&& fields); +struct IndexColumnBuilder { + std::variant builder; + size_t mResultSize = 0; + int mColumnPos = -1; + IndexColumnBuilder(soa::IndexKind kind, int pos, arrow::MemoryPool* pool, std::shared_ptr source = nullptr); + void reset(std::shared_ptr source = nullptr); + + bool find(int idx); + void fill(int idx); + std::shared_ptr result() const; +}; } // namespace o2::framework #endif // O2_FRAMEWORK_INDEXBUILDERHELPERS_H_ diff --git a/Framework/Core/include/Framework/TableBuilder.h b/Framework/Core/include/Framework/TableBuilder.h index 7707afe45b380..845820dfe4bff 100644 --- a/Framework/Core/include/Framework/TableBuilder.h +++ b/Framework/Core/include/Framework/TableBuilder.h @@ -15,7 +15,6 @@ #include "Framework/ASoA.h" #include "Framework/StructToTuple.h" #include "Framework/RuntimeError.h" -#include "arrow/type_traits.h" // Apparently needs to be on top of the arrow includes. @@ -26,6 +25,7 @@ #include #include #include +#include #include #include @@ -764,92 +764,5 @@ class TableBuilder std::shared_ptr mSchema; std::vector> mArrays; }; - -template -auto makeEmptyTable(const char* name) -{ - TableBuilder b; - [[maybe_unused]] auto writer = b.cursor(); - b.setLabel(name); - return b.finalize(); -} - -template -auto makeEmptyTable() -{ - TableBuilder b; - [[maybe_unused]] auto writer = b.cursor(typename aod::MetadataTrait>::metadata::persistent_columns_t{}); - b.setLabel(aod::label()); - return b.finalize(); -} - -template -auto makeEmptyTable(const char* name, framework::pack p) -{ - TableBuilder b; - [[maybe_unused]] auto writer = b.cursor(p); - b.setLabel(name); - return b.finalize(); -} - -std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, size_t nColumns, - expressions::Projector* projectors, const char* name, std::shared_ptr& projector); - -std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, - const char* name, size_t nColumns, - const std::shared_ptr& projector); - -/// Expression-based column generator to materialize columns -template - requires(soa::has_configurable_extension::metadata>) -auto spawner(std::shared_ptr const& fullTable, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) -{ - using placeholders_pack_t = typename o2::aod::MetadataTrait::metadata::placeholders_pack_t; - if (fullTable->num_rows() == 0) { - return makeEmptyTable(name, placeholders_pack_t{}); - } - return spawnerHelper(fullTable, schema, framework::pack_size(placeholders_pack_t{}), projectors, name, projector); -} - -template - requires(soa::has_configurable_extension::metadata>) -auto spawner(std::vector>&& tables, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) -{ - auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{o2::aod::MetadataTrait::metadata::base_table_t::originalLabels}); - return spawner(fullTable, name, projectors, projector, schema); -} - -template - requires(soa::has_extension::metadata> && !soa::has_configurable_extension::metadata>) -auto spawner(std::shared_ptr const& fullTable, const char* name, expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) -{ - using expression_pack_t = typename o2::aod::MetadataTrait::metadata::expression_pack_t; - if (fullTable->num_rows() == 0) { - return makeEmptyTable(name, expression_pack_t{}); - } - return spawnerHelper(fullTable, schema, framework::pack_size(expression_pack_t{}), projectors, name, projector); -} - -template - requires(soa::has_extension::metadata> && !soa::has_configurable_extension::metadata>) -auto spawner(std::vector>&& tables, const char* name, expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) -{ - auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{o2::aod::MetadataTrait::metadata::base_table_t::originalLabels}); - return spawner(fullTable, name, projectors, projector, schema); -} - -template -auto spawner(framework::pack, std::vector>&& tables, const char* name, expressions::Projector* projectors, std::shared_ptr& projector, std::shared_ptr const& schema) -{ - std::array labels{"original"}; - auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{labels}); - if (fullTable->num_rows() == 0) { - return makeEmptyTable(name, framework::pack{}); - } - return spawnerHelper(fullTable, schema, sizeof...(C), projectors, name, projector); -} - -template -using iterator_tuple_t = std::tuple; } // namespace o2::framework #endif // FRAMEWORK_TABLEBUILDER_H diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx deleted file mode 100644 index 09ec16a93b087..0000000000000 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. -// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. -// All rights not expressly granted are reserved. -// -// This software is distributed under the terms of the GNU General Public -// License v3 (GPL Version 3), copied verbatim in the file "COPYING". -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "Framework/AODReaderHelpers.h" -#include "Framework/AnalysisHelpers.h" -#include "Framework/AnalysisDataModelHelpers.h" -#include "Framework/ExpressionHelpers.h" -#include "Framework/DataProcessingHelpers.h" -#include "Framework/AlgorithmSpec.h" -#include "Framework/ControlService.h" -#include "Framework/CallbackService.h" -#include "Framework/EndOfStreamContext.h" -#include "Framework/DataSpecUtils.h" -#include "ExpressionJSONHelpers.h" -#include "Framework/ConfigContext.h" -#include "Framework/AnalysisContext.h" - -#include - -#include -#include -#include - -#include -#include -#include -#include -#include - -namespace o2::framework::readers -{ -auto setEOSCallback(InitContext& ic) -{ - ic.services().get().set( - [](EndOfStreamContext& eosc) { - auto& control = eosc.services().get(); - control.endOfStream(); - control.readyToQuit(QuitRequest::Me); - }); -} - -template refs> -static inline auto extractOriginals(ProcessingContext& pc) -{ - return [&](std::index_sequence) -> std::vector> { - return {pc.inputs().get(o2::aod::label())->asArrowTable()...}; - }(std::make_index_sequence()); -} -namespace -{ -template - requires(D::exclusive) -auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc) -{ - using metadata_t = decltype(metadata); - using Key = typename metadata_t::Key; - using index_pack_t = typename metadata_t::index_pack_t; - constexpr auto sources = metadata_t::sources; - return o2::framework::IndexBuilder::indexBuilder(input.binding.c_str(), - extractOriginals(pc), - index_pack_t{}); -} - -template - requires(!D::exclusive) -auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc) -{ - using metadata_t = decltype(metadata); - using Key = typename metadata_t::Key; - using index_pack_t = typename metadata_t::index_pack_t; - constexpr auto sources = metadata_t::sources; - return o2::framework::IndexBuilder::indexBuilder(input.binding.c_str(), - extractOriginals(pc), - index_pack_t{}); -} -} // namespace - -AlgorithmSpec AODReaderHelpers::indexBuilderCallback(std::vector& requested) -{ - return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) { - return [requested](ProcessingContext& pc) { - auto outputs = pc.outputs(); - // spawn tables - for (auto& input : requested) { - auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input); - if (description == header::DataDescription{"MA_RN2_EX"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedExclusiveMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_RN2_SP"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedSparseMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_RN3_EX"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedExclusiveMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_RN3_SP"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedSparseMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_BCCOL_EX"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_BCCOL_SP"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_BCCOLS_EX"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_BCCOLS_SP"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_RN3_BC_SP"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_RN3_BC_EX"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc)); - } else if (description == header::DataDescription{"MA_RN2_BC_SP"}) { - outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc)); - } else { - throw std::runtime_error("Not an index table"); - } - } - }; - }}; -} - -namespace -{ -template -auto make_spawn(InputSpec const& input, ProcessingContext& pc) -{ - using metadata_t = o2::aod::MetadataTrait::metadata; - constexpr auto sources = metadata_t::sources; - static std::shared_ptr projector = nullptr; - static std::shared_ptr schema = std::make_shared(o2::soa::createFieldsFromColumns(typename metadata_t::expression_pack_t{})); - static auto projectors = [](framework::pack) -> std::array - { - return {{std::move(C::Projector())...}}; - } - (typename metadata_t::expression_pack_t{}); - return o2::framework::spawner(extractOriginals(pc), input.binding.c_str(), projectors.data(), projector, schema); -} - -struct Maker { - std::string binding; - std::vector labels; - std::vector> expressions; - std::shared_ptr projector = nullptr; - std::shared_ptr schema; - - header::DataOrigin origin; - header::DataDescription description; - header::DataHeader::SubSpecificationType version; - - std::shared_ptr make(ProcessingContext& pc) - { - std::vector> originals; - for (auto const& label : labels) { - originals.push_back(pc.inputs().get(label)->asArrowTable()); - } - auto fullTable = soa::ArrowHelpers::joinTables(std::move(originals), std::span{labels.begin(), labels.size()}); - if (fullTable->num_rows() == 0) { - return arrow::Table::MakeEmpty(schema).ValueOrDie(); - } - if (projector == nullptr) { - auto s = gandiva::Projector::Make( - fullTable->schema(), - expressions, - &projector); - if (!s.ok()) { - throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str()); - } - } - - return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector); - } -}; - -struct Spawnable { - std::string binding; - std::vector labels; - std::vector projectors; - std::vector> expressions; - std::shared_ptr outputSchema; - std::shared_ptr inputSchema; - - header::DataOrigin origin; - header::DataDescription description; - header::DataHeader::SubSpecificationType version; - - Spawnable(InputSpec const& spec) - : binding{spec.binding} - { - auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec); - origin = origin_; - description = description_; - version = version_; - auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; }); - std::stringstream iws(loc->defaultValue.get()); - projectors = ExpressionJSONHelpers::read(iws); - - loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; }); - iws.clear(); - iws.str(loc->defaultValue.get()); - outputSchema = ArrowJSONHelpers::read(iws); - - for (auto& i : spec.metadata) { - if (i.name.starts_with("input:")) { - labels.emplace_back(i.name.substr(6)); - } - } - - std::vector> fields; - for (auto& p : projectors) { - expressions::walk(p.node.get(), - [&fields](expressions::Node* n) mutable { - if (n->self.index() == 1) { - auto& b = std::get(n->self); - if (std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr const& field) { return field->name() == b.name; }) == fields.end()) { - fields.emplace_back(std::make_shared(b.name, expressions::concreteArrowType(b.type))); - } - } - }); - } - inputSchema = std::make_shared(fields); - - int i = 0; - for (auto& p : projectors) { - expressions.push_back( - expressions::makeExpression( - expressions::createExpressionTree( - expressions::createOperations(p), - inputSchema), - outputSchema->field(i))); - ++i; - } - } - - std::shared_ptr makeProjector() - { - return expressions::createProjectorHelper(projectors.size(), projectors.data(), inputSchema, outputSchema->fields()); - } - - Maker createMaker() - { - o2::framework::addLabelToSchema(outputSchema, binding.c_str()); - return { - binding, - labels, - expressions, - nullptr, - outputSchema, - origin, - description, - version}; - } -}; - -} // namespace - -AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(/*std::vector& requested*/ ConfigContext const& ctx) -{ - auto& ac = ctx.services().get(); - return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) { - std::vector spawnables; - for (auto& i : requested) { - spawnables.emplace_back(i); - } - std::vector makers; - for (auto& s : spawnables) { - makers.push_back(s.createMaker()); - } - - return [makers](ProcessingContext& pc) mutable { - auto outputs = pc.outputs(); - for (auto& maker : makers) { - outputs.adopt(Output{maker.origin, maker.description, maker.version}, maker.make(pc)); - } - }; - }}; -} - -} // namespace o2::framework::readers diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx index 4f78cc42f3f98..b8e0348d5df9c 100644 --- a/Framework/Core/src/AnalysisHelpers.cxx +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -8,11 +8,147 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. +#include "Framework/AnalysisHelpers.h" #include "Framework/ExpressionHelpers.h" #include "ExpressionJSONHelpers.h" +#include "IndexJSONHelpers.h" + +namespace o2::soa +{ +std::vector IndexBuilder::makeBuilders(std::vector>&& tables, std::vector const& records) +{ + std::vector builders; + builders.reserve(records.size()); + auto pool = arrow::default_memory_pool(); + builders.emplace_back(IndexKind::IdxSelf, records[0].pos, pool); + if (records[0].pos >= 0) { + std::get(builders[0].builder).keyIndex = std::make_unique(tables[0]->column(records[0].pos)); + } + + for (auto i = 1U; i < records.size(); ++i) { + builders.emplace_back(records[i].kind, records[i].pos, pool, records[i].pos >= 0 ? tables[i]->column(records[i].pos) : nullptr); + } + + return builders; +} + +void IndexBuilder::resetBuilders(std::vector& builders, std::vector>&& tables) +{ + for (auto i = 0U; i < builders.size(); ++i) { + builders[i].reset(builders[i].mColumnPos >= 0 ? tables[i]->column(builders[i].mColumnPos) : nullptr); + } + + if (builders[0].mColumnPos >= 0) { + std::get(builders[0].builder).keyIndex = std::make_unique(tables[0]->column(builders[0].mColumnPos)); + } +} + +std::shared_ptr IndexBuilder::materialize(std::vector& builders, std::vector>&& tables, std::vector const& records, std::shared_ptr const& schema, bool exclusive) +{ + auto size = tables[0]->num_rows(); + if (builders.empty()) { + builders = makeBuilders(std::move(tables), records); + } else { + resetBuilders(builders, std::move(tables)); + } + + std::vector finds; + finds.resize(builders.size()); + for (int64_t counter = 0; counter < size; ++counter) { + int64_t idx = -1; + if (std::get(builders[0].builder).keyIndex == nullptr) { + idx = counter; + } else { + idx = std::get(builders[0].builder).keyIndex->valueAt(counter); + } + for (auto i = 0U; i < builders.size(); ++i) { + finds[i] = builders[i].find(idx); + } + if (exclusive) { + if (std::none_of(finds.begin(), finds.end(), [](bool const x) { return x == false; })) { + builders[0].fill(counter); + for (auto i = 1U; i < builders.size(); ++i) { + builders[i].fill(idx); + } + } + } else { + builders[0].fill(counter); + for (auto i = 1U; i < builders.size(); ++i) { + builders[i].fill(idx); + } + } + } + + std::vector> arrays; + arrays.reserve(builders.size()); + for (auto& builder : builders) { + arrays.push_back(builder.result()); + } + + return arrow::Table::Make(schema, arrays); +} +} // namespace o2::soa namespace o2::framework { +std::shared_ptr makeEmptyTableImpl(const char* name, std::shared_ptr& schema) +{ + schema = schema->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{name}})); + return arrow::Table::MakeEmpty(schema).ValueOrDie(); +} + +std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, size_t nColumns, + expressions::Projector* projectors, const char* name, + std::shared_ptr& projector) +{ + if (projector == nullptr) { + projector = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), newSchema->fields()); + } + + return spawnerHelper(fullTable, newSchema, name, nColumns, projector); +} + +std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, + const char* name, size_t nColumns, + std::shared_ptr const& projector) +{ + arrow::TableBatchReader reader(*fullTable); + std::shared_ptr batch; + arrow::ArrayVector v; + std::vector chunks; + chunks.resize(nColumns); + std::vector> arrays; + + while (true) { + auto s = reader.ReadNext(&batch); + if (!s.ok()) { + throw runtime_error_f("Cannot read batches from the source table to spawn %s: %s", name, s.ToString().c_str()); + } + if (batch == nullptr) { + break; + } + try { + s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v); + if (!s.ok()) { + throw runtime_error_f("Cannot apply projector to the source table of %s: %s", name, s.ToString().c_str()); + } + } catch (std::exception& e) { + throw runtime_error_f("Cannot apply projector to the source table of %s: exception caught: %s", name, e.what()); + } + + for (auto i = 0U; i < nColumns; ++i) { + chunks[i].emplace_back(v.at(i)); + } + } + + arrays.reserve(nColumns); + for (auto i = 0U; i < nColumns; ++i) { + arrays.push_back(std::make_shared(chunks[i])); + } + + return arrow::Table::Make(newSchema, arrays); +} + void initializePartitionCaches(std::set const& hashes, std::shared_ptr const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter) { if (tree == nullptr) { @@ -35,10 +171,49 @@ std::string serializeProjectors(std::vector& return osm.str(); } -std::string serializeSchema(std::shared_ptr& schema) +std::string serializeSchema(std::shared_ptr schema) { std::stringstream osm; ArrowJSONHelpers::write(osm, schema); return osm.str(); } + +std::string serializeIndexRecords(std::vector& irs) +{ + std::stringstream osm; + IndexJSONHelpers::write(osm, irs); + return osm.str(); +} + +std::vector> extractSources(ProcessingContext& pc, std::vector const& labels) +{ + std::vector> tables; + for (auto const& label : labels) { + tables.emplace_back(pc.inputs().get(label.c_str())->asArrowTable()); + } + return tables; +} + +std::shared_ptr Spawner::materialize(ProcessingContext& pc) const +{ + auto tables = extractSources(pc, labels); + auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{labels.begin(), labels.size()}); + if (fullTable->num_rows() == 0) { + return arrow::Table::MakeEmpty(schema).ValueOrDie(); + } + + return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector); +} + +std::shared_ptr Builder::materialize(ProcessingContext& pc) +{ + if (builders == nullptr) { + builders = std::make_shared>(); + builders->reserve(records.size()); + } + std::shared_ptr result; + auto tables = extractSources(pc, labels); + result = o2::soa::IndexBuilder::materialize(*builders.get(), std::move(tables), records, outputSchema, exclusive); + return result; +} } // namespace o2::framework diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 7cfab22885671..b5c898faa515a 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -219,7 +219,6 @@ void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( // FIXME: good enough for now... for (auto& i : input.metadata) { if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto value = i.defaultValue.get(); auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; }); if (j == publisher.inputs.end()) { diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 4150fda9f63f1..cf2d364027932 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -10,7 +10,6 @@ // or submit itself to any jurisdiction. #include "ArrowSupport.h" -#include "Framework/AODReaderHelpers.h" #include "Framework/ArrowContext.h" #include "Framework/ArrowTableSlicingCache.h" #include "Framework/DataProcessor.h" @@ -619,7 +618,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() builder->outputs.clear(); // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... - builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs); + builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx); // readers::AODReaderHelpers::indexBuilderCallback(ctx); AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder); } @@ -654,7 +653,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() spawner->inputs.clear(); // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... - spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ctx); + spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx); AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); } diff --git a/Framework/Core/src/ExpressionJSONHelpers.cxx b/Framework/Core/src/ExpressionJSONHelpers.cxx index 8d4907a721f7e..a6e19875381cd 100644 --- a/Framework/Core/src/ExpressionJSONHelpers.cxx +++ b/Framework/Core/src/ExpressionJSONHelpers.cxx @@ -637,6 +637,18 @@ void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector arrowDataTypeFromId(atype::type type, int list_size = 1, atype::type element = atype::NA) +{ + switch (list_size) { + case -1: + return arrow::list(expressions::concreteArrowType(element)); + case 1: + return expressions::concreteArrowType(type); + default: + return arrow::fixed_size_list(expressions::concreteArrowType(element), list_size); + } +} + struct SchemaReader : public rapidjson::BaseReaderHandler, SchemaReader> { using Ch = rapidjson::UTF8<>::Ch; using SizeType = rapidjson::SizeType; @@ -658,6 +670,8 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch std::string name; atype::type type; + atype::type element; + int list_size = 1; SchemaReader() { @@ -667,7 +681,7 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch bool StartArray() { - debug << "Starting array" << std::endl; + debug << "StartArray()" << std::endl; if (states.top() == State::IN_START && currentKey.compare("fields") == 0) { states.push(State::IN_LIST); return true; @@ -678,7 +692,7 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch bool EndArray(SizeType) { - debug << "Ending array" << std::endl; + debug << "EndArray()" << std::endl; if (states.top() == State::IN_LIST) { // finalize schema schema = std::make_shared(fields); @@ -706,6 +720,12 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch if (currentKey.compare("type") == 0) { return true; } + if (currentKey.compare("size") == 0) { + return true; + } + if (currentKey.compare("element") == 0) { + return true; + } } states.push(State::IN_ERROR); @@ -721,6 +741,9 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch if (states.top() == State::IN_LIST) { states.push(State::IN_FIELD); + list_size = 1; + element = atype::NA; + type = atype::NA; return true; } @@ -734,7 +757,7 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch if (states.top() == State::IN_FIELD) { states.pop(); // add a field - fields.emplace_back(std::make_shared(name, expressions::concreteArrowType(type))); + fields.emplace_back(std::make_shared(name, arrowDataTypeFromId(type, list_size, element))); return true; } @@ -754,6 +777,14 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch type = (atype::type)i; return true; } + if (currentKey.compare("element") == 0) { + element = (atype::type)i; + return true; + } + if (currentKey.compare("size") == 0) { + list_size = i; + return true; + } } states.push(State::IN_ERROR); @@ -777,6 +808,10 @@ struct SchemaReader : public rapidjson::BaseReaderHandler, Sch bool Int(int i) { debug << "Int(" << i << ")" << std::endl; + if (states.top() == State::IN_FIELD && currentKey.compare("size") == 0) { + list_size = i; + return true; + } return Uint(i); } }; @@ -791,7 +826,7 @@ std::shared_ptr o2::framework::ArrowJSONHelpers::read(std::istrea bool ok = reader.Parse(isw, sreader); if (!ok) { - throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset()); + throw framework::runtime_error_f("Cannot parse serialized Schema, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset()); } return sreader.schema; } @@ -804,6 +839,20 @@ void writeSchema(rapidjson::Writer& w, arrow::Schema* w.StartObject(); w.Key("name"); w.String(f->name().c_str()); + auto fixedList = dynamic_cast(f->type().get()); + if (fixedList != nullptr) { + w.Key("size"); + w.Int(fixedList->list_size()); + w.Key("element"); + w.Int(fixedList->field(0)->type()->id()); + } + auto varList = dynamic_cast(f->type().get()); + if (varList != nullptr) { + w.Key("size"); + w.Int(-1); + w.Key("element"); + w.Int(varList->field(0)->type()->id()); + } w.Key("type"); w.Int(f->type()->id()); w.EndObject(); diff --git a/Framework/Core/src/Expressions.cxx b/Framework/Core/src/Expressions.cxx index 05a3462d6e4da..43143f781ddf4 100644 --- a/Framework/Core/src/Expressions.cxx +++ b/Framework/Core/src/Expressions.cxx @@ -1348,4 +1348,20 @@ OpNode Parser::opFromToken(std::string const& token) return OpNode{static_cast(std::distance(mapping.begin(), locate))}; } +std::vector> materializeProjectors(std::vector const& projectors, std::shared_ptr const& inputSchema, std::vector> outputFields) +{ + std::vector> expressions; + int i = 0; + for (auto& p : projectors) { + expressions.push_back( + expressions::makeExpression( + expressions::createExpressionTree( + expressions::createOperations(p), + inputSchema), + outputFields[i])); + ++i; + } + return expressions; +} + } // namespace o2::framework::expressions diff --git a/Framework/Core/src/IndexBuilderHelpers.cxx b/Framework/Core/src/IndexBuilderHelpers.cxx index 52d6080690fe1..d7231f72cbee8 100644 --- a/Framework/Core/src/IndexBuilderHelpers.cxx +++ b/Framework/Core/src/IndexBuilderHelpers.cxx @@ -12,6 +12,7 @@ #include "Framework/RuntimeError.h" #include "Framework/IndexBuilderHelpers.h" #include "Framework/CompilerBuiltins.h" +#include "Framework/VariantHelpers.h" #include #include #include @@ -22,130 +23,87 @@ namespace o2::framework { void cannotBuildAnArray() { - throw runtime_error("Cannot build an array"); + throw framework::runtime_error("Cannot finish an array"); +} + +void cannotCreateIndexBuilder() +{ + throw framework::runtime_error("Cannot create index column builder: invalid kind of index column"); } ChunkedArrayIterator::ChunkedArrayIterator(std::shared_ptr source) - : mSource{source} + : mSource{source}, + mSourceSize{(size_t)source->length()} { mCurrentArray = getCurrentArray(); mCurrent = reinterpret_cast(mCurrentArray->values()->data()) + mOffset; mLast = mCurrent + mCurrentArray->length(); } -SelfIndexColumnBuilder::SelfIndexColumnBuilder(const char* name, arrow::MemoryPool* pool) - : mColumnName{name}, - mArrowType{arrow::int32()} +void ChunkedArrayIterator::reset(std::shared_ptr& source) { - auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder); - if (!status.ok()) { - throw runtime_error("Cannot create array builder!"); - } -} + mPosition = 0; + mChunk = 0; + mOffset = 0; + mCurrentArray = nullptr; + mCurrent = nullptr; + mLast = nullptr; + mFirstIndex = 0; + mSourceSize = 0; -std::shared_ptr SelfIndexColumnBuilder::field() const -{ - return std::make_shared(mColumnName, mArrowType); + mSource = source; + mSourceSize = (size_t)source->length(); + mCurrentArray = getCurrentArray(); + mCurrent = reinterpret_cast(mCurrentArray->values()->data()) + mOffset; + mLast = mCurrent + mCurrentArray->length(); } -IndexColumnBuilder::IndexColumnBuilder(std::shared_ptr source, const char* name, int listSize, arrow::MemoryPool* pool) - : SelfIndexColumnBuilder{name, pool}, - ChunkedArrayIterator{source}, - mListSize{listSize}, - mSourceSize{(size_t)source->length()} +SelfBuilder::SelfBuilder(arrow::MemoryPool* pool) { - switch (mListSize) { - case 1: { - mValueBuilder = mBuilder.get(); - mArrowType = arrow::int32(); - }; break; - case 2: { - if (preSlice().ok()) { - mListBuilder = std::make_unique(pool, std::move(mBuilder), mListSize); - mValueBuilder = static_cast(mListBuilder.get())->value_builder(); - mArrowType = arrow::fixed_size_list(arrow::int32(), 2); - } else { - throw runtime_error("Cannot pre-slice an array"); - } - }; break; - case -1: { - if (preFind().ok()) { - mListBuilder = std::make_unique(pool, std::move(mBuilder)); - mValueBuilder = static_cast(mListBuilder.get())->value_builder(); - mArrowType = arrow::list(arrow::int32()); - } else { - throw runtime_error("Cannot pre-find array groups"); - } - }; break; - default: - throw runtime_error_f("Invalid list size for index column: %d", mListSize); + auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder); + if (!status.ok()) { + throw framework::runtime_error("Cannot create array builder for the self-index!"); } } - -arrow::Status IndexColumnBuilder::preSlice() +// static_cast(this)->reset(pool); +void SelfBuilder::reset(std::shared_ptr) { - arrow::Datum value_counts; - auto options = arrow::compute::ScalarAggregateOptions::Defaults(); - ARROW_ASSIGN_OR_RAISE(value_counts, arrow::compute::CallFunction("value_counts", {mSource}, &options)); - auto pair = static_cast(value_counts.array()); - mValuesArrow = std::make_shared>(pair.field(0)->data()); - mCounts = std::make_shared>(pair.field(1)->data()); - return arrow::Status::OK(); + mBuilder->Reset(); + keyIndex = nullptr; } -arrow::Status IndexColumnBuilder::preFind() +void SelfBuilder::fill(int idx) { - arrow::Datum max; - auto options = arrow::compute::ScalarAggregateOptions::Defaults(); - ARROW_ASSIGN_OR_RAISE(max, arrow::compute::CallFunction("max", {mSource}, &options)); - auto maxValue = std::dynamic_pointer_cast(max.scalar())->value; - mIndices.resize(maxValue + 1); - - auto row = 0; - for (auto i = 0; i < mSource->length(); ++i) { - auto v = valueAt(i); - if (v >= 0) { - mValues.emplace_back(v); - mIndices[v].push_back(row); - } - ++row; - } - std::sort(mValues.begin(), mValues.end()); - - return arrow::Status::OK(); + (void)static_cast(mBuilder.get())->Append(idx); } -std::shared_ptr IndexColumnBuilder::resultSingle() const +std::shared_ptr SelfBuilder::result() const { std::shared_ptr array; - auto status = static_cast(mValueBuilder)->Finish(&array); + auto status = static_cast(mBuilder.get())->Finish(&array); if (!status.ok()) { - throw runtime_error("Cannot build an array"); + cannotBuildAnArray(); } + return std::make_shared(array); } -std::shared_ptr IndexColumnBuilder::resultSlice() const +SingleBuilder::SingleBuilder(std::shared_ptr source, arrow::MemoryPool* pool) + : ChunkedArrayIterator{source} { - std::shared_ptr array; - auto status = static_cast(mListBuilder.get())->Finish(&array); + auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder); if (!status.ok()) { - throw runtime_error("Cannot build an array"); + throw framework::runtime_error("Cannot create array builder for the single-valued index!"); } - return std::make_shared(array); } -std::shared_ptr IndexColumnBuilder::resultMulti() const +void SingleBuilder::reset(std::shared_ptr source) { - std::shared_ptr array; - auto status = static_cast(mListBuilder.get())->Finish(&array); - if (!status.ok()) { - throw runtime_error("Cannot build an array"); - } - return std::make_shared(array); + static_cast(this)->reset(source); + mBuilder->Reset(); } -bool IndexColumnBuilder::findSingle(int idx) +bool SingleBuilder::find(int idx) { auto count = mSourceSize - mPosition; while (count > 0) { @@ -166,13 +124,60 @@ bool IndexColumnBuilder::findSingle(int idx) return (mPosition < mSourceSize && valueAt(mPosition) == idx); } -bool IndexColumnBuilder::findSlice(int idx) +void SingleBuilder::fill(int idx) { - auto count = mValuesArrow->length() - mValuePos; + if (mPosition < mSourceSize && valueAt(mPosition) == idx) { + (void)static_cast(mBuilder.get())->Append((int)mPosition); + } else { + (void)static_cast(mBuilder.get())->Append(-1); + } +} + +std::shared_ptr SingleBuilder::result() const +{ + std::shared_ptr array; + auto status = static_cast(mBuilder.get())->Finish(&array); + if (!status.ok()) { + cannotBuildAnArray(); + } + return std::make_shared(array); +} + +SliceBuilder::SliceBuilder(std::shared_ptr source, arrow::MemoryPool* pool) + : ChunkedArrayIterator{source} +{ + if (!preSlice().ok()) { + throw framework::runtime_error("Cannot pre-slice the source for slice-index building"); + } + + std::unique_ptr builder; + auto status = arrow::MakeBuilder(pool, arrow::int32(), &builder); + if (!status.ok()) { + throw framework::runtime_error("Cannot create array for the slice-index builder!"); + } + mListBuilder = std::make_unique(pool, std::move(builder), 2); + mValueBuilder = static_cast(mListBuilder.get())->value_builder(); +} + +void SliceBuilder::reset(std::shared_ptr source) +{ + static_cast(this)->reset(source); + if (!preSlice().ok()) { + throw framework::runtime_error("Cannot pre-slice the source for slice-index building"); + } + mListBuilder->Reset(); + mValues = nullptr; + mCounts = nullptr; + mValuePos = 0; +} + +bool SliceBuilder::find(int idx) +{ + auto count = mValues->length() - mValuePos; while (count > 0) { auto step = count / 2; mValuePos += step; - if (mValuesArrow->Value(mValuePos) <= idx) { + if (mValues->Value(mValuePos) <= idx) { count -= step + 1; } else { mValuePos -= step; @@ -180,32 +185,17 @@ bool IndexColumnBuilder::findSlice(int idx) } } - if (mValuePos < mValuesArrow->length() && mValuesArrow->Value(mValuePos) <= idx) { + if (mValuePos < mValues->length() && mValues->Value(mValuePos) <= idx) { ++mPosition; } - return (mValuePos < mValuesArrow->length() && mValuesArrow->Value(mValuePos) == idx); -} - -bool IndexColumnBuilder::findMulti(int idx) -{ - return (std::find(mValues.begin(), mValues.end(), idx) != mValues.end()); -} - -void IndexColumnBuilder::fillSingle(int idx) -{ - // entry point - if (mPosition < mSourceSize && valueAt(mPosition) == idx) { - (void)static_cast(mValueBuilder)->Append((int)mPosition); - } else { - (void)static_cast(mValueBuilder)->Append(-1); - } + return (mValuePos < mValues->length() && mValues->Value(mValuePos) == idx); } -void IndexColumnBuilder::fillSlice(int idx) +void SliceBuilder::fill(int idx) { int data[2] = {-1, -1}; - if (mValuePos < mValuesArrow->length() && mValuesArrow->Value(mValuePos) == idx) { + if (mValuePos < mValues->length() && mValues->Value(mValuePos) == idx) { for (auto i = 0; i < mValuePos; ++i) { data[0] += mCounts->Value(i); } @@ -216,7 +206,60 @@ void IndexColumnBuilder::fillSlice(int idx) (void)static_cast(mValueBuilder)->AppendValues(data, 2); } -void IndexColumnBuilder::fillMulti(int idx) +std::shared_ptr SliceBuilder::result() const +{ + std::shared_ptr array; + auto status = static_cast(mListBuilder.get())->Finish(&array); + if (!status.ok()) { + cannotBuildAnArray(); + } + return std::make_shared(array); +} + +arrow::Status SliceBuilder::SliceBuilder::preSlice() +{ + arrow::Datum value_counts; + auto options = arrow::compute::ScalarAggregateOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE(value_counts, arrow::compute::CallFunction("value_counts", {mSource}, &options)); + auto pair = static_cast(value_counts.array()); + mValues = std::make_shared>(pair.field(0)->data()); + mCounts = std::make_shared>(pair.field(1)->data()); + return arrow::Status::OK(); +} + +ArrayBuilder::ArrayBuilder(std::shared_ptr source, arrow::MemoryPool* pool) + : ChunkedArrayIterator{source} +{ + if (!preFind().ok()) { + throw framework::runtime_error("Cannot pre-find in a source for array-index building"); + } + + std::unique_ptr builder; + auto status = arrow::MakeBuilder(pool, arrow::int32(), &builder); + if (!status.ok()) { + throw framework::runtime_error("Cannot create array for the array-index builder!"); + } + mListBuilder = std::make_unique(pool, std::move(builder)); + mValueBuilder = static_cast(mListBuilder.get())->value_builder(); +} + +void ArrayBuilder::reset(std::shared_ptr source) +{ + static_cast(this)->reset(source); + if (!preFind().ok()) { + throw framework::runtime_error("Cannot pre-find in a source for array-index building"); + } + mValues.clear(); + mIndices.clear(); + mListBuilder->Reset(); +} + +bool ArrayBuilder::find(int idx) +{ + return (std::find(mValues.begin(), mValues.end(), idx) != mValues.end()); +} + +void ArrayBuilder::fill(int idx) { (void)static_cast(mListBuilder.get())->Append(); if (std::find(mValues.begin(), mValues.end(), idx) != mValues.end()) { @@ -226,6 +269,96 @@ void IndexColumnBuilder::fillMulti(int idx) } } +std::shared_ptr ArrayBuilder::result() const +{ + std::shared_ptr array; + auto status = static_cast(mListBuilder.get())->Finish(&array); + if (!status.ok()) { + cannotBuildAnArray(); + } + return std::make_shared(array); +} + +arrow::Status ArrayBuilder::preFind() +{ + arrow::Datum max; + auto options = arrow::compute::ScalarAggregateOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE(max, arrow::compute::CallFunction("max", {mSource}, &options)); + auto maxValue = std::dynamic_pointer_cast(max.scalar())->value; + mIndices.resize(maxValue + 1); + + auto row = 0; + for (auto i = 0; i < mSource->length(); ++i) { + auto v = valueAt(i); + if (v >= 0) { + mValues.emplace_back(v); + mIndices[v].push_back(row); + } + ++row; + } + std::sort(mValues.begin(), mValues.end()); + + return arrow::Status::OK(); +} + +IndexColumnBuilder::IndexColumnBuilder(soa::IndexKind kind, int pos, arrow::MemoryPool* pool, std::shared_ptr source) + : mColumnPos{pos} +{ + switch (kind) { + case soa::IndexKind::IdxSelf: + builder = SelfBuilder{pool}; + break; + case soa::IndexKind::IdxSingle: + builder = SingleBuilder{source, pool}; + break; + case soa::IndexKind::IdxSlice: + builder = SliceBuilder{source, pool}; + break; + case soa::IndexKind::IdxArray: + builder = ArrayBuilder{source, pool}; + break; + default: + cannotCreateIndexBuilder(); + } +} + +void IndexColumnBuilder::reset(std::shared_ptr source) +{ + std::visit( + overloaded{ + [](std::monostate) {}, + [&source](auto& b) { b.reset(source); }}, + builder); +} + +bool IndexColumnBuilder::find(int idx) +{ + return std::visit( + overloaded{ + [](std::monostate) { return false; }, + [&idx](auto& b) { return b.find(idx); }, + }, + builder); +} + +void IndexColumnBuilder::fill(int idx) +{ + std::visit( + overloaded{ + [](std::monostate) {}, + [&idx](auto& b) { b.fill(idx); }}, + builder); +} + +std::shared_ptr IndexColumnBuilder::result() const +{ + return std::visit( + overloaded{ + [](std::monostate) -> std::shared_ptr { return nullptr; }, + [](auto& b) { return b.result(); }}, + builder); +} + std::shared_ptr ChunkedArrayIterator::getCurrentArray() { auto chunk = mSource->chunk(mChunk); @@ -265,14 +398,4 @@ int ChunkedArrayIterator::valueAt(size_t pos) } return *(mCurrent + pos); } - -std::shared_ptr makeArrowTable(const char* label, std::vector>&& columns, std::vector>&& fields) -{ - auto schema = std::make_shared(fields); - schema->WithMetadata( - std::make_shared( - std::vector{std::string{"label"}}, - std::vector{std::string{label}})); - return arrow::Table::Make(schema, columns); -} } // namespace o2::framework diff --git a/Framework/Core/src/IndexJSONHelpers.cxx b/Framework/Core/src/IndexJSONHelpers.cxx new file mode 100644 index 0000000000000..19ae94a4bcd4c --- /dev/null +++ b/Framework/Core/src/IndexJSONHelpers.cxx @@ -0,0 +1,230 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "IndexJSONHelpers.h" + +#include +#include +#include +#include +#include + +#include +#include + +namespace o2::framework +{ +namespace +{ +struct IndexRecordsReader : public rapidjson::BaseReaderHandler, IndexRecordsReader> { + using Ch = rapidjson::UTF8<>::Ch; + using SizeType = rapidjson::SizeType; + + enum struct State { + IN_START, + IN_LIST, + IN_RECORD, + IN_ERROR + }; + + std::stack states; + std::ostringstream debug; + + std::vector records; + std::string currentKey; + std::string label; + std::string columnLabel; + o2::soa::IndexKind kind; + int pos; + + IndexRecordsReader() + { + debug << ">>> Start" << std::endl; + states.push(State::IN_START); + } + + bool StartArray() + { + debug << "StartArray()" << std::endl; + if (states.top() == State::IN_START && currentKey.compare("records") == 0) { + states.push(State::IN_LIST); + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool EndArray(SizeType) + { + debug << "EndArray()" << std::endl; + if (states.top() == State::IN_LIST) { + // records done + states.pop(); + return true; + } + states.push(State::IN_ERROR); + return false; + } + + bool Key(const Ch* str, SizeType, bool) + { + debug << "Key(" << str << ")" << std::endl; + currentKey = str; + if (states.top() == State::IN_START) { + if (currentKey.compare("records") == 0) { + return true; + } + } + + if (states.top() == State::IN_RECORD) { + if (currentKey.compare("label") == 0) { + return true; + } + if (currentKey.compare("column") == 0) { + return true; + } + if (currentKey.compare("kind") == 0) { + return true; + } + if (currentKey.compare("pos") == 0) { + return true; + } + } + + states.push(State::IN_ERROR); + return false; + } + + bool StartObject() + { + debug << "StartObject()" << std::endl; + if (states.top() == State::IN_START) { + return true; + } + + if (states.top() == State::IN_LIST) { + states.push(State::IN_RECORD); + label = ""; + kind = soa::IndexKind::IdxInvalid; + pos = -2; + return true; + } + + states.push(State::IN_ERROR); + return false; + } + + bool EndObject(SizeType) + { + debug << "EndObject()" << std::endl; + if (states.top() == State::IN_RECORD) { + states.pop(); + // add a record + records.emplace_back(label, columnLabel, kind, pos); + return true; + } + + if (states.top() == State::IN_START) { + return true; + } + + states.push(State::IN_ERROR); + return false; + } + + bool Uint(unsigned i) + { + debug << "Uint(" << i << ") passed to Int()" << std::endl; + return Int(i); + } + + bool Int(int i) + { + debug << "Int(" << i << ")" << std::endl; + if (states.top() == State::IN_RECORD) { + if (currentKey.compare("kind") == 0) { + kind = (soa::IndexKind)i; + return true; + } + if (currentKey.compare("pos") == 0) { + pos = i; + return true; + } + } + + states.push(State::IN_ERROR); + return false; + } + + bool String(const Ch* str, SizeType, bool) + { + debug << "String(" << str << ")" << std::endl; + if (states.top() == State::IN_RECORD) { + if (currentKey.compare("label") == 0) { + label = str; + return true; + } + if (currentKey.compare("column") == 0) { + columnLabel = str; + return true; + } + } + + states.push(State::IN_ERROR); + return false; + } +}; +} // namespace + +std::vector IndexJSONHelpers::read(std::istream& s) +{ + rapidjson::Reader reader; + rapidjson::IStreamWrapper isw(s); + IndexRecordsReader irreader; + + bool ok = reader.Parse(isw, irreader); + + if (!ok) { + throw framework::runtime_error_f("Cannot parse serialized index records vector, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset()); + } + return irreader.records; +} + +namespace +{ +void writeRecords(rapidjson::Writer& w, std::vector& records) +{ + for (auto& r : records) { + w.StartObject(); + w.Key("label"); + w.String(r.label.c_str()); + w.Key("column"); + w.String(r.columnLabel.c_str()); + w.Key("kind"); + w.Int((int)r.kind); + w.Key("pos"); + w.Int(r.pos); + w.EndObject(); + } +} +} // namespace + +void IndexJSONHelpers::write(std::ostream& o, std::vector& irs) +{ + rapidjson::OStreamWrapper osw(o); + rapidjson::Writer w(osw); + w.StartObject(); + w.Key("records"); + w.StartArray(); + writeRecords(w, irs); + w.EndArray(); + w.EndObject(); +} +} // namespace o2::framework diff --git a/Framework/Core/src/IndexJSONHelpers.h b/Framework/Core/src/IndexJSONHelpers.h new file mode 100644 index 0000000000000..dee534ae390f5 --- /dev/null +++ b/Framework/Core/src/IndexJSONHelpers.h @@ -0,0 +1,25 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef INDEXJSONHELPERS_H +#define INDEXJSONHELPERS_H + +#include + +namespace o2::framework +{ +struct IndexJSONHelpers { + static std::vector read(std::istream& s); + static void write(std::ostream& o, std::vector& irs); +}; + +} // namespace o2::framework + +#endif // INDEXJSONHELPERS_H diff --git a/Framework/Core/src/TableBuilder.cxx b/Framework/Core/src/TableBuilder.cxx index c80fef9f0533c..955fe686e12a8 100644 --- a/Framework/Core/src/TableBuilder.cxx +++ b/Framework/Core/src/TableBuilder.cxx @@ -81,94 +81,7 @@ void TableBuilder::validate() const void TableBuilder::setLabel(const char* label) { - mSchema = mSchema->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{label}})); -} - -std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, size_t nColumns, - expressions::Projector* projectors, const char* name, - std::shared_ptr& projector) -{ - if (projector == nullptr) { - projector = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), newSchema->fields()); - } - - arrow::TableBatchReader reader(*fullTable); - std::shared_ptr batch; - arrow::ArrayVector v; - std::vector chunks; - chunks.resize(nColumns); - std::vector> arrays; - - while (true) { - auto s = reader.ReadNext(&batch); - if (!s.ok()) { - throw runtime_error_f("Cannot read batches from source table to spawn %s: %s", name, s.ToString().c_str()); - } - if (batch == nullptr) { - break; - } - try { - s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v); - if (!s.ok()) { - throw runtime_error_f("Cannot apply projector to source table of %s: %s", name, s.ToString().c_str()); - } - } catch (std::exception& e) { - throw runtime_error_f("Cannot apply projector to source table of %s: exception caught: %s", name, e.what()); - } - - for (auto i = 0U; i < nColumns; ++i) { - chunks[i].emplace_back(v.at(i)); - } - } - - arrays.reserve(nColumns); - for (auto i = 0U; i < nColumns; ++i) { - arrays.push_back(std::make_shared(chunks[i])); - } - - addLabelToSchema(newSchema, name); - return arrow::Table::Make(newSchema, arrays); -} - -std::shared_ptr spawnerHelper(std::shared_ptr const& fullTable, std::shared_ptr newSchema, - const char* name, size_t nColumns, - std::shared_ptr const& projector) -{ - arrow::TableBatchReader reader(*fullTable); - std::shared_ptr batch; - arrow::ArrayVector v; - std::vector chunks; - chunks.resize(nColumns); - std::vector> arrays; - - while (true) { - auto s = reader.ReadNext(&batch); - if (!s.ok()) { - throw runtime_error_f("Cannot read batches from the source table to spawn %s: %s", name, s.ToString().c_str()); - } - if (batch == nullptr) { - break; - } - try { - s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v); - if (!s.ok()) { - throw runtime_error_f("Cannot apply projector to the source table of %s: %s", name, s.ToString().c_str()); - } - } catch (std::exception& e) { - throw runtime_error_f("Cannot apply projector to the source table of %s: exception caught: %s", name, e.what()); - } - - for (auto i = 0U; i < nColumns; ++i) { - chunks[i].emplace_back(v.at(i)); - } - } - - arrays.reserve(nColumns); - for (auto i = 0U; i < nColumns; ++i) { - arrays.push_back(std::make_shared(chunks[i])); - } - - return arrow::Table::Make(newSchema, arrays); + addLabelToSchema(mSchema, label); } } // namespace o2::framework diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index b3af5636127f9..61443f5f71616 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -11,7 +11,6 @@ #include "WorkflowHelpers.h" #include "Framework/AnalysisSupportHelpers.h" #include "Framework/AlgorithmSpec.h" -#include "Framework/AODReaderHelpers.h" #include "Framework/ConfigParamSpec.h" #include "Framework/ConfigParamsHelper.h" #include "Framework/CommonDataProcessors.h" @@ -416,7 +415,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext "internal-dpl-aod-index-builder", {}, {}, - readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs), + PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx), // readers::AODReaderHelpers::indexBuilderCallback(ctx), {}}; AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder); @@ -436,7 +435,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext "internal-dpl-aod-spawner", {}, {}, - readers::AODReaderHelpers::aodSpawnerCallback(ctx), + PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx), // readers::AODReaderHelpers::aodSpawnerCallback(ctx), {}}; AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner); diff --git a/Framework/Core/test/test_Expressions.cxx b/Framework/Core/test/test_Expressions.cxx index 41be7d53d2276..b4a65fb0c7b48 100644 --- a/Framework/Core/test/test_Expressions.cxx +++ b/Framework/Core/test/test_Expressions.cxx @@ -454,4 +454,33 @@ TEST_CASE("TestExpressionSerialization") ism.str(osm.str()); auto newSchemap = ArrowJSONHelpers::read(ism); REQUIRE(schemap->ToString() == newSchemap->ToString()); + + osm.clear(); + osm.str(""); + ArrowJSONHelpers::write(osm, schemap1); + + ism.clear(); + ism.str(osm.str()); + auto newSchemap1 = ArrowJSONHelpers::read(ism); + REQUIRE(schemap1->ToString() == newSchemap1->ToString()); + + osm.clear(); + osm.str(""); + auto realisticSchema = std::make_shared(o2::soa::createFieldsFromColumns(o2::aod::MetadataTrait>::metadata::persistent_columns_t{})); + ArrowJSONHelpers::write(osm, realisticSchema); + + ism.clear(); + ism.str(osm.str()); + auto restoredSchema = ArrowJSONHelpers::read(ism); + REQUIRE(realisticSchema->ToString() == restoredSchema->ToString()); + + osm.clear(); + osm.str(""); + auto realisticSchema1 = std::make_shared(o2::soa::createFieldsFromColumns(o2::aod::MetadataTrait>::metadata::persistent_columns_t{})); + ArrowJSONHelpers::write(osm, realisticSchema1); + + ism.clear(); + ism.str(osm.str()); + auto restoredSchema1 = ArrowJSONHelpers::read(ism); + REQUIRE(realisticSchema1->ToString() == restoredSchema1->ToString()); } diff --git a/Framework/Core/test/test_IndexBuilder.cxx b/Framework/Core/test/test_IndexBuilder.cxx index ea9f715f20c8a..e357b1164af80 100644 --- a/Framework/Core/test/test_IndexBuilder.cxx +++ b/Framework/Core/test/test_IndexBuilder.cxx @@ -10,7 +10,7 @@ // or submit itself to any jurisdiction. #include "Framework/AnalysisDataModel.h" -#include "Framework/AnalysisTask.h" +#include "../src/IndexJSONHelpers.h" #include using namespace o2::framework; @@ -102,8 +102,11 @@ TEST_CASE("TestIndexBuilder") auto t4 = b4.finalize(); Categorys st4{t4}; - using m1 = MetadataTrait>::metadata; - auto t5 = IndexBuilder::indexBuilder("test1a", {t1, t2, t3, t4}, typename IDXs::persistent_columns_t{}); + auto map = getIndexMapping>::metadata>(); + auto schema1 = o2::aod::MetadataTrait>::metadata::getSchema(); + std::vector builders1; + auto t5 = IndexBuilder::materialize(builders1, {t1, t2, t3, t4}, map, schema1, true); + // auto t5 = IndexBuilder::materialize({t1, t2, t3, t4}, map, schema1, true); REQUIRE(t5->num_rows() == 4); IDXs idxt{t5}; idxt.bindExternalIndices(&st1, &st2, &st3, &st4); @@ -113,8 +116,10 @@ TEST_CASE("TestIndexBuilder") REQUIRE(row.category().pointId() == row.pointId()); } - using m2 = MetadataTrait>::metadata; - auto t6 = IndexBuilder::indexBuilder("test3", {t2, t1, t3, t4}, typename IDX2s::persistent_columns_t{}); + map = getIndexMapping>::metadata>(); + auto schema2 = o2::aod::MetadataTrait>::metadata::getSchema(); + std::vector builders2; + auto t6 = IndexBuilder::materialize(builders2, {t2, t1, t3, t4}, map, schema2, false); REQUIRE(t6->num_rows() == st2.size()); IDX2s idxs{t6}; std::array fs{0, 1, 2, -1, -1, 4, -1}; @@ -212,8 +217,10 @@ TEST_CASE("AdvancedIndexTables") {14, 34}, {8, 31, 42, 46, 58}}}; - using m3 = MetadataTrait>::metadata; - auto t3 = IndexBuilder::indexBuilder("test4", {t1, t2, tc}, typename IDX3s::persistent_columns_t{}); + auto map = getIndexMapping>::metadata>(); + auto schema3 = o2::aod::MetadataTrait>::metadata::getSchema(); + std::vector builders3; + auto t3 = IndexBuilder::materialize(builders3, {t1, t2, tc}, map, schema3, false); REQUIRE(t3->num_rows() == st1.size()); IDX3s idxs{t3}; idxs.bindExternalIndices(&st1, &st2, &st3); @@ -235,3 +242,38 @@ TEST_CASE("AdvancedIndexTables") ++count; } } + +TEST_CASE("IndexRecordsSerialization") +{ + auto map = getIndexMapping>::metadata>(); + + std::stringstream osm; + IndexJSONHelpers::write(osm, map); + + std::stringstream ism; + ism.str(osm.str()); + auto rmap = IndexJSONHelpers::read(ism); + REQUIRE(map == rmap); + + map = getIndexMapping>::metadata>(); + + osm.clear(); + osm.str(""); + IndexJSONHelpers::write(osm, map); + + ism.clear(); + ism.str(osm.str()); + rmap = IndexJSONHelpers::read(ism); + REQUIRE(map == rmap); + + map = getIndexMapping>::metadata>(); + + osm.clear(); + osm.str(""); + IndexJSONHelpers::write(osm, map); + + ism.clear(); + ism.str(osm.str()); + rmap = IndexJSONHelpers::read(ism); + REQUIRE(map == rmap); +}