From 8d45d497090a74bfcceb2607c5f833e973ed663e Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 6 May 2025 08:56:19 +0200 Subject: [PATCH 1/5] DPL Analysis: prevent slice cache from updating unnecessarily * Cache setup now only happens after init when process configurables' values are final * Add inline constrained functions to avoid using "overloaded" --- Framework/Core/include/Framework/ASoA.h | 12 +-- .../Core/include/Framework/AnalysisManagers.h | 14 ++- .../Core/include/Framework/AnalysisTask.h | 100 ++++++++++++------ .../Framework/ArrowTableSlicingCache.h | 45 +++++--- .../Core/include/Framework/GroupSlicer.h | 2 +- Framework/Core/src/ASoA.cxx | 2 +- Framework/Core/src/ArrowSupport.cxx | 19 ++-- Framework/Core/src/ArrowTableSlicingCache.cxx | 54 ++++++---- Framework/Core/test/test_GroupSlicer.cxx | 4 +- 9 files changed, 160 insertions(+), 92 deletions(-) diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index e098cd89f6d5d..2e478a8ca64a6 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1400,10 +1400,10 @@ namespace o2::framework struct PreslicePolicyBase { const std::string binding; - StringPair bindingKey; + Entry bindingKey; bool isMissing() const; - StringPair const& getBindingKey() const; + Entry const& getBindingKey() const; }; struct PreslicePolicySorted : public PreslicePolicyBase { @@ -1428,7 +1428,7 @@ struct PresliceBase : public Policy { const std::string binding; PresliceBase(expressions::BindingNode index_) - : Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey(std::string{index_.name})}, std::make_pair(o2::soa::getLabelFromTypeForKey(std::string{index_.name}), std::string{index_.name})}, {}} + : Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey(std::string{index_.name}), std::string{index_.name})}, {}} { } @@ -1508,7 +1508,7 @@ auto doSliceBy(T const* table, 
o2::framework::PresliceBase const { if constexpr (OPT) { if (container.isMissing()) { - missingOptionalPreslice(getLabelFromType>().data(), container.bindingKey.second.c_str()); + missingOptionalPreslice(getLabelFromType>().data(), container.bindingKey.key.c_str()); } } uint64_t offset = 0; @@ -1545,7 +1545,7 @@ auto doSliceBy(T const* table, o2::framework::PresliceBase const { if constexpr (OPT) { if (container.isMissing()) { - missingOptionalPreslice(getLabelFromType>().data(), container.bindingKey.second.c_str()); + missingOptionalPreslice(getLabelFromType>().data(), container.bindingKey.key.c_str()); } } auto selection = container.getSliceFor(value); @@ -1574,7 +1574,7 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase().data(), container.bindingKey.second.c_str()); + missingOptionalPreslice(getLabelFromType().data(), container.bindingKey.key.c_str()); } } uint64_t offset = 0; diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 330eaf01f0be4..e310f3eef990c 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -534,39 +534,43 @@ static void setGroupedCombination(C& comb, TG& grouping, std::tuple& asso /// Preslice handling template requires(!is_preslice) -bool registerCache(T&, std::vector&, std::vector&) +bool registerCache(T&, Cache&, Cache&) { return false; } template requires std::same_as -bool registerCache(T& preslice, std::vector& bsks, std::vector&) +bool registerCache(T& preslice, Cache& bsks, Cache&) { if constexpr (T::optional) { if (preslice.binding == "[MISSING]") { return true; } } - auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.first == preslice.bindingKey.first) && (entry.second == preslice.bindingKey.second); }); + auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.binding == 
preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); }); if (locate == bsks.end()) { bsks.emplace_back(preslice.getBindingKey()); + } else if (locate->enabled == false) { + locate->enabled = true; } return true; } template requires std::same_as -bool registerCache(T& preslice, std::vector&, std::vector& bsksU) +bool registerCache(T& preslice, Cache&, Cache& bsksU) { if constexpr (T::optional) { if (preslice.binding == "[MISSING]") { return true; } } - auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.first == preslice.bindingKey.first) && (entry.second == preslice.bindingKey.second); }); + auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); }); if (locate == bsksU.end()) { bsksU.emplace_back(preslice.getBindingKey()); + } else if (locate->enabled == false) { + locate->enabled = true; } return true; } diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index c7f3da1948c62..bad5f699cb464 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -65,21 +65,18 @@ concept is_enumeration = is_enumeration_v>; // the contents of an AnalysisTask... 
namespace { struct AnalysisDataProcessorBuilder { - template - static void addGroupingCandidates(std::vector& bk, std::vector& bku) + template + static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled) { - [&bk, &bku](framework::pack) mutable { - std::string key; - if constexpr (soa::is_iterator>) { - key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType>()); - } - ([&bk, &bku, &key]() mutable { + [&bk, &bku, enabled](framework::pack) mutable { + auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType>()); + ([&bk, &bku, &key, enabled]() mutable { if constexpr (soa::relatedByIndex, std::decay_t>()) { auto binding = soa::getLabelFromTypeForKey>(key); if constexpr (o2::soa::is_smallgroups>) { - framework::updatePairList(bku, binding, key); + framework::updatePairList(bku, binding, key, enabled); } else { - framework::updatePairList(bk, binding, key); + framework::updatePairList(bk, binding, key, enabled); } } }(), @@ -145,34 +142,72 @@ struct AnalysisDataProcessorBuilder { } /// helper to parse the process arguments + template + inline static bool requestInputsFromArgs(T&, std::string const&, std::vector&, std::vector&) + { + return false; + } + template + inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector& inputs, std::vector& eis) + { + AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis); + return true; + } + template + inline static bool requestCacheFromArgs(T&, Cache&, Cache&) + { + return false; + } + template + inline static bool requestCacheFromArgs(T& pc, Cache& bk, Cache& bku) + { + AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku); + return true; + } /// 1. 
enumeration (must be the only argument) template - static void inputsFromArgs(R (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&, std::vector&, std::vector&) + static void inputsFromArgs(R (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&)//, Cache&, Cache&) { std::vector inputMetadata; // FIXME: for the moment we do not support begin, end and step. DataSpecUtils::updateInputList(inputs, InputSpec{"enumeration", "DPL", "ENUM", 0, Lifetime::Enumeration, inputMetadata}); } - /// 2. grouping case - 1st argument is an iterator + /// 2. 1st argument is an iterator template - static void inputsFromArgs(R (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& bk, std::vector& bku) + static void inputsFromArgs(R (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos)//, Cache& bk, Cache& bku) requires(std::is_lvalue_reference_v && (std::is_lvalue_reference_v && ...)) { - addGroupingCandidates(bk, bku); constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); addInputsAndExpressions::parent_t, Args...>(hash, name, value, inputs, eInfos); } /// 3. generic case template - static void inputsFromArgs(R (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector&, std::vector&) + static void inputsFromArgs(R (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos)//, Cache&, Cache&) requires(std::is_lvalue_reference_v && ...) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); addInputsAndExpressions(hash, name, value, inputs, eInfos); } + /// 1. enumeration (no grouping) + template + static void cacheFromArgs(R (C::*)(A), bool, Cache&, Cache&) + { + } + /// 2. 
iterator (the only grouping case) + template + static void cacheFromArgs(R (C::*)(A, Args...), bool value, Cache& bk, Cache& bku) + { + addGroupingCandidates(bk, bku, value); + } + /// 3. generic case (no grouping) + template + static void cacheFromArgs(R (C::*)(A, Args...), bool, Cache&, Cache&) + { + } + template static auto extractTableFromRecord(InputRecord& record) { @@ -480,8 +515,6 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) std::vector inputs; std::vector options; std::vector expressionInfos; - std::vector bindingsKeys; - std::vector bindingsKeysUnsorted; /// make sure options and configurables are set before expression infos are created homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get()); @@ -490,23 +523,15 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) /// parse process functions defined by corresponding configurables if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted); + AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos); } homogeneous_apply_refs( - overloaded{ - [name = name_str, &expressionInfos, &inputs, &bindingsKeys, &bindingsKeysUnsorted](framework::is_process_configurable auto& x) mutable { - // this pushes (argumentIndex,processHash,schemaPtr,nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators - AnalysisDataProcessorBuilder::inputsFromArgs(x.process, (name + "/" + x.name).c_str(), x.value, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted); - return true; + [name = name_str, &expressionInfos, &inputs](auto& x) mutable { + // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators + return 
AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos); }, - [](auto&) { - return false; - }}, *task.get()); - // add preslice declarations to slicing cache definition - homogeneous_apply_refs([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); - // request base tables for spawnable extended tables and indices to be built // this checks for duplications homogeneous_apply_refs([&inputs](auto& element) { @@ -526,7 +551,12 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end()); homogeneous_apply_refs([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, bindingsKeys, bindingsKeysUnsorted](InitContext& ic) mutable { + auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos](InitContext& ic) mutable { + Cache bindingsKeys; + Cache bindingsKeysUnsorted; + // add preslice declarations to slicing cache definition + homogeneous_apply_refs([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); + homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); @@ -556,6 +586,16 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) task->init(ic); } + /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values + if constexpr (requires { &T::process; }) { + AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); + } + homogeneous_apply_refs( + [&bindingsKeys, &bindingsKeysUnsorted](auto& x) mutable { + return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); + }, + *task.get()); + ic.services().get().setCaches(std::move(bindingsKeys)); ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); // initialize global caches diff --git a/Framework/Core/include/Framework/ArrowTableSlicingCache.h b/Framework/Core/include/Framework/ArrowTableSlicingCache.h index 2edc23a63ce76..292a67023fc5e 100644 --- a/Framework/Core/include/Framework/ArrowTableSlicingCache.h +++ b/Framework/Core/include/Framework/ArrowTableSlicingCache.h @@ -34,51 +34,64 @@ struct SliceInfoUnsortedPtr { gsl::span getSliceFor(int value) const; }; -using StringPair = std::pair; +struct Entry { + std::string binding; + std::string key; + bool enabled; + + Entry(std::string b, std::string k, bool e = true) + : binding{b}, + key{k}, + enabled{e} + { + } +}; + +using Cache = std::vector; -void updatePairList(std::vector& list, std::string const& binding, std::string const& key); +void updatePairList(Cache& list, std::string const& binding, std::string const& key, bool enabled); struct ArrowTableSlicingCacheDef { constexpr static ServiceKind service_kind = ServiceKind::Global; - std::vector bindingsKeys; - std::vector bindingsKeysUnsorted; + Cache bindingsKeys; + Cache bindingsKeysUnsorted; - void setCaches(std::vector&& bsks); - void setCachesUnsorted(std::vector&& bsks); + void setCaches(Cache&& bsks); + void setCachesUnsorted(Cache&& bsks); }; struct ArrowTableSlicingCache { constexpr static ServiceKind service_kind = ServiceKind::Stream; - std::vector 
bindingsKeys; + Cache bindingsKeys; std::vector>> values; std::vector>> counts; - std::vector bindingsKeysUnsorted; + Cache bindingsKeysUnsorted; std::vector> valuesUnsorted; std::vector groups; - ArrowTableSlicingCache(std::vector&& bsks, std::vector&& bsksUnsorted = {}); + ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted = {}); // set caching information externally - void setCaches(std::vector&& bsks, std::vector&& bsksUnsorted = {}); + void setCaches(Cache&& bsks, Cache&& bsksUnsorted = {}); // update slicing info cache entry (assumes it is already present) arrow::Status updateCacheEntry(int pos, std::shared_ptr const& table); arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr const& table); // helper to locate cache position - std::pair getCachePos(StringPair const& bindingKey) const; - int getCachePosSortedFor(StringPair const& bindingKey) const; - int getCachePosUnsortedFor(StringPair const& bindingKey) const; + std::pair getCachePos(Entry const& bindingKey) const; + int getCachePosSortedFor(Entry const& bindingKey) const; + int getCachePosUnsortedFor(Entry const& bindingKey) const; // get slice from cache for a given value - SliceInfoPtr getCacheFor(StringPair const& bindingKey) const; - SliceInfoUnsortedPtr getCacheUnsortedFor(StringPair const& bindingKey) const; + SliceInfoPtr getCacheFor(Entry const& bindingKey) const; + SliceInfoUnsortedPtr getCacheUnsortedFor(Entry const& bindingKey) const; SliceInfoPtr getCacheForPos(int pos) const; SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const; - static void validateOrder(StringPair const& bindingKey, std::shared_ptr const& input); + static void validateOrder(Entry const& bindingKey, std::shared_ptr const& input); }; } // namespace o2::framework diff --git a/Framework/Core/include/Framework/GroupSlicer.h b/Framework/Core/include/Framework/GroupSlicer.h index 64b1d863c59e6..b8436314b057e 100644 --- a/Framework/Core/include/Framework/GroupSlicer.h +++ 
b/Framework/Core/include/Framework/GroupSlicer.h @@ -55,7 +55,7 @@ struct GroupSlicer { { constexpr auto index = framework::has_type_at_v>(associated_pack_t{}); auto binding = o2::soa::getLabelFromTypeForKey>(mIndexColumnName); - auto bk = std::make_pair(binding, mIndexColumnName); + auto bk = Entry(binding, mIndexColumnName); if constexpr (!o2::soa::is_smallgroups>) { if (table.size() == 0) { return; diff --git a/Framework/Core/src/ASoA.cxx b/Framework/Core/src/ASoA.cxx index 810398747de88..5940bc0427225 100644 --- a/Framework/Core/src/ASoA.cxx +++ b/Framework/Core/src/ASoA.cxx @@ -197,7 +197,7 @@ bool PreslicePolicyBase::isMissing() const return binding == "[MISSING]"; } -StringPair const& PreslicePolicyBase::getBindingKey() const +Entry const& PreslicePolicyBase::getBindingKey() const { return bindingKey; } diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 12a4c7131e828..3b13e30581f70 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -567,26 +567,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowTableSlicingCacheSpec() .name = "arrow-slicing-cache", .uniqueId = CommonServices::simpleServiceId(), .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId(), - new ArrowTableSlicingCache(std::vector>{services.get().bindingsKeys}, std::vector{services.get().bindingsKeysUnsorted}), + new ArrowTableSlicingCache(Cache{services.get().bindingsKeys}, + Cache{services.get().bindingsKeysUnsorted}), ServiceKind::Stream, typeid(ArrowTableSlicingCache).name()}; }, .configure = CommonServices::noConfiguration(), .preProcessing = [](ProcessingContext& pc, void* service_ptr) { auto* service = static_cast(service_ptr); auto& caches = service->bindingsKeys; - for (auto i = 0; i < caches.size(); ++i) { - if (pc.inputs().getPos(caches[i].first.c_str()) >= 0) { - auto status = service->updateCacheEntry(i, 
pc.inputs().get(caches[i].first.c_str())->asArrowTable()); + for (auto i = 0u; i < caches.size(); ++i) { + if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) { + auto status = service->updateCacheEntry(i, pc.inputs().get(caches[i].binding.c_str())->asArrowTable()); if (!status.ok()) { - throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].first.c_str(), caches[i].second.c_str()); + throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str()); } } } auto& unsortedCaches = service->bindingsKeysUnsorted; - for (auto i = 0; i < unsortedCaches.size(); ++i) { - if (pc.inputs().getPos(unsortedCaches[i].first.c_str()) >= 0) { - auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get(unsortedCaches[i].first.c_str())->asArrowTable()); + for (auto i = 0u; i < unsortedCaches.size(); ++i) { + if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) { + auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get(unsortedCaches[i].binding.c_str())->asArrowTable()); if (!status.ok()) { - throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].first.c_str(), unsortedCaches[i].second.c_str()); + throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str()); } } } }, diff --git a/Framework/Core/src/ArrowTableSlicingCache.cxx b/Framework/Core/src/ArrowTableSlicingCache.cxx index 4b31f96e32fba..99809e2d4c145 100644 --- a/Framework/Core/src/ArrowTableSlicingCache.cxx +++ b/Framework/Core/src/ArrowTableSlicingCache.cxx @@ -19,10 +19,13 @@ namespace o2::framework { -void updatePairList(std::vector& list, std::string const& binding, std::string const& key) +void updatePairList(Cache& list, std::string const& binding, std::string const& key, bool enabled = true) { - if (std::find_if(list.begin(), list.end(), [&binding, 
&key](auto const& entry) { return (entry.first == binding) && (entry.second == key); }) == list.end()) { - list.emplace_back(binding, key); + auto locate = std::find_if(list.begin(), list.end(), [&binding, &key](auto const& entry) { return (entry.binding == binding) && (entry.key == key); }); + if (locate == list.end()) { + list.emplace_back(binding, key, enabled); + } else if (!locate->enabled && enabled) { + locate->enabled = true; } } @@ -65,17 +68,17 @@ gsl::span SliceInfoUnsortedPtr::getSliceFor(int value) const return {(*groups)[value].data(), (*groups)[value].size()}; } -void ArrowTableSlicingCacheDef::setCaches(std::vector&& bsks) +void ArrowTableSlicingCacheDef::setCaches(Cache&& bsks) { bindingsKeys = bsks; } -void ArrowTableSlicingCacheDef::setCachesUnsorted(std::vector&& bsks) +void ArrowTableSlicingCacheDef::setCachesUnsorted(Cache&& bsks) { bindingsKeysUnsorted = bsks; } -ArrowTableSlicingCache::ArrowTableSlicingCache(std::vector&& bsks, std::vector&& bsksUnsorted) +ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted) : bindingsKeys{bsks}, bindingsKeysUnsorted{bsksUnsorted} { @@ -86,7 +89,7 @@ ArrowTableSlicingCache::ArrowTableSlicingCache(std::vector&& bsks, s groups.resize(bindingsKeysUnsorted.size()); } -void ArrowTableSlicingCache::setCaches(std::vector&& bsks, std::vector&& bsksUnsorted) +void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted) { bindingsKeys = bsks; bindingsKeysUnsorted = bsksUnsorted; @@ -107,11 +110,15 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr< counts[pos].reset(); return arrow::Status::OK(); } + auto& [b,k,e] = bindingsKeys[pos]; + if (!e) { + throw runtime_error_f("Disabled slicing cache of %s by %s update requested - this should not happen.", b.c_str(), k.c_str()); + } validateOrder(bindingsKeys[pos], table); arrow::Datum value_counts; auto options = arrow::compute::ScalarAggregateOptions::Defaults(); ARROW_ASSIGN_OR_RAISE(value_counts, 
- arrow::compute::CallFunction("value_counts", {table->GetColumnByName(bindingsKeys[pos].second)}, + arrow::compute::CallFunction("value_counts", {table->GetColumnByName(bindingsKeys[pos].key)}, &options)); auto pair = static_cast(value_counts.array()); values[pos].reset(); @@ -128,7 +135,10 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const st if (table->num_rows() == 0) { return arrow::Status::OK(); } - auto& [b, k] = bindingsKeysUnsorted[pos]; + auto& [b, k, e] = bindingsKeysUnsorted[pos]; + if (!e) { + throw runtime_error_f("Disabled slicing cache of %s by %s update requested - this should not happen.", b.c_str(), k.c_str()); + } auto column = table->GetColumnByName(k); auto row = 0; for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) { @@ -139,7 +149,7 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const st if (std::find(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end(), v) == valuesUnsorted[pos].end()) { valuesUnsorted[pos].push_back(v); } - if (groups[pos].size() <= v) { + if ((int)groups[pos].size() <= v) { groups[pos].resize(v + 1); } (groups[pos])[v].push_back(row); @@ -151,7 +161,7 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const st return arrow::Status::OK(); } -std::pair ArrowTableSlicingCache::getCachePos(const StringPair& bindingKey) const +std::pair ArrowTableSlicingCache::getCachePos(const Entry& bindingKey) const { auto pos = getCachePosSortedFor(bindingKey); if (pos != -1) { @@ -161,41 +171,41 @@ std::pair ArrowTableSlicingCache::getCachePos(const StringPair& bindi if (pos != -1) { return {pos, false}; } - throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.first.c_str(), bindingKey.second.c_str()); + throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } -int ArrowTableSlicingCache::getCachePosSortedFor(StringPair const& 
bindingKey) const +int ArrowTableSlicingCache::getCachePosSortedFor(Entry const& bindingKey) const { - auto locate = std::find_if(bindingsKeys.begin(), bindingsKeys.end(), [&](StringPair const& bk) { return (bindingKey.first == bk.first) && (bindingKey.second == bk.second); }); + auto locate = std::find_if(bindingsKeys.begin(), bindingsKeys.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); }); if (locate != bindingsKeys.end()) { return std::distance(bindingsKeys.begin(), locate); } return -1; } -int ArrowTableSlicingCache::getCachePosUnsortedFor(StringPair const& bindingKey) const +int ArrowTableSlicingCache::getCachePosUnsortedFor(Entry const& bindingKey) const { - auto locate_unsorted = std::find_if(bindingsKeysUnsorted.begin(), bindingsKeysUnsorted.end(), [&](StringPair const& bk) { return (bindingKey.first == bk.first) && (bindingKey.second == bk.second); }); + auto locate_unsorted = std::find_if(bindingsKeysUnsorted.begin(), bindingsKeysUnsorted.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); }); if (locate_unsorted != bindingsKeysUnsorted.end()) { return std::distance(bindingsKeysUnsorted.begin(), locate_unsorted); } return -1; } -SliceInfoPtr ArrowTableSlicingCache::getCacheFor(StringPair const& bindingKey) const +SliceInfoPtr ArrowTableSlicingCache::getCacheFor(Entry const& bindingKey) const { auto [p, s] = getCachePos(bindingKey); if (!s) { - throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.first.c_str(), bindingKey.second.c_str()); + throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } return getCacheForPos(p); } -SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const StringPair& bindingKey) const +SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bindingKey) const { auto [p, s] = getCachePos(bindingKey); if (s) { - throw 
runtime_error_f("%s/%s is found in sorted cache", bindingKey.first.c_str(), bindingKey.second.c_str()); + throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } return getCacheUnsortedForPos(p); @@ -224,9 +234,9 @@ SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedForPos(int pos) con }; } -void ArrowTableSlicingCache::validateOrder(StringPair const& bindingKey, const std::shared_ptr& input) +void ArrowTableSlicingCache::validateOrder(Entry const& bindingKey, const std::shared_ptr& input) { - auto const& [target, key] = bindingKey; + auto const& [target, key, enabled] = bindingKey; auto column = input->GetColumnByName(key); auto array0 = static_cast>(column->chunk(0)->data()); int32_t prev = 0; diff --git a/Framework/Core/test/test_GroupSlicer.cxx b/Framework/Core/test/test_GroupSlicer.cxx index 161939141e790..091c21eeae229 100644 --- a/Framework/Core/test/test_GroupSlicer.cxx +++ b/Framework/Core/test/test_GroupSlicer.cxx @@ -683,7 +683,7 @@ TEST_CASE("ArrowDirectSlicing") std::vector slices; std::vector offsts; - auto bk = std::make_pair(soa::getLabelFromType(), "fID"); + auto bk = Entry(soa::getLabelFromType(), "fID"); ArrowTableSlicingCache cache({bk}); auto s = cache.updateCacheEntry(0, {evtTable}); auto lcache = cache.getCacheFor(bk); @@ -741,7 +741,7 @@ TEST_CASE("TestSlicingException") } auto evtTable = builderE.finalize(); - auto bk = std::make_pair(soa::getLabelFromType(), "fID"); + auto bk = Entry(soa::getLabelFromType(), "fID"); ArrowTableSlicingCache cache({bk}); try { From 82947173e0c3e8bd898a7968e8587643f76d090a Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 9 May 2025 14:14:38 +0200 Subject: [PATCH 2/5] add error messages for unexpected situations --- Framework/Core/src/ArrowTableSlicingCache.cxx | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/ArrowTableSlicingCache.cxx b/Framework/Core/src/ArrowTableSlicingCache.cxx index 
99809e2d4c145..e2e355949f52d 100644 --- a/Framework/Core/src/ArrowTableSlicingCache.cxx +++ b/Framework/Core/src/ArrowTableSlicingCache.cxx @@ -112,7 +112,7 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr< } auto& [b,k,e] = bindingsKeys[pos]; if (!e) { - throw runtime_error_f("Disabled slicing cache of %s by %s update requested - this should not happen.", b.c_str(), k.c_str()); + throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str()); } validateOrder(bindingsKeys[pos], table); arrow::Datum value_counts; @@ -137,7 +137,7 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const st } auto& [b, k, e] = bindingsKeysUnsorted[pos]; if (!e) { - throw runtime_error_f("Disabled slicing cache of %s by %s update requested - this should not happen.", b.c_str(), k.c_str()); + throw runtime_error_f("Disabled unsorted cache %s/%s update requested", b.c_str(), k.c_str()); } auto column = table->GetColumnByName(k); auto row = 0; @@ -197,6 +197,9 @@ SliceInfoPtr ArrowTableSlicingCache::getCacheFor(Entry const& bindingKey) const if (!s) { throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } + if(!bindingsKeys[p].enabled) { + throw runtime_error_f("Disabled cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); + } return getCacheForPos(p); } @@ -207,6 +210,9 @@ SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bi if (s) { throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } + if(!bindingsKeys[p].enabled) { + throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); + } return getCacheUnsortedForPos(p); } From 55a1cda4f5266dbe4ac1711750e09db0660ff4e1 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 9 May 2025 12:17:38 +0000 Subject: [PATCH 3/5] Please 
consider the following formatting changes --- Framework/Core/include/Framework/AnalysisTask.h | 14 +++++++------- Framework/Core/src/ArrowTableSlicingCache.cxx | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index bad5f699cb464..74c62840b14dc 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -166,7 +166,7 @@ struct AnalysisDataProcessorBuilder { } /// 1. enumeration (must be the only argument) template - static void inputsFromArgs(R (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&)//, Cache&, Cache&) + static void inputsFromArgs(R (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&) //, Cache&, Cache&) { std::vector inputMetadata; // FIXME: for the moment we do not support begin, end and step. @@ -175,7 +175,7 @@ struct AnalysisDataProcessorBuilder { /// 2. 1st argument is an iterator template - static void inputsFromArgs(R (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos)//, Cache& bk, Cache& bku) + static void inputsFromArgs(R (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos) //, Cache& bk, Cache& bku) requires(std::is_lvalue_reference_v && (std::is_lvalue_reference_v && ...)) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); @@ -184,7 +184,7 @@ struct AnalysisDataProcessorBuilder { /// 3. generic case template - static void inputsFromArgs(R (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos)//, Cache&, Cache&) + static void inputsFromArgs(R (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos) //, Cache&, Cache&) requires(std::is_lvalue_reference_v && ...) 
{ constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); @@ -526,10 +526,10 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos); } homogeneous_apply_refs( - [name = name_str, &expressionInfos, &inputs](auto& x) mutable { - // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators - return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos); - }, + [name = name_str, &expressionInfos, &inputs](auto& x) mutable { + // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators + return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos); + }, *task.get()); // request base tables for spawnable extended tables and indices to be built diff --git a/Framework/Core/src/ArrowTableSlicingCache.cxx b/Framework/Core/src/ArrowTableSlicingCache.cxx index e2e355949f52d..89e9536661ce0 100644 --- a/Framework/Core/src/ArrowTableSlicingCache.cxx +++ b/Framework/Core/src/ArrowTableSlicingCache.cxx @@ -110,7 +110,7 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr< counts[pos].reset(); return arrow::Status::OK(); } - auto& [b,k,e] = bindingsKeys[pos]; + auto& [b, k, e] = bindingsKeys[pos]; if (!e) { throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str()); } @@ -197,7 +197,7 @@ SliceInfoPtr ArrowTableSlicingCache::getCacheFor(Entry const& bindingKey) const if (!s) { throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } - if(!bindingsKeys[p].enabled) { + if (!bindingsKeys[p].enabled) { throw runtime_error_f("Disabled cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); } @@ -210,7 +210,7 @@ 
SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bi if (s) { throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } - if(!bindingsKeys[p].enabled) { + if (!bindingsKeys[p].enabled) { throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); } From e4e58a07d7e20626918d9890198a461eddb9b7c4 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Sat, 10 May 2025 08:33:28 +0200 Subject: [PATCH 4/5] fixup! add error messages for unexpected situations --- Framework/Core/src/ArrowTableSlicingCache.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/src/ArrowTableSlicingCache.cxx b/Framework/Core/src/ArrowTableSlicingCache.cxx index 89e9536661ce0..ef944bb2eac15 100644 --- a/Framework/Core/src/ArrowTableSlicingCache.cxx +++ b/Framework/Core/src/ArrowTableSlicingCache.cxx @@ -210,7 +210,7 @@ SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bi if (s) { throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } - if (!bindingsKeys[p].enabled) { + if(!bindingsKeysUnsorted[p].enabled) { throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); } From 572ee7ff32a2e318551d16ddc7bfa2d80ab50212 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Sat, 10 May 2025 06:35:04 +0000 Subject: [PATCH 5/5] Please consider the following formatting changes --- Framework/Core/src/ArrowTableSlicingCache.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/src/ArrowTableSlicingCache.cxx b/Framework/Core/src/ArrowTableSlicingCache.cxx index ef944bb2eac15..e001e293c4733 100644 --- a/Framework/Core/src/ArrowTableSlicingCache.cxx +++ b/Framework/Core/src/ArrowTableSlicingCache.cxx @@ -210,7 +210,7 @@ SliceInfoUnsortedPtr 
ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bi if (s) { throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); } - if(!bindingsKeysUnsorted[p].enabled) { + if (!bindingsKeysUnsorted[p].enabled) { throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); }